44
55from bluesky_stomp .messaging import StompClient
66from bluesky_stomp .models import Broker , DestinationBase , MessageTopic
7- from dodal .common .beamlines .beamline_utils import (
8- get_path_provider ,
9- set_path_provider ,
10- )
117
128from blueapi .cli .scratch import get_python_environment
13- from blueapi .client .numtracker import NumtrackerClient
149from blueapi .config import ApplicationConfig , OIDCConfig , StompConfig
1510from blueapi .core .context import BlueskyContext
1611from blueapi .core .event import EventStream
2318 TaskRequest ,
2419 WorkerTask ,
2520)
26- from blueapi .utils .invalid_config_error import InvalidConfigError
27- from blueapi .utils .path_provider import StartDocumentPathProvider
2821from blueapi .worker .event import TaskStatusEnum , WorkerState
2922from blueapi .worker .task import Task
3023from blueapi .worker .task_worker import TaskWorker , TrackableTask
@@ -48,14 +41,10 @@ def set_config(new_config: ApplicationConfig):
4841
4942@cache
5043def context () -> BlueskyContext :
51- ctx = BlueskyContext ()
44+ ctx = BlueskyContext (config () )
5245 return ctx
5346
5447
55- def configure_context () -> None :
56- context ().with_config (config ().env )
57-
58-
5948@cache
6049def worker () -> TaskWorker :
6150 worker = TaskWorker (
@@ -96,76 +85,23 @@ def stomp_client() -> StompClient | None:
9685 return None
9786
9887
99- @cache
100- def numtracker_client () -> NumtrackerClient | None :
101- conf = config ()
102- if conf .numtracker is not None :
103- if conf .env .metadata is not None :
104- return NumtrackerClient (url = conf .numtracker .url )
105- else :
106- raise InvalidConfigError (
107- "Numtracker url has been configured, but there is no instrument or"
108- " instrument_session in the environment metadata"
109- )
110- else :
111- return None
112-
113-
114- def _update_scan_num (md : dict [str , Any ]) -> int :
115- numtracker = numtracker_client ()
116- if numtracker is not None :
117- scan = numtracker .create_scan (md ["instrument_session" ], md ["instrument" ])
118- md ["data_session_directory" ] = str (scan .scan .directory .path )
119- md ["scan_file" ] = scan .scan .scan_file
120- return scan .scan .scan_number
121- else :
122- raise InvalidConfigError (
123- "Blueapi was configured to talk to numtracker but numtracker is not"
124- "configured, this should not happen, please contact the DAQ team"
125- )
126-
127-
12888def setup (config : ApplicationConfig ) -> None :
12989 """Creates and starts a worker with supplied config"""
13090 set_config (config )
13191 set_up_logging (config .logging )
13292
13393 # Eagerly initialize worker and messaging connection
13494 worker ()
135-
136- # if numtracker is configured, use a StartDocumentPathProvider
137- if numtracker_client () is not None :
138- context ().run_engine .scan_id_source = _update_scan_num
139- _hook_run_engine_and_path_provider ()
140-
141- configure_context ()
142-
143- if numtracker_client () is not None and not isinstance (
144- get_path_provider (), StartDocumentPathProvider
145- ):
146- raise InvalidConfigError (
147- "Numtracker has been configured but a path provider was imported"
148- " with the devices. Remove this path provider to use numtracker."
149- )
150-
15195 stomp_client ()
15296
15397
154- def _hook_run_engine_and_path_provider () -> None :
155- path_provider = StartDocumentPathProvider ()
156- set_path_provider (path_provider )
157- run_engine = context ().run_engine
158- run_engine .subscribe (path_provider .update_run , "start" )
159-
160-
16198def teardown () -> None :
16299 worker ().stop ()
163100 if (stomp_client_ref := stomp_client ()) is not None :
164101 stomp_client_ref .disconnect ()
165102 context .cache_clear ()
166103 worker .cache_clear ()
167104 stomp_client .cache_clear ()
168- numtracker_client .cache_clear ()
169105
170106
171107def _publish_event_streams (
@@ -224,20 +160,13 @@ def begin_task(
224160 task : WorkerTask , pass_through_headers : Mapping [str , str ] | None = None
225161) -> WorkerTask :
226162 """Trigger a task. Will fail if the worker is busy"""
227- if pass_through_headers :
228- _try_configure_numtracker (pass_through_headers )
229-
163+ if nt := context ().numtracker :
164+ nt .set_headers (pass_through_headers or {})
230165 if task .task_id is not None :
231166 worker ().begin_task (task .task_id )
232167 return task
233168
234169
235- def _try_configure_numtracker (pass_through_headers : Mapping [str , str ]) -> None :
236- numtracker = numtracker_client ()
237- if numtracker is not None :
238- numtracker .set_headers (pass_through_headers )
239-
240-
241170def get_tasks_by_status (status : TaskStatusEnum ) -> list [TrackableTask ]:
242171 """Retrieve a list of tasks based on their status."""
243172 return worker ().get_tasks_by_status (status )
0 commit comments