Lumiera  0.pre.03
»edit your freedom«
scheduler-commutator-test.cpp
Go to the documentation of this file.
1 /*
2  SchedulerCommutator(Test) - verify dependent activity processing in the scheduler
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 * *****************************************************************/
13 
19 #include "lib/test/run.hpp"
20 #include "activity-detector.hpp"
23 #include "lib/time/timevalue.hpp"
24 #include "lib/format-cout.hpp"
25 #include "lib/thread.hpp"
26 #include "lib/util.hpp"
27 
28 #include <chrono>
29 
30 using test::Test;
32 
33 
34 namespace vault{
35 namespace gear {
36 namespace test {
37 
38  using lib::time::Time;
39  using lib::time::FSecs;
40  using std::atomic_bool;
41  using lib::ThreadHookable;
43  using util::isSameObject;
44 
45  using std::unique_ptr;
46  using std::make_unique;
47  using std::this_thread::yield;
48  using std::this_thread::sleep_for;
49  using std::chrono_literals::operator ""us;
50 
51 
52  namespace { // Load test parameters
// number of concurrent threads; used below as template argument of threadBenchmark<>()
53  const size_t NUM_THREADS = 20;
// repeat count handed to threadBenchmark() as second argument
54  const size_t REPETITIONS = 100;
55  }
56 
57 
58 
59 
60 
61  /******************************************************************/
82  {
83 
/* Test entry point: re-seeds the random number generator.
 * NOTE(review): listing lines 88-96 are not visible in this extract;
 * presumably they invoke the individual test cases -- confirm against the original file.
 */
84  virtual void
85  run (Arg)
86  {
87  seedRand();
97  }
98 
99 
/* Simple usage scenario: enqueue one Activity, retrieve it through
 * SchedulerCommutator::findWork() and dispatch it in the ActivityDetector's
 * execution context; afterwards a "CTX-tick" invocation with argument `now`
 * must have been logged and the queue must be empty.
 * NOTE(review): the function's name line (listing line 103) is missing from this extract.
 */
102  void
104  { MARK_TEST_FUN
105  SchedulerInvocation queue;
106  SchedulerCommutator sched;
107  Activity activity;
108  Time when{3,4};
109  Time dead{5,6};
110 
111  // use the ActivityDetector for test instrumentation...
112  ActivityDetector detector;
113  Time now = detector.executionCtx.getSchedTime();
114  CHECK (now < dead);
115 
116  // prepare scenario: some activity is enqueued
117  queue.instruct ({activity, when, dead});
118 
119  // retrieve one event from queue and dispatch it
120  ActivationEvent act = sched.findWork(queue,now);
121  ActivityLang::dispatchChain (act, detector.executionCtx);
122 
123  CHECK (detector.verifyInvocation("CTX-tick").arg(now));
124  CHECK (queue.empty());
125 
126 // cout << detector.showLog()<<endl; // HINT: use this for investigation...
127  }
128 
129 
130 
/* Basic GroomingToken life cycle as seen from the current thread:
 * not held initially, held after a successful acquire, not held after drop.
 * NOTE(review): the function's name line (listing line 134) and listing line 147
 * are missing from this extract.
 */
133  void
135  { MARK_TEST_FUN
136 
137  SchedulerCommutator sched;
138 
139  auto myself = std::this_thread::get_id();
140  CHECK (not sched.holdsGroomingToken (myself));
141 
142  CHECK (sched.acquireGoomingToken());
143  CHECK ( sched.holdsGroomingToken (myself));
144 
145  sched.dropGroomingToken();
146  CHECK (not sched.holdsGroomingToken (myself));
148  }
149 
/* Helper: verify that the GroomingToken is currently free.
 * Checks that the calling thread does not hold the token, that the token
 * can be acquired right now, and then drops it again.
 * NOTE(review): the declarator line (listing line 152) is missing from this extract;
 * the body uses a `sched` parameter (SchedulerCommutator).
 */
151  static void
153  {
154  auto myself = std::this_thread::get_id();
155  CHECK (not sched.holdsGroomingToken(myself));
156  CHECK (sched.acquireGoomingToken());
157  sched.dropGroomingToken();
158  }
159 
160 
161 
/* Exercise the scope guard returned by requireGroomingTokenHere():
 *  - Case-1: the thread already holds the token; it is still held after the guard's scope ends.
 *  - Case-2: the thread does not hold the token; it is held inside the guard's scope
 *            and no longer held after leaving that scope.
 * NOTE(review): the function's name line (listing line 167) and listing line 190
 * are missing from this extract.
 */
166  void
168  { MARK_TEST_FUN
169 
170  SchedulerCommutator sched;
171 
172  // Case-1: if a thread already holds the token....
173  CHECK (sched.acquireGoomingToken());
174  CHECK (sched.holdsGroomingToken (thisThread()));
175  {
176  auto guard = sched.requireGroomingTokenHere();
177  CHECK (sched.holdsGroomingToken (thisThread()));
178  }// leave scope -> nothing happens in this case
179  CHECK (sched.holdsGroomingToken (thisThread()));
180 
181  // Case-2: when not holding the token...
182  sched.dropGroomingToken();
183  {
184  // acquire automatically (this may block)
185  auto guard = sched.requireGroomingTokenHere();
186  CHECK (sched.holdsGroomingToken (thisThread()));
187  }// leave scope -> dropped automatically
188  CHECK (not sched.holdsGroomingToken (thisThread()));
189 
191  }
192 
193 
194 
/* Contention test for the GroomingToken.
 * A deliberately non-atomic read / pause / write sequence on a shared sum is
 * run from NUM_THREADS threads, first unprotected and then guarded by the
 * GroomingToken. The unprotected run is expected to lose updates, while the
 * protected run must yield the exact sum.
 * NOTE(review): the function's name line (listing line 201) and listing line 232
 * are missing from this extract.
 */
200  void
202  { MARK_TEST_FUN
203 
204  SchedulerCommutator sched;
205 
206  size_t checkSum{0};
// racy on purpose: read the sum, pause, then write back the stale value plus i
207  auto pause_and_sum = [&](size_t i) -> size_t
208  {
209  auto oldSum = checkSum;
210  sleep_for (500us);
211  checkSum = oldSum + i;
212  return 1;
213  };
// same update, but performed only while holding the GroomingToken
214  auto protected_sum = [&](size_t i) -> size_t
215  {
216  while (not sched.acquireGoomingToken())
217  yield(); // contend until getting exclusive access
218  pause_and_sum(i);
219  sched.dropGroomingToken();
220  return 1;
221  };
222 
223  threadBenchmark<NUM_THREADS> (pause_and_sum, REPETITIONS);
224 
225  size_t brokenSum = checkSum;
226  checkSum = 0;
227 
228  threadBenchmark<NUM_THREADS> (protected_sum, REPETITIONS);
229 
230  CHECK (brokenSum < checkSum);
// compare (not assign): the former `=` made this check always pass and overwrote checkSum.
// Expected value assumes each thread invokes the functor with i = 0 ... REPETITIONS-1 -- TODO confirm against threadBenchmark()
231  CHECK (checkSum == NUM_THREADS * REPETITIONS*(REPETITIONS-1)/2);
233  }
234 
235 
236 
// flag polled by the "grooming-hog" thread; setting it to true lets that thread leave its wait loop
237  atomic_bool stopTheHog_{false};
// handle of the helper thread which holds the GroomingToken; empty while no such thread is running
238  unique_ptr<ThreadHookable> groomingHog_;
239  using Launch = ThreadHookable::Launch;
240 
/* Helper: make the GroomingToken unavailable to the current thread.
 * Drops the token if the calling thread holds it, then launches a thread
 * named "grooming-hog" which acquires the token and keeps it (sleeping in
 * 100us steps) until stopTheHog_ becomes true. The thread's atExit hook
 * resets groomingHog_.
 * Precondition: no hog thread is currently registered (groomingHog_ is empty).
 * NOTE(review): the declarator line (listing line 243) and listing line 259
 * (inside the atExit hook) are missing from this extract.
 */
242  void
244  {
245  REQUIRE (not groomingHog_);
246  if (sched.holdsGroomingToken(std::this_thread::get_id()))
247  sched.dropGroomingToken();
248 
249  stopTheHog_ = false;
250  groomingHog_ = make_unique<ThreadHookable>(
251  Launch{[&]{
252  CHECK (sched.acquireGoomingToken());
253  do sleep_for (100us);
254  while (not stopTheHog_);
255  sched.dropGroomingToken();
256  }}
257  .atExit([&](ThreadWrapper& handle)
258  {
260  groomingHog_.reset();
261  })
262  .threadID("grooming-hog"));
// give the new thread some time to start up (timing based, no explicit handshake)
263  sleep_for (500us);
264  ENSURE (groomingHog_);
265  }
266 
/* Helper: signal the "grooming-hog" thread to stop, then spin (yielding)
 * until groomingHog_ has been reset, which happens in that thread's atExit hook.
 * NOTE(review): the declarator line (listing line 269) is missing from this extract.
 */
268  void
270  {
271  stopTheHog_ = true;
272  while (groomingHog_)
273  yield();
274  }
275 
276 
277 
/* Verify retrieval of due work through SchedulerCommutator::findWork():
 * nothing is found in an empty queue or for activities starting in the future,
 * due activities are returned in order of start time, and nothing is retrieved
 * while another thread holds the GroomingToken.
 * NOTE(review): the function's name line (listing line 281) is missing from this extract.
 */
280  void
282  { MARK_TEST_FUN
283 
284  SchedulerInvocation queue;
285  SchedulerCommutator sched;
286 
287  Time t1{10,0};
288  Time t2{20,0};
289  Time t3{30,0};
290  Time now{t2};
291 
292  CHECK (not sched.findWork (queue, now)); // empty queue, no work found
293 
294  Activity a1{1u,1u};
295  Activity a2{2u,2u};
296  Activity a3{3u,3u};
297 
298  queue.instruct ({a3, t3}); // activity scheduled into the future
299  CHECK (not sched.findWork (queue, now)); // ... not found with time `now`
300  CHECK (t3 == queue.headTime());
301 
302  queue.instruct ({a1, t1});
303  CHECK (isSameObject (a1, *sched.findWork(queue, now))); // but past activity is found
304  CHECK (not sched.findWork (queue, now)); // activity was retrieved
305 
306  queue.instruct ({a2, t2});
307  CHECK (isSameObject (a2, *sched.findWork(queue, now))); // activity scheduled for `now` is found
308  CHECK (not sched.findWork (queue, now)); // nothing more found for `now`
309  CHECK (t3 == queue.headTime());
310  CHECK (not queue.empty()); // yet the future activity a3 is still queued...
311 
312  CHECK (isSameObject (a3, *sched.findWork(queue, t3))); // ...and will be found when querying "later"
313  CHECK (not sched.findWork (queue, t3));
314  CHECK ( queue.empty()); // Everything retrieved and queue really empty
315 
316  queue.instruct ({a2, t2});
317  queue.instruct ({a1, t1});
318  CHECK (isSameObject (a1, *sched.findWork(queue, now))); // the earlier activity is found first
319  CHECK (t2 == queue.headTime());
320  CHECK (isSameObject (a2, *sched.findWork(queue, now)));
321  CHECK (not sched.findWork (queue, now));
322  CHECK ( queue.empty());
323 
324  queue.instruct ({a2, t2}); // prepare activity which /would/ be found...
325  blockGroomingToken(sched); // but prevent this thread from acquiring the GroomingToken
326  CHECK (not sched.findWork (queue, now)); // thus search aborts immediately
327  CHECK (not queue.empty());
328 
329  unblockGroomingToken(); // yet when we're able to get the GroomingToken
330  CHECK (isSameObject (a2, *sched.findWork(queue, now))); // the task can be retrieved
331  CHECK (queue.empty());
332  }
333 
334 
335 
/* Verify how findWork() treats queue entries depending on deadline,
 * manifestation ID and the compulsory flag. Four activities are enqueued;
 * the checks observe queue.isMissed(), isOutdated() and isOutOfTime() at the
 * queue head while manifestations are dropped and findWork() is called with
 * advancing time. Only a4 is finally handed out as work.
 * NOTE(review): the function's name line (listing line 346) is missing from this extract.
 */
345  void
347  { MARK_TEST_FUN
348 
349  SchedulerInvocation queue;
350  SchedulerCommutator sched;
351 
352  Time t1{10,0}; Activity a1{1u,1u};
353  Time t2{20,0}; Activity a2{2u,2u};
354  Time t3{30,0}; Activity a3{3u,3u};
355  Time t4{40,0}; Activity a4{4u,4u};
356  // start,deadline, manif.ID, isCompulsory
357  queue.instruct ({a1, t1, t4, ManifestationID{5}});
358  queue.instruct ({a2, t2, t2});
359  queue.instruct ({a3, t3, t3, ManifestationID{23}, true});
360  queue.instruct ({a4, t4, t4});
361  queue.activate(ManifestationID{5});
362  queue.activate(ManifestationID{23});
363 
364  queue.feedPrioritisation();
365  CHECK (t1 == queue.headTime());
366  CHECK (isSameObject (a1, *queue.peekHead()));
367  CHECK (not queue.isMissed(t1));
368  CHECK (not queue.isOutdated(t1));
369 
// after dropping manifestation 5, the head entry a1 is reported as outdated
370  queue.drop(ManifestationID{5});
371  CHECK (t1 == queue.headTime());
372  CHECK (not queue.isMissed(t1));
373  CHECK ( queue.isOutdated(t1));
374 
// no work is handed out for t1, and a2 becomes the new head
375  CHECK (not sched.findWork(queue, t1));
376  CHECK (t2 == queue.headTime());
377  CHECK (isSameObject (a2, *queue.peekHead()));
378  CHECK (not queue.isMissed (t2));
379  CHECK (not queue.isOutdated(t2));
380  CHECK ( queue.isMissed (t3));
381  CHECK ( queue.isOutdated(t3));
382 
// querying after a2's deadline (t2): no work is handed out and a3 becomes the head
383  CHECK (not sched.findWork(queue, t2+Time{5,0}));
384  CHECK (t3 == queue.headTime());
385  CHECK (isSameObject (a3, *queue.peekHead()));
386  CHECK (not queue.isMissed (t3));
387  CHECK (not queue.isOutdated (t3));
388  CHECK (not queue.isOutOfTime(t3));
389  CHECK ( queue.isMissed (t4));
390  CHECK ( queue.isOutdated (t4));
391  CHECK ( queue.isOutOfTime(t4));
392 
// a3 was enqueued with isCompulsory=true: findWork() at t4 yields nothing, yet a3 stays at the head
393  CHECK (not sched.findWork(queue, t4));
394  CHECK (t3 == queue.headTime());
395  CHECK (not queue.isMissed (t3));
396  CHECK (not queue.isOutdated (t3));
397  CHECK (not queue.isOutOfTime(t3));
398  CHECK ( queue.isMissed (t4));
399  CHECK ( queue.isOutdated (t4));
400  CHECK ( queue.isOutOfTime(t4));
401 
// after dropping manifestation 23, a3 is outdated but no longer reported as out-of-time
402  queue.drop(ManifestationID{23});
403  CHECK (t3 == queue.headTime());
404  CHECK (not queue.isMissed (t3));
405  CHECK ( queue.isOutdated (t3));
406  CHECK (not queue.isOutOfTime(t3));
407  CHECK ( queue.isMissed (t4));
408  CHECK ( queue.isOutdated (t4));
409  CHECK (not queue.isOutOfTime(t4));
410 
411  CHECK (isSameObject (a3, *queue.peekHead()));
412  CHECK (isSameObject (a4, *sched.findWork(queue, t4)));
413  CHECK (queue.empty());
414  }
415 
416 
417 
/* Verify SchedulerCommutator::postChain(), the entrance point for new activities.
 * In every case covered here postChain() returns activity::PASS and the probe
 * Activity is only enqueued, never invoked directly; where the event ends up
 * (visible at the queue head or not) depends on whether the calling thread
 * holds the GroomingToken.
 * NOTE(review): the function's name line (listing line 421) and listing lines
 * 437 and 479 are missing from this extract.
 */
420  void
422  { MARK_TEST_FUN
423 
424  // rigged execution environment to detect activations--------------
425  ActivityDetector detector;
426  Activity& activity = detector.buildActivationProbe ("testActivity");
427  auto makeEvent = [&](Time start) { return ActivationEvent{activity, start, start+Time{0,1}}; };
428  // set a dummy deadline to pass the sanity check
429  SchedulerInvocation queue;
430  SchedulerCommutator sched;
431 
432  Time now = detector.executionCtx.getSchedTime();
433  Time past {Time::ZERO};
434  Time future{now+now};
435 
436  // no one holds the GroomingToken
438  auto myself = std::this_thread::get_id();
439  CHECK (sched.acquireGoomingToken());
440 
441  // Activity with start time way into the past is enqueued, but then discarded
442  CHECK (activity::PASS == sched.postChain (makeEvent(past), queue));
443  CHECK (detector.ensureNoInvocation("testActivity")); // not invoked
444  CHECK (queue.peekHead()); // still in the queue...
445  CHECK (not sched.findWork (queue,now)); // but it is not retrieved due to deadline
446  CHECK (not queue.peekHead()); // and thus was dropped
447  CHECK (queue.empty());
448 
449  // future Activity is enqueued by short-circuit directly into the PriorityQueue if possible
450  CHECK (activity::PASS == sched.postChain (makeEvent(future), queue));
451  CHECK ( sched.holdsGroomingToken (myself));
452  CHECK (not queue.empty());
453  CHECK (isSameObject (activity, *queue.peekHead())); // appears at Head, implying it's in Priority-Queue
454 
455  queue.pullHead();
456  sched.dropGroomingToken();
457  CHECK (not sched.holdsGroomingToken (myself));
458  CHECK (queue.empty());
459 
460  // ...but GroomingToken is not acquired explicitly; Activity is just placed into the Instruct-Queue
461  CHECK (activity::PASS == sched.postChain (makeEvent(future), queue));
462  CHECK (not sched.holdsGroomingToken (myself));
463  CHECK (not queue.peekHead()); // not appearing at Head this time,
464  CHECK (not queue.empty()); // rather waiting in the Instruct-Queue
465 
466 
467  blockGroomingToken(sched);
468  CHECK (activity::PASS == sched.postChain (makeEvent(now), queue));
469  CHECK (not sched.holdsGroomingToken (myself));
470  CHECK (not queue.peekHead()); // was enqueued, not executed
471 
472  // Note: this test did not cause any direct invocation;
473  // all provided events were queued only
474  CHECK (detector.ensureNoInvocation("testActivity"));
475 
476  // As sanity-check: the first event was enqueued and then picked up;
477  // two further cases were enqueued; we could retrieve them if
478  // re-acquiring the GroomingToken and using suitable query-time
480  queue.feedPrioritisation();
481  CHECK (now == queue.headTime());
482  CHECK (isSameObject (activity, *sched.findWork(queue, now)));
483  CHECK (sched.holdsGroomingToken (myself)); // findWork() acquired the token
484  CHECK (future == queue.headTime());
485  CHECK (not queue.isDue(now));
486  CHECK ( queue.isDue(future));
487  CHECK (sched.findWork(queue, future));
488  CHECK ( queue.empty());
489  }
490 
491 
492 
/* Verify SchedulerCommutator::dispatchCapacity(), called directly from the test thread.
 * With the GroomingToken blocked by another thread the call returns activity::KICK
 * and leaves the queue untouched. The second call returns activity::PASS: the queued
 * probe Activity is dispatched, the queue is empty and the token is not held afterwards.
 * NOTE(review): the function's name line (listing line 501) and listing line 543 are
 * missing from this extract; line 543 presumably releases the blocked token
 * (unblockGroomingToken()) -- confirm against the original file.
 */
500  void
502  { MARK_TEST_FUN
503 
504  // rigged execution environment to detect activations--------------
505  ActivityDetector detector;
506  Activity& activity = detector.buildActivationProbe ("testActivity");
507  // set a dummy deadline to pass the sanity check
508  SchedulerInvocation queue;
509  SchedulerCommutator sched;
510  LoadController lCtrl;
511 
512  Time start{0,1};
513  Time dead{0,10};
514  // prepare the queue with one activity
515  CHECK (Time::NEVER == queue.headTime());
516  queue.instruct ({activity, start, dead});
517  queue.feedPrioritisation();
518  CHECK (start == queue.headTime());
519 
520  // for the first testcase,
521  // set Grooming-Token to be blocked
522  blockGroomingToken(sched);
523  auto myself = std::this_thread::get_id();
524  CHECK (not sched.holdsGroomingToken (myself));
525 
526  // invoking the dequeue and dispatch requires some wiring
527  // with functionality provided by other parts of the scheduler
528  auto getSchedTime = detector.executionCtx.getSchedTime;
529  auto executeActivity = [&](ActivationEvent event)
530  {
531  return ActivityLang::dispatchChain (event, detector.executionCtx);
532  };
533 
534  // Invoke the pull-work functionality directly from this thread
535  // (in real usage, this function is invoked from a worker)
536  CHECK (activity::KICK == sched.dispatchCapacity (queue
537  ,lCtrl
538  ,executeActivity
539  ,getSchedTime));
540  CHECK (not queue.empty());
541  // the first invocation was kicked back,
542  // since the Grooming-token could not be acquired.
544 
545  // ...now this thread can acquire, fetch from queue and dispatch....
546  CHECK (activity::PASS == sched.dispatchCapacity (queue
547  ,lCtrl
548  ,executeActivity
549  ,getSchedTime));
550 
551  CHECK (queue.empty());
552  CHECK (not sched.holdsGroomingToken (myself));
553  CHECK (detector.verifyInvocation("testActivity"));
554  }
555 
556 
557 
558 
/* Integrated scenario: build the Activity chain for a calculation job via
 * ActivityLang, post it through SchedulerCommutator::postChain(), retrieve it
 * "later" with findWork() and dispatch it. The rigged execution context checks
 * that the GroomingToken is held when the work callback runs and drops it there;
 * finally the detector's log is matched against the expected invocation sequence.
 * NOTE(review): the function's name line (listing line 570) and listing lines
 * 572 and 596 are missing from this extract.
 */
569  void
571  { // ·==================================================================== setup a rigged Job
573  Time nominal{7,7};
574  Time start{0,1};
575  Time dead{0,10};
576 
577  ActivityDetector detector;
578  Job testJob{detector.buildMockJob("testJob", nominal, 12345)};
579 
580  BlockFlowAlloc bFlow;
581  ActivityLang activityLang{bFlow};
582 
583  // Build the Activity-Term for a simple calculation job...
584  Activity& anchor = activityLang.buildCalculationJob (testJob, start,dead)
585  .post(); // retrieve the entrance point to the chain
586 
587  // insert instrumentation to trace activation
588  detector.watchGate (anchor.next, "theGate");
589 
590 
591  // ·=================================================================== setup test subject
592  SchedulerInvocation queue;
593  SchedulerCommutator sched;
594 
595  // no one holds the GroomingToken
597  auto myself = std::this_thread::get_id();
598  CHECK (not sched.holdsGroomingToken (myself));
599 
600  TimeVar now{Time::ZERO};
601 
602  // rig the ExecutionCtx to allow manipulating "current scheduler time"
603  detector.executionCtx.getSchedTime = [&]{ return Time{now}; };
604  // rig the λ-work to verify GroomingToken and to drop it then
605  detector.executionCtx.work.implementedAs(
606  [&](Time, size_t)
607  {
608  CHECK (sched.holdsGroomingToken (myself));
609  sched.dropGroomingToken();
610  });
611 
612 
613  // ·=================================================================== actual test sequence
614  // Add the Activity-Term to be scheduled for planned start-Time
615  sched.postChain (ActivationEvent{anchor, start}, queue);
616  CHECK (detector.ensureNoInvocation("testJob"));
617  CHECK (not sched.holdsGroomingToken (myself));
618  CHECK (not queue.empty());
619 
620  // later->"now"
621  now = Time{555,5};
622  detector.incrementSeq();
623 
624  // Assuming a worker runs "later" and retrieves work...
625  ActivationEvent act = sched.findWork(queue,now);
626  CHECK (sched.holdsGroomingToken (myself)); // acquired the GroomingToken
627  CHECK (isSameObject(*act, anchor)); // "found" the rigged Activity as next piece of work
628 
629  // dispatch the Activity-chain just retrieved from the queue
630  ActivityLang::dispatchChain (act, detector.executionCtx);
631 
632  CHECK (queue.empty());
633  CHECK (not sched.holdsGroomingToken (myself)); // the λ-work was invoked and dropped the GroomingToken
634 
// expected order in the detector log, after the sequence increment above
635  CHECK (detector.verifySeqIncrement(1)
636  .beforeInvocation("theGate").arg("5.555 ⧐ Act(GATE")
637  .beforeInvocation("after-theGate").arg("⧐ Act(WORKSTART")
638  .beforeInvocation("CTX-work").arg("5.555","")
639  .beforeInvocation("testJob") .arg("7.007",12345)
640  .beforeInvocation("CTX-done").arg("5.555",""));
641 
642 // cout << detector.showLog()<<endl; // HINT: use this for investigation...
643  }
644  };
645 
646 
648  LAUNCHER (SchedulerCommutator_test, "unit engine");
649 
650 
651 
652 }}} // namespace vault::gear::test
a mutable time value, behaving like a plain number, allowing copy and re-accessing ...
Definition: timevalue.hpp:232
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
Record to describe an Activity, to happen within the Scheduler's control flow.
Definition: activity.hpp:226
auto threadBenchmark(FUN const &subject, const size_t repeatCnt=DEFAULT_RUNS)
perform a multithreaded microbenchmark.
Automatically use custom string conversion in C++ stream output.
bool holdsGroomingToken(ThreadID id) noexcept
check if the indicated thread currently holds the right to conduct internal state transitions...
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
Definition: run.hpp:40
Scheduler Layer-2 : execution of Scheduler Activities.
auto thisThread()
convenient short-notation, also used by SchedulerService
ActivationEvent findWork(SchedulerInvocation &layer1, Time now)
Look into the queues and possibly retrieve work due by now.
Functions to perform (multithreaded) timing measurement on a given functor.
activity::Proc activate(Time now, EXE &executionCtx)
Core Operation: Activate and perform this Activity.
Definition: activity.hpp:626
Lumiera's internal time value datatype.
Definition: timevalue.hpp:299
static void ___ensureGroomingTokenReleased(SchedulerCommutator &sched)
Controller to coordinate resource usage related to the Scheduler.
void instruct(ActivationEvent actEvent)
Accept an ActivationEvent with an Activity for time-bound execution.
Abstract Base Class for all testcases.
Definition: run.hpp:53
Term builder and execution framework to perform chains of scheduler Activities.
Marker for current (and obsolete) manifestations of a CalcStream processed by the Render-Engine...
Definition: activity.hpp:84
Diagnostic context to record and evaluate activations within the Scheduler.
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
Simplistic test class runner.
void seedRand()
draw a new random seed from a common nucleus, and re-seed the default-Gen.
Definition: suite.cpp:211
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
Convenience front-end to simplify and codify basic thread handling.
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:220
uint incrementSeq()
increment the internal invocation sequence number
Activity * next
Activities are organised into chains to represent relations based on verbs.
Definition: activity.hpp:249
Diagnostic setup to instrument and observe Activity activations.
Layer-2 of the Scheduler: coordination and interaction of activities.
static const Time NEVER
border condition marker value. NEVER >= any time value
Definition: timevalue.hpp:314
Extended variant of the standard case, allowing to install callbacks (hook functions) to be invoked d...
Definition: thread.hpp:716
ScopedGroomingGuard requireGroomingTokenHere()
a scope guard to force acquisition of the GroomingToken
static activity::Proc dispatchChain(Activity *chain, EXE &executionCtx)
Execution Framework: dispatch performance of a chain of Activities.
ActivationEvent pullHead()
Retrieve from the scheduling queue the entry with earliest start time.
Individual frame rendering task, forwarding to a closure.
Definition: job.h:268
a family of time value like entities and their relationships.
void feedPrioritisation()
Pick up all new events from the entrance queue and enqueue them to be retrieved ordered by start time...
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
Vault-Layer implementation namespace root.
bool isDue(Time now) const
Determine if there is work to do right now.
bool acquireGoomingToken() noexcept
acquire the right to perform internal state transitions.
Scheduler Layer-1 : time based dispatch.
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor:
void detach_thread_from_wrapper()
allow to detach explicitly — independent from thread-function's state.
Definition: thread.hpp:231
bool isSameObject(A const &a, B const &b)
compare plain object identity, based directly on the referee's memory identities. ...
Definition: util.hpp:421