1515#include " Framework/CompletionPolicyHelpers.h"
1616#include " Framework/DataRelayer.h"
1717#include " Framework/DataProcessingHeader.h"
18+ #include " Framework/DataProcessingStates.h"
19+ #include " Framework/DataProcessingStats.h"
20+ #include " Framework/DeviceState.h"
21+ #include " Framework/DriverConfig.h"
22+ #include " Framework/ServiceRegistryHelpers.h"
23+ #include " Framework/TimingHelpers.h"
1824#include < Monitoring/Monitoring.h>
1925#include < fairmq/TransportFactory.h>
2026#include < cstring>
2127#include < vector>
28+ #include < uv.h>
2229
2330using Monitoring = o2::monitoring::Monitoring;
2431using namespace o2 ::framework;
2532using DataHeader = o2::header::DataHeader;
2633using Stack = o2::header::Stack;
2734using RecordAction = o2::framework::DataRelayer::RecordAction;
2835
36+ struct BenchmarkServices {
37+ Monitoring monitoring;
38+ const DriverConfig driverConfig{.batch = false };
39+ DataProcessingStates states{
40+ TimingHelpers::defaultRealtimeBaseConfigurator (0 , uv_default_loop ()),
41+ TimingHelpers::defaultCPUTimeConfigurator (uv_default_loop ())};
42+ DataProcessingStats stats{
43+ TimingHelpers::defaultRealtimeBaseConfigurator (0 , uv_default_loop ()),
44+ TimingHelpers::defaultCPUTimeConfigurator (uv_default_loop ()),
45+ {}};
46+ DeviceState deviceState;
47+ ServiceRegistry registry;
48+
49+ ServiceRegistryRef ref ()
50+ {
51+ using MetricSpec = DataProcessingStats::MetricSpec;
52+ int quickUpdateInterval = 1 ;
53+ std::vector<MetricSpec> specs{
54+ MetricSpec{.name = " malformed_inputs" , .metricId = static_cast <short >(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
55+ MetricSpec{.name = " dropped_computations" , .metricId = static_cast <short >(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
56+ MetricSpec{.name = " dropped_incoming_messages" , .metricId = static_cast <short >(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
57+ MetricSpec{.name = " relayed_messages" , .metricId = static_cast <short >(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
58+ for (auto & spec : specs) {
59+ stats.registerMetric (spec);
60+ }
61+
62+ ServiceRegistryRef r{registry};
63+ r.registerService (ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
64+ r.registerService (ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
65+ r.registerService (ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
66+ r.registerService (ServiceRegistryHelpers::handleForService<DriverConfig const >(&driverConfig));
67+ r.registerService (ServiceRegistryHelpers::handleForService<DeviceState>(&deviceState));
68+ return r;
69+ }
70+ };
71+
2972// a simple benchmark of the contribution of the pure message creation
3073// this was important when the benchmarks below included the message
3174// creation inside the benchmark loop, its somewhat obsolete now but
@@ -54,7 +97,7 @@ BENCHMARK(BM_RelayMessageCreation);
5497// and the subsequent InputRecord is immediately requested.
5598static void BM_RelaySingleSlot (benchmark::State& state)
5699{
57- Monitoring metrics ;
100+ BenchmarkServices services ;
58101 InputSpec spec{" clusters" , " TPC" , " CLUSTERS" };
59102
60103 std::vector<InputRoute> inputs = {
@@ -64,8 +107,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
64107 std::vector<InputChannelInfo> infos{1 };
65108 TimesliceIndex index{1 , infos};
66109 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
67- ServiceRegistry registry;
68- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
110+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
69111 relayer.setPipelineLength (4 );
70112
71113 // Let's create a dummy O2 Message with two headers in the stack:
@@ -106,7 +148,7 @@ BENCHMARK(BM_RelaySingleSlot);
106148// This one will simulate a single input.
107149static void BM_RelayMultipleSlots (benchmark::State& state)
108150{
109- Monitoring metrics ;
151+ BenchmarkServices services ;
110152 InputSpec spec{" clusters" , " TPC" , " CLUSTERS" };
111153
112154 std::vector<InputRoute> inputs = {
@@ -117,8 +159,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
117159 TimesliceIndex index{1 , infos};
118160
119161 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
120- ServiceRegistry registry;
121- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
162+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
122163 relayer.setPipelineLength (4 );
123164
124165 // Let's create a dummy O2 Message with two headers in the stack:
@@ -163,7 +204,7 @@ BENCHMARK(BM_RelayMultipleSlots);
163204// / In this case we have a record with two entries
164205static void BM_RelayMultipleRoutes (benchmark::State& state)
165206{
166- Monitoring metrics ;
207+ BenchmarkServices services ;
167208 InputSpec spec1{" clusters" , " TPC" , " CLUSTERS" };
168209 InputSpec spec2{" tracks" , " TPC" , " TRACKS" };
169210
@@ -176,8 +217,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
176217 TimesliceIndex index{1 , infos};
177218
178219 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
179- ServiceRegistry registry;
180- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
220+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
181221 relayer.setPipelineLength (4 );
182222
183223 // Let's create a dummy O2 Message with two headers in the stack:
@@ -241,7 +281,7 @@ BENCHMARK(BM_RelayMultipleRoutes);
241281// / In this case we have a record with two entries
242282static void BM_RelaySplitParts (benchmark::State& state)
243283{
244- Monitoring metrics ;
284+ BenchmarkServices services ;
245285 InputSpec spec1{" clusters" , " TPC" , " CLUSTERS" };
246286
247287 std::vector<InputRoute> inputs = {
@@ -253,8 +293,7 @@ static void BM_RelaySplitParts(benchmark::State& state)
253293 TimesliceIndex index{1 , infos};
254294
255295 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
256- ServiceRegistry registry;
257- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
296+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
258297 relayer.setPipelineLength (4 );
259298
260299 // Let's create a dummy O2 Message with two headers in the stack:
@@ -301,7 +340,7 @@ BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
301340
302341static void BM_RelayMultiplePayloads (benchmark::State& state)
303342{
304- Monitoring metrics ;
343+ BenchmarkServices services ;
305344 InputSpec spec1{" clusters" , " TPC" , " CLUSTERS" };
306345
307346 std::vector<InputRoute> inputs = {
@@ -313,8 +352,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state)
313352 TimesliceIndex index{1 , infos};
314353
315354 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
316- ServiceRegistry registry;
317- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
355+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
318356 relayer.setPipelineLength (4 );
319357
320358 // DataHeader matching the one provided in the input
0 commit comments