Lumiera  0.pre.03
»edit your freedom«
scheduler.hpp
Go to the documentation of this file.
1 /*
2  SCHEDULER.hpp - coordination of render activities under timing and dependency constraints
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 */
13 
14 
95 #ifndef SRC_VAULT_GEAR_SCHEDULER_H_
96 #define SRC_VAULT_GEAR_SCHEDULER_H_
97 
98 
99 #include "lib/error.hpp"
100 #include "vault/gear/block-flow.hpp"
101 #include "vault/gear/work-force.hpp"
107 #include "vault/real-clock.hpp"
108 #include "lib/nocopy.hpp"
109 
110 #include <optional>
111 #include <utility>
112 
113 
114 namespace vault{
115 namespace gear {
116 
117  using std::move;
118  using lib::time::Time;
119  using lib::time::FSecs;
120  using lib::time::Offset;
121 
122  namespace test { // declared friend for test access
123  class SchedulerService_test;
124  }
125 
126  namespace { // Scheduler default config
127 
128  const auto IDLE_WAIT = 20ms;
129  const size_t DISMISS_CYCLES = 100;
130  Offset DUTY_CYCLE_PERIOD{FSecs(1,20)};
133  }
134 
135 
136  class Scheduler;
137 
140  {
141  Job job_;
142  TimeVar start_{Time::ANYTIME};
143  TimeVar death_{Time::NEVER};
144  ManifestationID manID_{};
145  bool isCompulsory_{false};
146 
147  Scheduler* theScheduler_;
148  std::optional<activity::Term> term_;
149 
150  public:
151  ScheduleSpec (Scheduler& sched, Job job)
152  : job_{job}
153  , theScheduler_{&sched}
154  , term_{std::nullopt}
155  { }
156 
158  startOffset (microseconds afterNow)
159  {
160  start_ = RealClock::now() + _uTicks(afterNow);
161  return move(*this);
162  }
163 
165  startTime (Time fixedTime)
166  {
167  start_ = fixedTime;
168  return move(*this);
169  }
170 
172  lifeWindow (microseconds afterStart)
173  {
174  death_ = start_ + _uTicks(afterStart);
175  return move(*this);
176  }
177 
179  manifestation (ManifestationID manID)
180  {
181  manID_ = manID;
182  return move(*this);
183  }
184 
186  compulsory (bool indeed =true)
187  {
188  isCompulsory_ = indeed;
189  return move(*this);
190  }
191 
192 
194  ScheduleSpec post();
195 
196  ScheduleSpec linkToSuccessor (ScheduleSpec&, bool unlimitedTime =false);
197  ScheduleSpec linkToPredecessor(ScheduleSpec&, bool unlimitedTime =false);
198  private:
199  void maybeBuildTerm();
200  };
201 
202 
203 
204  /******************************************************/
213  class Scheduler
215  {
218  {
219  Scheduler& scheduler;
220  activity::Proc doWork() { return scheduler.doWork(); }
221  void finalHook (bool _) { scheduler.handleWorkerTermination(_);}
222  };
223 
224 
225  SchedulerInvocation layer1_;
226  SchedulerCommutator layer2_;
227  WorkForce<Setup> workForce_;
228 
229  ActivityLang activityLang_;
230  LoadController loadControl_;
231  EngineObserver& engineObserver_;
232 
233 
234  public:
235  Scheduler (BlockFlowAlloc& activityAllocator
236  ,EngineObserver& engineObserver)
237  : layer1_{}
238  , layer2_{}
239  , workForce_{Setup{IDLE_WAIT, DISMISS_CYCLES, *this}}
240  , activityLang_{activityAllocator}
241  , loadControl_{connectMonitoring()}
242  , engineObserver_{engineObserver}
243  { }
244 
245 
246  bool
247  empty() const
248  {
249  return layer1_.empty();
250  }
251 
258  void
260  {
261  TRACE (engine, "Ignite Scheduler Dispatch.");
262  bool force_continued_run{true};
263  handleDutyCycle (RealClock::now(), force_continued_run);
264  if (not empty())
265  workForce_.activate();
266  }
267 
268 
277  void
279  {
280  TRACE (engine, "Forcibly terminate Scheduler Dispatch.");
281  workForce_.awaitShutdown();
282  layer1_.discardSchedule();
283  }
284 
285 
291  double
293  {
294  return loadControl_.effectiveLoad();
295  }
296 
297 
317  void
318  seedCalcStream (Job planningJob
320  ,FrameRate expectedAdditionalLoad = FrameRate(25))
321  {
322  auto guard = layer2_.requireGroomingTokenHere(); // allow mutation
323  layer1_.activate (manID);
324  activityLang_.announceLoad (expectedAdditionalLoad);
325  continueMetaJob (RealClock::now(), planningJob, manID);
326  }
327 
328 
332  void
334  ,Job planningJob
335  ,ManifestationID manID = ManifestationID())
336  {
337  bool isCompulsory = true;
338  Time deadline = nextStart + DUTY_CYCLE_TOLERANCE;
339  // place the meta-Job into the timeline...
340  postChain ({activityLang_.buildMetaJob(planningJob, nextStart, deadline)
341  .post()
342  , nextStart
343  , deadline
344  , manID
345  , isCompulsory});
346  }
347 
348 
358  {
359  return ScheduleSpec{*this, job};
360  }
361 
362 
363 
368  activity::Proc doWork();
369 
370 
371  private:
372  void postChain (ActivationEvent);
373  void sanityCheck (ActivationEvent const&);
374  void handleDutyCycle (Time now, bool =false);
375  void handleWorkerTermination (bool isFailure);
376  void maybeScaleWorkForce (Time startHorizon);
377 
378  void triggerEmergency();
379 
380 
384  {
386  setup.maxCapacity = []{ return work::Config::COMPUTATION_CAPACITY; };
387  setup.currWorkForceSize = [this]{ return workForce_.size(); };
388  setup.stepUpWorkForce = [this](uint steps){ workForce_.incScale(steps); };
389  return setup;
390  }
391 
393  Time
395  {
396  return RealClock::now();
397  }
398 
400  class ExecutionCtx;
401  friend class ExecutionCtx;
402 
404  friend class ScheduleSpec;
405 
408  };
409 
410 
411 
412 
413 
414 
417  : public EngineEvent
418  {
420  using EngineEvent::EngineEvent;
421 
422  static Symbol WORKSTART;
423  static Symbol WORKSTOP;
424 
425  public:
426  static WorkTiming start (Time now) { return WorkTiming{WORKSTART, Payload{now}}; }
427  static WorkTiming stop (Time now) { return WorkTiming{WORKSTOP, Payload{now}}; }
428  };
429 
440  {
441  Scheduler& scheduler_;
442  public:
443 
444  ActivationEvent rootEvent;
445 
446  ExecutionCtx(Scheduler& self, ActivationEvent toDispatch)
447  : scheduler_{self}
448  , rootEvent{toDispatch}
449  { }
450 
451 
452  /* ==== Implementation of the Concept ExecutionCtx ==== */
453 
463  post (Time when, Time dead, Activity* chain, ExecutionCtx& ctx)
464  {
465  REQUIRE (chain);
466  ActivationEvent chainEvent = ctx.rootEvent;
467  chainEvent.refineTo (chain, when, dead);
468  scheduler_.sanityCheck (chainEvent);
469  return scheduler_.layer2_.postChain (chainEvent, scheduler_.layer1_);
470  }
471 
478  void
479  work (Time now, size_t qualifier)
480  {
481  scheduler_.layer2_.dropGroomingToken();
482  scheduler_.engineObserver_.dispatchEvent(qualifier, WorkTiming::start(now));
483  }
484 
486  void
487  done (Time now, size_t qualifier)
488  {
489  scheduler_.engineObserver_.dispatchEvent(qualifier, WorkTiming::stop(now));
490  }
491 
494  tick (Time now)
495  {
496  scheduler_.handleDutyCycle (now);
497  return activity::PASS;
498  }
499 
501  Time
503  {
504  return scheduler_.getSchedTime();
505  }
506  };
507 
508 
509 
510 
511  /***********************************************************************/
519  inline activity::Proc
521  {
522  return layer2_.dispatchCapacity (layer1_
523  ,loadControl_
524  ,[this](ActivationEvent toDispatch)
525  {
526  ExecutionCtx ctx{*this, toDispatch};
527  return ActivityLang::dispatchChain (toDispatch, ctx);
528  }
529  ,[this] { return getSchedTime(); }
530  );
531  }
532 
533 
534 
535 
536 
546  inline ScheduleSpec
548  { // execute term-builder on-demand...
549  maybeBuildTerm();
550 
551  // set up new schedule by retrieving the Activity-chain...
552  theScheduler_->postChain ({term_->post(), start_
553  , death_
554  , manID_
555  , isCompulsory_});
556  return move(*this);
557  }
558 
563  inline void
565  {
566  if (term_) return;
567  term_ = move(
568  theScheduler_->activityLang_
569  .buildCalculationJob (job_, start_,death_));
570  }
571 
572  inline ScheduleSpec
573  ScheduleSpec::linkToSuccessor (ScheduleSpec& succSpec, bool unlimitedTime)
574  {
575  this->maybeBuildTerm();
576  succSpec.maybeBuildTerm();
577  term_->appendNotificationTo (*succSpec.term_, unlimitedTime);
578  return move(*this);
579  }
580 
581  inline ScheduleSpec
582  ScheduleSpec::linkToPredecessor (ScheduleSpec& predSpec, bool unlimitedTime)
583  {
584  predSpec.maybeBuildTerm();
585  this->maybeBuildTerm();
586  predSpec.term_->appendNotificationTo (*term_, unlimitedTime);
587  return move(*this);
588  }
589 
590 
591 
592  inline void
593  Scheduler::sanityCheck (ActivationEvent const& event)
594  {
595  if (not event)
596  throw error::Logic ("Empty event passed into Scheduler entrance");
597  if (event.startTime() == Time::ANYTIME)
598  throw error::Fatal ("Attempt to schedule an Activity without valid start time");
599  if (event.deathTime() == Time::NEVER)
600  throw error::Fatal ("Attempt to schedule an Activity without valid deadline");
601  Time now{getSchedTime()};
602  Offset toDeadline{now, event.deathTime()};
603  if (toDeadline > FUTURE_PLANNING_LIMIT)
604  throw error::Fatal (util::_Fmt{"Attempt to schedule Activity %s "
605  "with a deadline by %s into the future"}
606  % *event.activity
607  % toDeadline);
608  }
609 
610 
611 
612 
613  /*****************************************************************/
619  inline void
621  {
622  sanityCheck (actEvent);
623  maybeScaleWorkForce (actEvent.startTime());
624  layer2_.postChain (actEvent, layer1_);
625  }
626 
627 
628 
629 
646  inline void
647  Scheduler::handleDutyCycle (Time now, bool forceContinuation)
648  {
649  auto guard = layer2_.requireGroomingTokenHere();
650 
651  // consolidate queue and discard outdated tasks
652  if (not layer2_.maintainQueueHead(layer1_,now))
653  { // missed deadline of compulsory task
654  triggerEmergency();
655  return;// leave everything as-is
656  }
657 
658  // clean-up of obsolete Activities
659  activityLang_.discardBefore (now);
660 
661  loadControl_.updateState (now);
662 
663  if (not empty() or forceContinuation)
664  {// prepare next duty cycle »tick«
665  Time nextTick = now + (forceContinuation? WORK_HORIZON : DUTY_CYCLE_PERIOD);
666  Time deadline = nextTick + DUTY_CYCLE_TOLERANCE;
667  Activity& tickActivity = activityLang_.createTick (deadline);
668  ActivationEvent tickEvent{tickActivity, nextTick, deadline, ManifestationID(), true};
669  layer2_.postChain (tickEvent, layer1_);
670  } // *deliberately* use low-level entrance
671  } // to avoid ignite() cycles and derailed load-regulation
672 
677  inline void
679  {
680  if (isFailure)
681  triggerEmergency();
682  else
683  loadControl_.markWorkerExit();
684  }
685 
694  inline void
696  {
697  if (empty())
698  ignite();
699  else
700  loadControl_.ensureCapacity (startHorizon);
701  }
702 
712  inline void
714  {
715  UNIMPLEMENTED ("scheduler overrun -- trigger Emergency");
716  }
717 
718 
719 
720 }} // namespace vault::gear
721 #endif /*SRC_VAULT_GEAR_SCHEDULER_H_*/
static const Time ANYTIME
border condition marker value. ANYTIME <= any time value
Definition: timevalue.hpp:313
a mutable time value, behaving like a plain number, allowing copy and re-accessing ...
Definition: timevalue.hpp:232
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
Scheduler resource usage coordination.
Record to describe an Activity, to happen within the Scheduler&#39;s control flow.
Definition: activity.hpp:226
Types marked with this mix-in may be moved and move-assigned.
Definition: nocopy.hpp:63
void seedCalcStream(Job planningJob, ManifestationID manID=ManifestationID(), FrameRate expectedAdditionalLoad=FrameRate(25))
Set the Scheduler to work on a new CalcStream.
Definition: scheduler.hpp:318
Low-level Render Engine event — abstracted storage base.
void handleWorkerTermination(bool isFailure)
Callback invoked whenever a worker-thread is about to exit.
Definition: scheduler.hpp:678
void discardSchedule()
forcibly clear out the schedule
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
Definition: Setup.py:1
Memory management scheme for activities and parameter data passed through the Scheduler within the Lu...
Offset DUTY_CYCLE_PERIOD
period of the regular scheduler »tick« for state maintenance.
Definition: scheduler.hpp:130
Definition: run.hpp:40
Framerate specified as frames per second.
Definition: timevalue.hpp:655
activity::Proc tick(Time now)
λ-tick : scheduler management duty cycle.
Definition: scheduler.hpp:494
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Definition: scheduler.hpp:129
Any copy and copy construction prohibited.
Definition: nocopy.hpp:37
void announceLoad(FrameRate fps)
Scheduler Layer-2 : execution of Scheduler Activities.
Offset FUTURE_PLANNING_LIMIT
limit timespan of deadline into the future (~360 MiB max)
Definition: scheduler.hpp:132
Render Engine performance data collection service.
void postChain(ActivationEvent)
Enqueue for time-bound execution, possibly dispatch immediately.
Definition: scheduler.hpp:620
ScheduleSpec post()
build Activity chain and hand-over to the Scheduler.
Definition: scheduler.hpp:547
void ignite()
Spark the engine self-regulation cycle and power up WorkForce.
Definition: scheduler.hpp:259
Time getSchedTime()
access high-resolution-clock, rounded to µ-Ticks
Definition: scheduler.hpp:394
A front-end for using printf-style formatting.
LoadController::Wiring connectMonitoring()
Definition: scheduler.hpp:383
ScheduleSpec defineSchedule(Job job)
Render Job builder: start definition of a schedule to invoke the given Job.
Definition: scheduler.hpp:357
Lumiera&#39;s internal time value datatype.
Definition: timevalue.hpp:299
Offset DUTY_CYCLE_TOLERANCE
maximum slip tolerated on duty-cycle start before triggering Scheduler-emergency
Definition: scheduler.hpp:131
activity::Proc post(Time when, Time dead, Activity *chain, ExecutionCtx &ctx)
λ-post: enqueue for time-bound execution, within given ExecutionCtx.
Definition: scheduler.hpp:463
void done(Time now, size_t qualifier)
λ-done : signal end time of actual processing.
Definition: scheduler.hpp:487
Controller to coordinate resource usage related to the Scheduler.
void activate(ManifestationID manID)
Enable entries marked with a specific ManifestationID to be processed.
Derived specific exceptions within Lumiera&#39;s exception hierarchy.
Definition: error.hpp:190
Term builder and execution framework to perform chains of scheduler Activities.
»Scheduler-Service« : coordinate render activities.
Definition: scheduler.hpp:213
Layer-1 of the Scheduler: queueing and prioritisation of activities.
Token or Atom with distinct identity.
Definition: symbol.hpp:117
void maybeScaleWorkForce(Time startHorizon)
Hook invoked whenever a new task is passed in.
Definition: scheduler.hpp:695
void ensureCapacity(Time startHorizon)
Hook to check and possibly scale up WorkForce to handle one additional job.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
Marker for current (and obsolete) manifestations of a CalcStream processed by the Render-Engine...
Definition: activity.hpp:84
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Definition: scheduler.hpp:128
work-timing event for performance observation
Definition: scheduler.hpp:416
A language framework to define and interconnect scheduler activity verbs.
void handleDutyCycle(Time now, bool=false)
»Tick-hook« : code to maintain a sane running status.
Definition: scheduler.hpp:647
void triggerEmergency()
Trip the emergency brake and unwind processing while retaining all state.
Definition: scheduler.hpp:713
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:220
bool maintainQueueHead(SchedulerInvocation &layer1, Time now)
update queue head to discard obsolete content.
Layer-2 of the Scheduler: coordination and interaction of activities.
void continueMetaJob(Time nextStart, Job planningJob, ManifestationID manID=ManifestationID())
Place a follow-up job-planning job into the timeline.
Definition: scheduler.hpp:333
activity::Proc doWork()
The worker-Functor: called by the active Workers from the WorkForce to pull / perform the actual rend...
Definition: scheduler.hpp:520
Lumiera error handling (C++ interface).
void markWorkerExit()
statistics update on scaling down the WorkForce
void activate(double degree=1.0)
Activate or scale up the worker pool.
Definition: work-force.hpp:271
static const Time NEVER
border condition marker value. NEVER >= any time value
Definition: timevalue.hpp:314
Offset measures a distance in time.
Definition: timevalue.hpp:358
auto setup(FUN &&workFun)
Helper: setup a Worker-Pool configuration for the test.
Time getSchedTime()
access high-resolution-clock, rounded to µ-Ticks
Definition: scheduler.hpp:502
void discardBefore(Time deadline)
ScopedGroomingGuard requireGroomingTokenHere()
a scope guard to force acquisition of the GroomingToken
static activity::Proc dispatchChain(Activity *chain, EXE &executionCtx)
Execution Framework: dispatch performance of a chain of Activities.
A pool of workers for multithreaded rendering.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:106
Proc
Result instruction from Activity activation.
Definition: activity.hpp:140
Activity & createTick(Time deadline)
Individual frame rendering task, forwarding to a closure.
Definition: job.h:268
Duration WORK_HORIZON
the scope of activity currently in the works
Front-end for simplified access to the current wall clock time.
Pool of worker threads for rendering.
Definition: work-force.hpp:240
void terminateProcessing()
Bring down processing destructively as fast as possible.
Definition: scheduler.hpp:278
Base for configuration of the worker pool.
Definition: work-force.hpp:104
Vault-Layer implementation namespace root.
activity::Term buildMetaJob(Job job, Time start, Time deadline)
Builder-API: initiate definition of internal/planning job.
Collector and aggregator for performance data.
void updateState(Time)
periodic call to build integrated state indicators
Scheduler Layer-1 : time based dispatch.
void work(Time now, size_t qualifier)
λ-work : transition Managment-Mode -> Work-Mode.
Definition: scheduler.hpp:479
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor: