42 using std::this_thread::sleep_for;
45 Time t100us =
Time{FSecs{1, 10
'000}}; 46 Time t200us = t100us + t100us; 47 Time t500us = t200us + t200us + t100us; 48 Time t1ms = Time{1,0}; 50 const uint TYPICAL_TIME_FOR_ONE_SCHEDULE_us = 3; 56 /*************************************************************************//** 57 * @test Scheduler component integration test: use the service API for 58 * state control and to add Jobs and watch processing patterns. 59 * @see SchedulerActivity_test 60 * @see SchedulerInvocation_test 61 * @see SchedulerCommutator_test 62 * @see SchedulerLoadControl_test 64 class SchedulerService_test : public Test 87 Scheduler scheduler{bFlow, watch}; 88 CHECK (scheduler.empty()); 90 auto task = onetimeCrunch(4ms); 91 CHECK (1 == task.remainingInvocations()); 94 , InvocationInstanceID() 97 scheduler.defineSchedule(job) 101 CHECK (not scheduler.empty()); 103 sleep_for (3ms); // not invoked yet 104 CHECK (1 == task.remainingInvocations()); 107 CHECK (0 == task.remainingInvocations()); 108 } // task has been invoked 121 postNewTask (Scheduler& scheduler, Activity& chain, Time start) 123 ActivationEvent actEvent{chain, start, start + Time{50,0}}; // add dummy deadline +50ms 124 scheduler.layer2_.postChain (actEvent, scheduler.layer1_); 134 BlockFlowAlloc bFlow; 135 EngineObserver watch; 136 Scheduler scheduler{bFlow, watch}; 137 CHECK (scheduler.empty()); 139 Activity dummy{Activity::FEED}; 140 auto postIt = [&] { postNewTask (scheduler, dummy, RealClock::now()+t200us); }; 143 CHECK (not scheduler.empty());// repeated »tick« task enlisted.... 146 CHECK (not scheduler.empty()); 148 scheduler.terminateProcessing(); 149 CHECK (scheduler.empty()); 154 CHECK (not scheduler.empty()); 155 //... and just walk away => scheduler unwinds cleanly from destructor 156 }// Note: BlockFlow and WorkForce unwinding is covered in dedicated tests 179 BlockFlowAlloc bFlow; 180 EngineObserver watch; 181 Scheduler scheduler{bFlow, watch}; 182 CHECK (scheduler.empty()); 184 // use a single FEED as content 185 Activity dummy{Activity::FEED}; 187 auto anchor = RealClock::now(); 188 auto offset = [&](Time when =RealClock::now()){ return _raw(when) - _raw(anchor); }; 190 auto createLoad = [&](Offset start, uint cnt) 191 { // use internal API (this test is declared as friend) 192 for (uint i=0; i<cnt; ++i) // flood the queue 193 postNewTask (scheduler, dummy, anchor + start + TimeValue{i}); 197 auto LOAD_PEAK_DURATION_us = 2000; 198 auto fatPackage = LOAD_PEAK_DURATION_us/TYPICAL_TIME_FOR_ONE_SCHEDULE_us; 200 createLoad (Offset{Time{ 5,0}}, fatPackage); 201 createLoad (Offset{Time{15,0}}, fatPackage); 204 cout << "Timing : start-up required.."<<offset()<<" µs"<<endl; 206 // now watch change of load and look out for two peaks.... 215 _Fmt row{"%6d | Load: %5.3f Head:%5d Lag:%6d\n"}; 217 while (not scheduler.empty()) // should fall empty at end 220 double load = scheduler.getLoadIndicator(); 231 peak1_max = max (load, peak1_max); 235 peak1_dur = offset() - peak1_s; 246 peak2_max = max (load, peak2_max); 250 peak2_dur = offset() - peak2_s; 254 cout << row % offset() % load 255 % offset(scheduler.layer1_.headTime()) 256 % scheduler.loadControl_.averageLag(); 258 uint done = offset(); 260 //--------Summary-Table------------------------------ 261 _Fmt peak{"\nPeak %d ....... %5d +%dµs %34tmax=%3.1f"}; 262 cout << "-------+-------------+----------+----------" 264 << peak % 1 % peak1_s % peak1_dur % peak1_max 265 << peak % 2 % peak2_s % peak2_dur % peak2_max 266 << "\nTick ....... "<<done 270 CHECK (peak1_s > 5000); // first peak was scheduled at 5ms 271 CHECK (peak1_s < 10000); 272 CHECK (peak2_s > 15000); // second peak was scheduled at 15ms 273 CHECK (peak2_s < 20000); 274 CHECK (peak1_max > 2.0); 275 CHECK (peak2_max > 2.0); 277 CHECK (done > 50000); // »Tick« period is 50ms 278 // and this tick should determine end of timeline 280 cout << "\nwaiting for shutdown of WorkForce"; 281 while (scheduler.workForce_.size() > 0) 284 cout << "." << std::flush; 286 uint shutdown = offset(); 287 cout << "\nShutdown after "<<shutdown / 1.0e6<<"sec"<<endl; 288 CHECK (shutdown > 2.0e6); 324 BlockFlowAlloc bFlow; 325 EngineObserver watch; 326 Scheduler scheduler{bFlow, watch}; 328 ActivityDetector detector; 329 Activity& probe = detector.buildActivationProbe ("testProbe"); 336 auto post = [&](Time start) 337 { // this test class is declared friend to get a backdoor into Scheduler internals... 338 scheduler.layer2_.acquireGoomingToken(); 339 postNewTask (scheduler, probe, start); 342 auto pullWork = [&] { 343 delay_us = lib::test::benchmarkTime([&]{ res = scheduler.doWork(); }); 344 slip_us = _raw(detector.invokeTime(probe)) - _raw(start); 345 cout << "res:"<<res<<" delay="<<delay_us<<"µs slip="<<slip_us<<"µs"<<endl; 349 auto wasClose = [](TimeValue a, TimeValue b) 350 { // 500µs are considered "close" 351 return Duration{Offset{a,b}} < Duration{FSecs{1,2000}}; 353 auto wasInvoked = [&](Time start) 355 Time invoked = detector.invokeTime (probe); 356 return invoked >= start 357 and wasClose (invoked, start); 361 cout << "pullWork() on empty queue..."<<endl; 362 pullWork(); // Call the work-Function on empty Scheduler queue 363 CHECK (activity::WAIT == res); // the result instructs this thread to go to sleep immediately 366 cout << "Due at pullWork()..."<<endl; 367 TimeVar now = RealClock::now(); 368 start = now + t100us; // Set a schedule 100ms ahead of "now" 370 CHECK (not scheduler.empty()); // was enqueued 371 CHECK (not wasInvoked(start)); // ...but not activated yet 373 sleep_for (100us); // wait beyond the planned start point (typically waits ~150µs or more) 375 CHECK (wasInvoked(start)); 376 CHECK (slip_us < 300); // Note: typically there is a slip of 100..200µs, because sleep waits longer 377 CHECK (scheduler.empty()); // The scheduler is empty now and this thread will go to sleep, 378 CHECK (delay_us < 20200); // however the sleep-cycle is first re-shuffled by a wait between 0 ... 20ms 379 CHECK (activity::PASS == res); // this thread is instructed to check back once 381 CHECK (activity::WAIT == res); // ...yet since the queue is still empty, it is sent immediately to sleep 382 CHECK (delay_us < 40); 385 cout << "next some time ahead => up-front delay"<<endl; 386 now = RealClock::now(); 387 start = now + t500us; // Set a schedule significantly into the future... 389 CHECK (not scheduler.empty()); 391 pullWork(); // ...and invoke the work-Function immediately "now" 392 CHECK (activity::PASS == res); // Result: this thread was kept in sleep in the work-Function 393 CHECK (not wasInvoked(start)); // but the next dispatch did not happen yet; we are instructed to re-invoke immediately 394 CHECK (delay_us > 500); // this proves that there was a delay to wait for the next schedule 395 CHECK (delay_us < 1000); 396 pullWork(); // if we now re-invoke the work-Function as instructed... 397 CHECK (wasInvoked(start)); // then the next schedule is already slightly overdue and immediately invoked 398 CHECK (scheduler.empty()); // the queue is empty and thus this thread will be sent to sleep 399 CHECK (delay_us < 20200); // but beforehand the sleep-cycle is re-shuffled by a wait between 0 ... 20ms 400 CHECK (slip_us < 300); 401 CHECK (activity::PASS == res); // instruction to check back once 403 CHECK (activity::WAIT == res); // but next call will send this thread to sleep right away 404 CHECK (delay_us < 40); 407 cout << "follow-up with some distance => follow-up delay"<<endl; 408 now = RealClock::now(); 409 start = now + t100us; 410 post (start); // This time the schedule is set to be "soon" 411 post (start+t1ms); // But another schedule is placed 1ms behind 412 sleep_for (100us); // wait for "soon" to pass... 414 CHECK (wasInvoked(start)); // Result: the first invocation happened immediately 415 CHECK (slip_us < 300); 416 CHECK (delay_us > 900); // yet this thread was afterwards kept in sleep to await the next task; 417 CHECK (activity::PASS == res); // returns instruction to re-invoke immediately 418 CHECK (not scheduler.empty()); // since there is still work in the queue 420 start += t1ms; // (just re-adjust the reference point to calculate slip_us) 421 pullWork(); // re-invoke immediately as instructed 422 CHECK (wasInvoked(start)); // Result: also the next Activity has been dispatched 423 CHECK (slip_us < 400); // not much slip 424 CHECK (delay_us < 20200); // ...and the post-delay is used to re-shuffle the sleep cycle as usual 425 CHECK (activity::PASS == res); // since queue is empty, we will call back once... 426 CHECK (scheduler.empty()); 428 CHECK (activity::WAIT == res); // and then go to sleep. 431 cout << "already tended-next => re-target capacity"<<endl; 432 now = RealClock::now(); 433 start = now + t500us; // Set the next schedule with some distance... 436 // Access scheduler internals (as friend) 437 CHECK (start == scheduler.layer1_.headTime()); // next schedule indeed appears as next-head 438 CHECK (not scheduler.loadControl_.tendedNext(start)); // but this next time was not yet marked as "tended" 440 scheduler.loadControl_.tendNext(start); // manipulate scheduler to mark next-head as "tended" 441 CHECK ( scheduler.loadControl_.tendedNext(start)); 443 CHECK (start == scheduler.layer1_.headTime()); // other state still the same 444 CHECK (not scheduler.empty()); 447 CHECK (not wasInvoked(start)); // since next-head was marked as "tended"... 448 CHECK (not scheduler.empty()); // ...this thread is not used to dispatch it 449 CHECK (delay_us < 6000); // rather it is re-focussed as free capacity within WORK_HORIZON 469 BlockFlowAlloc bFlow; 470 EngineObserver watch; 471 Scheduler scheduler{bFlow, watch}; 473 // prevent scale-up of the Scheuler's
WorkForce 481 Job testJob{detector.buildMockJob(
"testJob", nominal, 1337)};
483 CHECK (scheduler.empty());
486 scheduler.defineSchedule(testJob)
491 CHECK (not scheduler.empty());
494 scheduler.layer2_.maybeFeed(scheduler.layer1_);
497 auto entry = scheduler.layer1_.peekHead();
498 auto now = RealClock::now();
504 CHECK (
entry.startTime() - now < _uTicks( 400us));
505 CHECK (
entry.deathTime() - now < _uTicks(2400us));
506 CHECK (
entry.manifestation == 0);
507 CHECK (
entry.isCompulsory ==
false);
514 CHECK (activity::PASS == scheduler.doWork());
516 CHECK (detector.verifySeqIncrement(1)
517 .beforeInvocation(
"testJob").
arg(
"7.007", 1337));
546 cout <<
_Fmt{
"Test-Load: Nodes: %d Levels: %d ∅Node/Level: %3.1f Forks: %d Joins: %d"}
556 size_t expectedHash = testLoad.getHash();
560 auto LOAD_BASE = 500us;
561 testLoad.performGraphSynchronously(LOAD_BASE);
562 CHECK (testLoad.getHash() == expectedHash);
564 double referenceTime = testLoad.calcRuntimeReference(LOAD_BASE);
565 cout <<
"refTime(singleThr): "<<referenceTime/1000<<
"ms"<<endl;
573 double performanceTime =
574 testLoad.setupSchedule(scheduler)
575 .withLoadTimeBase(LOAD_BASE)
576 .withJobDeadline(30ms)
579 cout <<
"runTime(Scheduler): "<<performanceTime/1000<<
"ms"<<endl;
582 CHECK (testLoad.getHash() == expectedHash);
585 CHECK (performanceTime < 2*referenceTime);
const StatKey STAT_NODE
all nodes
signal start of some processing and transition grooming mode ⟼ *work mode
#define TRANSIENTLY(_OO_)
Macro to simplify capturing assignments.
AnyPair entry(Query< TY > const &query, typename WrapReturn< TY >::Wrapper &obj)
helper to simplify creating mock table entries, wrapped correctly
TestChainLoad && buildTopology()
Use current configuration and seed to (re)build Node connectivity.
Generate synthetic computation load for Scheduler performance tests.
Functions to perform (multithreaded) timing measurement on a given functor.
A Generator for synthetic Render Jobs for Scheduler load testing.
A front-end for using printf-style formatting.
Lumiera's internal time value datatype.
post a message providing a chain of further time-bound Activities
Abstract Base Class for all testcases.
»Scheduler-Service« : coordinate render activities.
Service for coordination and dispatch of render activities.
Diagnostic context to record and evaluate activations within the Scheduler.
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
Simplistic test class runner.
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
TestChainLoad && configureShape_chain_loadBursts()
preconfigured topology: single graph with massive »load bursts«
const StatKey STAT_JOIN
joining node
uint incrementSeq()
increment the internal invocation sequence number
probe window + count-down; activate next Activity, else re-schedule
Diagnostic setup to instrument and observe Activity activations.
Statistic computeGraphStatistics()
Operator on TestChainLoad to evaluate current graph connectivity.
Test helper to perform temporary manipulations within a test scope.
dispatch a JobFunctor into a worker thread
const StatKey STAT_FORK
forking node
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Individual frame rendering task, forwarding to a closure.
a family of time value like entities and their relationships.
Pool of worker threads for rendering.
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
Vault-Layer implementation namespace root.
Collector and aggregator for performance data.