Lumiera  0.pre.03
»edit your freedom«
scheduler-service-test.cpp
1 /*
2  SchedulerService(Test) - component integration test for the scheduler
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 * *****************************************************************/
13 
19 #include "lib/test/run.hpp"
20 #include "test-chain-load.hpp"
21 #include "activity-detector.hpp"
22 #include "vault/gear/scheduler.hpp"
23 #include "lib/time/timevalue.hpp"
24 #include "lib/format-cout.hpp"
25 #include "lib/format-string.hpp"
26 #include "lib/test/transiently.hpp"
28 #include "lib/util.hpp"
29 
30 #include <thread>
31 
32 using test::Test;
33 
34 
35 namespace vault{
36 namespace gear {
37 namespace test {
38 
39  using util::max;
40  using util::_Fmt;
41  using lib::time::Time;
42  using std::this_thread::sleep_for;
43 
44  namespace {
45  Time t100us = Time{FSecs{1, 10'000}};
46  Time t200us = t100us + t100us;
47  Time t500us = t200us + t200us + t100us;
48  Time t1ms = Time{1,0};
49 
50  const uint TYPICAL_TIME_FOR_ONE_SCHEDULE_us = 3;
51  }
52 
53 
54 
55 
56  /*************************************************************************//**
57  * @test Scheduler component integration test: use the service API for
58  * state control and to add Jobs and watch processing patterns.
59  * @see SchedulerActivity_test
60  * @see SchedulerInvocation_test
61  * @see SchedulerCommutator_test
62  * @see SchedulerLoadControl_test
63  */
64  class SchedulerService_test : public Test
65  {
66 
67  virtual void
68  run (Arg)
69  {
70  seedRand();
71  simpleUsage();
72  verify_StartStop();
73  verify_LoadFactor();
74  invokeWorkFunction();
75  scheduleRenderJob();
76  processSchedule();
77  }
78 
79 
82  void
83  simpleUsage()
84  {
85  BlockFlowAlloc bFlow;
86  EngineObserver watch;
87  Scheduler scheduler{bFlow, watch};
88  CHECK (scheduler.empty());
89 
90  auto task = onetimeCrunch(4ms);
91  CHECK (1 == task.remainingInvocations());
92 
93  Job job{ task
94  , InvocationInstanceID()
95  , Time::ANYTIME
96  };
97  scheduler.defineSchedule(job)
98  .startOffset(6ms)
99  .lifeWindow(2ms)
100  .post();
101  CHECK (not scheduler.empty());
102 
103  sleep_for (3ms); // not invoked yet
104  CHECK (1 == task.remainingInvocations());
105 
106  sleep_for (20ms);
107  CHECK (0 == task.remainingInvocations());
108  } // task has been invoked
109 
110 
111 
112 
120  static void
121  postNewTask (Scheduler& scheduler, Activity& chain, Time start)
122  {
123  ActivationEvent actEvent{chain, start, start + Time{50,0}}; // add dummy deadline +50ms
124  scheduler.layer2_.postChain (actEvent, scheduler.layer1_);
125  }
126 
127 
128 
131  void
132  verify_StartStop()
133  {
134  BlockFlowAlloc bFlow;
135  EngineObserver watch;
136  Scheduler scheduler{bFlow, watch};
137  CHECK (scheduler.empty());
138 
139  Activity dummy{Activity::FEED};
140  auto postIt = [&] { postNewTask (scheduler, dummy, RealClock::now()+t200us); };
141 
142  scheduler.ignite();
143  CHECK (not scheduler.empty());// repeated »tick« task enlisted....
144 
145  postIt();
146  CHECK (not scheduler.empty());
147 
148  scheduler.terminateProcessing();
149  CHECK (scheduler.empty());
150 
151  postIt();
152  postIt();
153  scheduler.ignite();
154  CHECK (not scheduler.empty());
155  //... and just walk away => scheduler unwinds cleanly from destructor
156  }// Note: BlockFlow and WorkForce unwinding is covered in dedicated tests
157 
158 
159 
175  void
176  verify_LoadFactor()
177  {
178  MARK_TEST_FUN
179  BlockFlowAlloc bFlow;
180  EngineObserver watch;
181  Scheduler scheduler{bFlow, watch};
182  CHECK (scheduler.empty());
183 
184  // use a single FEED as content
185  Activity dummy{Activity::FEED};
186 
187  auto anchor = RealClock::now();
188  auto offset = [&](Time when =RealClock::now()){ return _raw(when) - _raw(anchor); };
189 
190  auto createLoad = [&](Offset start, uint cnt)
191  { // use internal API (this test is declared as friend)
192  for (uint i=0; i<cnt; ++i) // flood the queue
193  postNewTask (scheduler, dummy, anchor + start + TimeValue{i});
194  };
195 
196 
197  auto LOAD_PEAK_DURATION_us = 2000;
198  auto fatPackage = LOAD_PEAK_DURATION_us/TYPICAL_TIME_FOR_ONE_SCHEDULE_us;
199 
200  createLoad (Offset{Time{ 5,0}}, fatPackage);
201  createLoad (Offset{Time{15,0}}, fatPackage);
202 
203  scheduler.ignite();
204  cout << "Timing : start-up required.."<<offset()<<" µs"<<endl;
205 
206  // now watch change of load and look out for two peaks....
207  uint peak1_s =0;
208  uint peak1_dur=0;
209  double peak1_max=0;
210  uint peak2_s =0;
211  uint peak2_dur=0;
212  double peak2_max=0;
213 
214  uint phase=0;
215  _Fmt row{"%6d | Load: %5.3f Head:%5d Lag:%6d\n"};
216 
217  while (not scheduler.empty()) // should fall empty at end
218  {
219  sleep_for(50us);
220  double load = scheduler.getLoadIndicator();
221 
222  switch (phase) {
223  case 0:
224  if (load > 1.0)
225  {
226  ++phase;
227  peak1_s = offset();
228  }
229  break;
230  case 1:
231  peak1_max = max (load, peak1_max);
232  if (load < 1.0)
233  {
234  ++phase;
235  peak1_dur = offset() - peak1_s;
236  }
237  break;
238  case 2:
239  if (load > 1.0)
240  {
241  ++phase;
242  peak2_s = offset();
243  }
244  break;
245  case 3:
246  peak2_max = max (load, peak2_max);
247  if (load < 1.0)
248  {
249  ++phase;
250  peak2_dur = offset() - peak2_s;
251  }
252  break;
253  }
254  cout << row % offset() % load
255  % offset(scheduler.layer1_.headTime())
256  % scheduler.loadControl_.averageLag();
257  }
258  uint done = offset();
259 
260  //--------Summary-Table------------------------------
261  _Fmt peak{"\nPeak %d ....... %5d +%dµs %34tmax=%3.1f"};
262  cout << "-------+-------------+----------+----------"
263  << "\n\n"
264  << peak % 1 % peak1_s % peak1_dur % peak1_max
265  << peak % 2 % peak2_s % peak2_dur % peak2_max
266  << "\nTick ....... "<<done
267  <<endl;
268 
269  CHECK (phase == 4);
270  CHECK (peak1_s > 5000); // first peak was scheduled at 5ms
271  CHECK (peak1_s < 10000);
272  CHECK (peak2_s > 15000); // second peak was scheduled at 15ms
273  CHECK (peak2_s < 20000);
274  CHECK (peak1_max > 2.0);
275  CHECK (peak2_max > 2.0);
276 
277  CHECK (done > 50000); // »Tick« period is 50ms
278  // and this tick should determine end of timeline
279 
280  cout << "\nwaiting for shutdown of WorkForce";
281  while (scheduler.workForce_.size() > 0)
282  {
283  sleep_for(10ms);
284  cout << "." << std::flush;
285  }
286  uint shutdown = offset();
287  cout << "\nShutdown after "<<shutdown / 1.0e6<<"sec"<<endl;
288  CHECK (shutdown > 2.0e6);
289  }
290 
291 
292 
320  void
321  invokeWorkFunction()
322  {
323  MARK_TEST_FUN
324  BlockFlowAlloc bFlow;
325  EngineObserver watch;
326  Scheduler scheduler{bFlow, watch};
327 
328  ActivityDetector detector;
329  Activity& probe = detector.buildActivationProbe ("testProbe");
330 
331  TimeVar start;
332  int64_t delay_us;
333  int64_t slip_us;
334  activity::Proc res;
335 
336  auto post = [&](Time start)
337  { // this test class is declared friend to get a backdoor into Scheduler internals...
338  scheduler.layer2_.acquireGoomingToken();
339  postNewTask (scheduler, probe, start);
340  };
341 
342  auto pullWork = [&] {
343  delay_us = lib::test::benchmarkTime([&]{ res = scheduler.doWork(); });
344  slip_us = _raw(detector.invokeTime(probe)) - _raw(start);
345  cout << "res:"<<res<<" delay="<<delay_us<<"µs slip="<<slip_us<<"µs"<<endl;
346  };
347 
348 
349  auto wasClose = [](TimeValue a, TimeValue b)
350  { // 500µs are considered "close"
351  return Duration{Offset{a,b}} < Duration{FSecs{1,2000}};
352  };
353  auto wasInvoked = [&](Time start)
354  {
355  Time invoked = detector.invokeTime (probe);
356  return invoked >= start
357  and wasClose (invoked, start);
358  };
359 
360 
361  cout << "pullWork() on empty queue..."<<endl;
362  pullWork(); // Call the work-Function on empty Scheduler queue
363  CHECK (activity::WAIT == res); // the result instructs this thread to go to sleep immediately
364 
365 
366  cout << "Due at pullWork()..."<<endl;
367  TimeVar now = RealClock::now();
368  start = now + t100us; // Set a schedule 100ms ahead of "now"
369  post (start);
370  CHECK (not scheduler.empty()); // was enqueued
371  CHECK (not wasInvoked(start)); // ...but not activated yet
372 
373  sleep_for (100us); // wait beyond the planned start point (typically waits ~150µs or more)
374  pullWork();
375  CHECK (wasInvoked(start));
376  CHECK (slip_us < 300); // Note: typically there is a slip of 100..200µs, because sleep waits longer
377  CHECK (scheduler.empty()); // The scheduler is empty now and this thread will go to sleep,
378  CHECK (delay_us < 20200); // however the sleep-cycle is first re-shuffled by a wait between 0 ... 20ms
379  CHECK (activity::PASS == res); // this thread is instructed to check back once
380  pullWork();
381  CHECK (activity::WAIT == res); // ...yet since the queue is still empty, it is sent immediately to sleep
382  CHECK (delay_us < 40);
383 
384 
385  cout << "next some time ahead => up-front delay"<<endl;
386  now = RealClock::now();
387  start = now + t500us; // Set a schedule significantly into the future...
388  post (start);
389  CHECK (not scheduler.empty());
390 
391  pullWork(); // ...and invoke the work-Function immediately "now"
392  CHECK (activity::PASS == res); // Result: this thread was kept in sleep in the work-Function
393  CHECK (not wasInvoked(start)); // but the next dispatch did not happen yet; we are instructed to re-invoke immediately
394  CHECK (delay_us > 500); // this proves that there was a delay to wait for the next schedule
395  CHECK (delay_us < 1000);
396  pullWork(); // if we now re-invoke the work-Function as instructed...
397  CHECK (wasInvoked(start)); // then the next schedule is already slightly overdue and immediately invoked
398  CHECK (scheduler.empty()); // the queue is empty and thus this thread will be sent to sleep
399  CHECK (delay_us < 20200); // but beforehand the sleep-cycle is re-shuffled by a wait between 0 ... 20ms
400  CHECK (slip_us < 300);
401  CHECK (activity::PASS == res); // instruction to check back once
402  pullWork();
403  CHECK (activity::WAIT == res); // but next call will send this thread to sleep right away
404  CHECK (delay_us < 40);
405 
406 
407  cout << "follow-up with some distance => follow-up delay"<<endl;
408  now = RealClock::now();
409  start = now + t100us;
410  post (start); // This time the schedule is set to be "soon"
411  post (start+t1ms); // But another schedule is placed 1ms behind
412  sleep_for (100us); // wait for "soon" to pass...
413  pullWork();
414  CHECK (wasInvoked(start)); // Result: the first invocation happened immediately
415  CHECK (slip_us < 300);
416  CHECK (delay_us > 900); // yet this thread was afterwards kept in sleep to await the next task;
417  CHECK (activity::PASS == res); // returns instruction to re-invoke immediately
418  CHECK (not scheduler.empty()); // since there is still work in the queue
419 
420  start += t1ms; // (just re-adjust the reference point to calculate slip_us)
421  pullWork(); // re-invoke immediately as instructed
422  CHECK (wasInvoked(start)); // Result: also the next Activity has been dispatched
423  CHECK (slip_us < 400); // not much slip
424  CHECK (delay_us < 20200); // ...and the post-delay is used to re-shuffle the sleep cycle as usual
425  CHECK (activity::PASS == res); // since queue is empty, we will call back once...
426  CHECK (scheduler.empty());
427  pullWork();
428  CHECK (activity::WAIT == res); // and then go to sleep.
429 
430 
431  cout << "already tended-next => re-target capacity"<<endl;
432  now = RealClock::now();
433  start = now + t500us; // Set the next schedule with some distance...
434  post (start);
435 
436  // Access scheduler internals (as friend)
437  CHECK (start == scheduler.layer1_.headTime()); // next schedule indeed appears as next-head
438  CHECK (not scheduler.loadControl_.tendedNext(start)); // but this next time was not yet marked as "tended"
439 
440  scheduler.loadControl_.tendNext(start); // manipulate scheduler to mark next-head as "tended"
441  CHECK ( scheduler.loadControl_.tendedNext(start));
442 
443  CHECK (start == scheduler.layer1_.headTime()); // other state still the same
444  CHECK (not scheduler.empty());
445 
446  pullWork();
447  CHECK (not wasInvoked(start)); // since next-head was marked as "tended"...
448  CHECK (not scheduler.empty()); // ...this thread is not used to dispatch it
449  CHECK (delay_us < 6000); // rather it is re-focussed as free capacity within WORK_HORIZON
450  }
451 
452 
453 
454 
466  void
467  scheduleRenderJob()
468  {
469  BlockFlowAlloc bFlow;
470  EngineObserver watch;
471  Scheduler scheduler{bFlow, watch};
472 
473  // prevent scale-up of the Scheuler's WorkForce
475 
476  Time nominal{7,7};
477  Time start{0,1};
478  Time dead{0,10};
479 
480  ActivityDetector detector;
481  Job testJob{detector.buildMockJob("testJob", nominal, 1337)};
482 
483  CHECK (scheduler.empty());
484 
485  // use the public Render-Job builder API
486  scheduler.defineSchedule(testJob)
487  .startOffset(400us)
488  .lifeWindow (2ms)
489  .post();
490 
491  CHECK (not scheduler.empty());
492 
493  // cause the new entry to migrate to the priority queue...
494  scheduler.layer2_.maybeFeed(scheduler.layer1_);
495 
496  // investigate the generated ActivationEvent at queue head
497  auto entry = scheduler.layer1_.peekHead();
498  auto now = RealClock::now();
499 
500  CHECK (entry.activity->is(Activity::POST));
501  CHECK (entry.activity->next->is(Activity::GATE));
502  CHECK (entry.activity->next->next->is(Activity::WORKSTART));
503  CHECK (entry.activity->next->next->next->is(Activity::INVOKE));
504  CHECK (entry.startTime() - now < _uTicks( 400us));
505  CHECK (entry.deathTime() - now < _uTicks(2400us));
506  CHECK (entry.manifestation == 0);
507  CHECK (entry.isCompulsory == false);
508 
509 
510  sleep_for(400us); // wait to be sure the new entry has reached maturity
511  detector.incrementSeq(); // mark this point in the detector-log...
512 
513  // Explicitly invoke the work-Function (normally done by the workers)
514  CHECK (activity::PASS == scheduler.doWork());
515 
516  CHECK (detector.verifySeqIncrement(1)
517  .beforeInvocation("testJob").arg("7.007", 1337));
518 
519 // cout << detector.showLog()<<endl; // HINT: use this for investigation...
520  }
521 
522 
523 
524 
525 
537  void
539  {
541  TestChainLoad<16> testLoad{64};
543  .buildTopology();
544 
545  auto stats = testLoad.computeGraphStatistics();
546  cout << _Fmt{"Test-Load: Nodes: %d Levels: %d ∅Node/Level: %3.1f Forks: %d Joins: %d"}
547  % stats.nodes
548  % stats.levels
549  % stats.indicators[STAT_NODE].pL
550  % stats.indicators[STAT_FORK].cnt
551  % stats.indicators[STAT_JOIN].cnt
552  << endl;
553 
554  // while building the calculation-plan graph
555  // node hashes were computed, observing dependencies
556  size_t expectedHash = testLoad.getHash();
557 
558  // some jobs/nodes are marked with a weight-step
559  // these can be instructed to spend some CPU time
560  auto LOAD_BASE = 500us;
561  testLoad.performGraphSynchronously(LOAD_BASE);
562  CHECK (testLoad.getHash() == expectedHash);
563 
564  double referenceTime = testLoad.calcRuntimeReference(LOAD_BASE);
565  cout << "refTime(singleThr): "<<referenceTime/1000<<"ms"<<endl;
566 
567 
568  // Perform through Scheduler----------
569  BlockFlowAlloc bFlow;
570  EngineObserver watch;
571  Scheduler scheduler{bFlow, watch};
572 
573  double performanceTime =
574  testLoad.setupSchedule(scheduler)
575  .withLoadTimeBase(LOAD_BASE)
576  .withJobDeadline(30ms)
577  .launch_and_wait();
578 
579  cout << "runTime(Scheduler): "<<performanceTime/1000<<"ms"<<endl;
580 
581  // invocation through Scheduler has reproduced all node hashes
582  CHECK (testLoad.getHash() == expectedHash);
583 
584  // due to the massive load burst at end, Scheduler falls behind plan
585  CHECK (performanceTime < 2*referenceTime);
586  // typical values: refTime ≡ ~35ms, runTime ≡ ~45ms
587  }
588  };
589 
590 
/* NOTE(review): presumably registers this test class with the test runner,
 *               with "unit engine" as group tags — confirm against lib/test/run.hpp */
592  LAUNCHER (SchedulerService_test, "unit engine");
593 
594 
595 
596 }}} // namespace vault::gear::test
const StatKey STAT_NODE
all nodes
signal start of some processing and transition grooming mode ⟼ *work mode
Definition: activity.hpp:233
Automatically use custom string conversion in C++ stream output.
#define TRANSIENTLY(_OO_)
Macro to simplify capturing assignments.
AnyPair entry(Query< TY > const &query, typename WrapReturn< TY >::Wrapper &obj)
helper to simplify creating mock table entries, wrapped correctly
Definition: run.hpp:40
Front-end for printf-style string template interpolation.
TestChainLoad && buildTopology()
Use current configuration and seed to (re)build Node connectivity.
Generate synthetic computation load for Scheduler performance tests.
Functions to perform (multithreaded) timing measurement on a given functor.
A Generator for synthetic Render Jobs for Scheduler load testing.
A front-end for using printf-style formatting.
Lumiera's internal time value datatype.
Definition: timevalue.hpp:299
post a message providing a chain of further time-bound Activities
Definition: activity.hpp:237
Abstract Base Class for all testcases.
Definition: run.hpp:53
»Scheduler-Service« : coordinate render activities.
Definition: scheduler.hpp:213
Service for coordination and dispatch of render activities.
Diagnostic context to record and evaluate activations within the Scheduler.
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
Simplistic test class runner.
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
TestChainLoad && configureShape_chain_loadBursts()
preconfigured topology: single graph with massive »load bursts«
const StatKey STAT_JOIN
joining node
uint incrementSeq()
increment the internal invocation sequence number
probe window + count-down; activate next Activity, else re-schedule
Definition: activity.hpp:236
Diagnostic setup to instrument and observe Activity activations.
Statistic computeGraphStatistics()
Operator on TestChainLoad to evaluate current graph connectivity.
Test helper to perform temporary manipulations within a test scope.
dispatch a JobFunctor into a worker thread
Definition: activity.hpp:232
const StatKey STAT_FORK
forking node
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:106
Individual frame rendering task, forwarding to a closure.
Definition: job.h:268
a family of time value like entities and their relationships.
Pool of worker threads for rendering.
Definition: work-force.hpp:240
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
Vault-Layer implementation namespace root.
Collector and aggregator for performance data.