Lumiera  0.pre.03
»edit your freedom«
scheduler-commutator.hpp
Go to the documentation of this file.
1 /*
2  SCHEDULER-COMMUTATOR.hpp - coordination layer of the render engine scheduler
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 */
13 
14 
61 #ifndef SRC_VAULT_GEAR_SCHEDULER_COMMUTATOR_H_
62 #define SRC_VAULT_GEAR_SCHEDULER_COMMUTATOR_H_
63 
64 
65 #include "vault/common.hpp"
66 #include "vault/gear/activity.hpp"
70 #include "lib/time/timevalue.hpp"
71 #include "lib/format-string.hpp"
72 #include "lib/nocopy.hpp"
73 
74 #include <thread>
75 #include <atomic>
76 
77 
78 namespace vault{
79 namespace gear {
80 
81  using lib::time::Offset;
82  using lib::time::FSecs;
83  using lib::time::Time;
84  using std::atomic;
85  using std::memory_order::memory_order_relaxed;
86  using std::memory_order::memory_order_acquire;
87  using std::memory_order::memory_order_release;
88  using std::chrono_literals::operator ""us;
89  using std::chrono::microseconds;
90 
91  namespace { // Configuration / Scheduling limit
92 
93  microseconds GROOMING_WAIT_CYCLE{70us};
94 
96  auto inline thisThread() { return std::this_thread::get_id(); }
97  }
98 
99 
100 
101  /*************************************************************/
111  {
112  using ThreadID = std::thread::id;
113  atomic<ThreadID> groomingToken_{};
114 
115 
116  public:
117  SchedulerCommutator() = default;
118 
127  bool
129  {
130  ThreadID expect_noThread; // expect no one else to be in...
131  return groomingToken_.compare_exchange_strong (expect_noThread, thisThread()
132  ,memory_order_acquire // success also constitutes an acquire barrier
133  ,memory_order_relaxed // failure has no synchronisation ramifications
134  );
135  }
136 
143  void
144  dropGroomingToken() noexcept
145  { // expect that this thread actually holds the Grooming-Token
146  REQUIRE (groomingToken_.load(memory_order_relaxed) == thisThread());
147  const ThreadID noThreadHoldsIt;
148  groomingToken_.store (noThreadHoldsIt, memory_order_release);
149  }
150 
155  bool
156  holdsGroomingToken (ThreadID id) noexcept
157  {
158  return id == groomingToken_.load (memory_order_relaxed);
159  }
160 
161 
162  class ScopedGroomingGuard;
165 
166 
167 
169  void
171  {
172  if (layer1.hasPendingInput()
174  or acquireGoomingToken()))
175  layer1.feedPrioritisation();
176  }
177 
183  bool
185  {
186  ENSURE (holdsGroomingToken (thisThread()));
187  layer1.feedPrioritisation();
188  while (layer1.isOutdated (now) and not layer1.isOutOfTime(now))
189  layer1.pullHead();
190  return not layer1.isOutOfTime(now);
191  }
192 
200  {
202  or acquireGoomingToken())
203  {
204  layer1.feedPrioritisation();
205  while (layer1.isOutdated (now) and not layer1.isOutOfTime(now))
206  layer1.pullHead();
207  if (not maintainQueueHead (layer1,now))
208  ALERT (engine, "MISSED compulsory job -- should raise Scheduler-Emergency");
209  else
210  if (layer1.isDue (now))
211  return layer1.pullHead();
212  }
213  return ActivationEvent();
214  }
215 
216 
217 
218  /***********************************************************/
234  {
236  layer1.feedPrioritisation (move (event));
237  else
238  layer1.instruct (move (event));
239  return activity::PASS;
240  }
241 
242 
248  template<class DISPATCH, class CLOCK>
251 
252 
253 
254  private:
256  scatteredDelay (Time now, Time head
257  ,LoadController& loadController
258  ,LoadController::Capacity capacity);
259 
260  void
261  ensureDroppedGroomingToken()
262  {
265  }
266 
272  {
273  activity::Proc lastResult = activity::PASS;
274 
278  operator activity::Proc()
279  {
280  return activity::SKIP == lastResult? activity::PASS
281  : lastResult;
282  }
283 
284  template<class FUN>
286  performStep (FUN step)
287  {
288  if (activity::PASS == lastResult)
289  lastResult = step();
290  return move(*this);
291  }
292  };
293  };
294 
295 
296 
297 
298 
299 
318  template<class DISPATCH, class CLOCK>
319  inline activity::Proc
321  ,LoadController& loadController
322  ,DISPATCH executeActivity
323  ,CLOCK getSchedTime
324  )
325  {
326  try {
327  auto res = WorkerInstruction{}
328  .performStep([&]{
329  maybeFeed(layer1);
330  Time now = getSchedTime();
331  Time head = layer1.headTime();
332  return scatteredDelay(now, head, loadController,
333  loadController.markIncomingCapacity (head,now));
334  })
335  .performStep([&]{
336  Time now = getSchedTime();
337  auto toDispatch = findWork (layer1,now);
338  if (not toDispatch) return activity::KICK; // contention
339  return executeActivity (toDispatch);
340  })
341  .performStep([&]{
342  maybeFeed(layer1);
343  Time now = getSchedTime();
344  Time head = layer1.headTime();
345  return scatteredDelay(now, head, loadController,
346  loadController.markOutgoingCapacity (head,now));
347  });
348 
349  // ensure lock clean-up
350  if (res != activity::PASS)
351  ensureDroppedGroomingToken();
352  return res;
353  }
354  catch(...)
355  {
356  ensureDroppedGroomingToken();
357  throw;
358  }
359  }
360 
361 
376  inline activity::Proc
378  ,LoadController& loadController
379  ,LoadController::Capacity capacity)
380  {
381  auto doTargetedSleep = [&]
382  { // ensure not to block the Scheduler after management work
383  ensureDroppedGroomingToken();
384  // relocate this thread(capacity) to a time where its more useful
385  Offset targetedDelay = loadController.scatteredDelayTime (now, capacity);
386  std::this_thread::sleep_for (std::chrono::microseconds (_raw(targetedDelay)));
387  };
388  auto doTendNextHead = [&]
389  {
390  if (not loadController.tendedNext(head)
392  or acquireGoomingToken()))
393  loadController.tendNext(head);
394  };
395 
396  switch (capacity) {
398  return activity::PASS;
400  std::this_thread::yield();
401  return activity::SKIP; // prompts to abort chain but call again immediately
403  return activity::WAIT; // prompts to switch this thread into sleep mode
405  doTendNextHead();
406  doTargetedSleep(); // let this thread wait until next head time is due
407  return activity::SKIP;
408  default:
409  doTargetedSleep();
410  return activity::SKIP; // prompts to abort this processing-chain for good
411  }
412  }
413 
414 
415 
416 
417 
420  {
421  SchedulerCommutator& commutator_;
422  bool handledActively_;
423 
424  bool
425  ensureHoldsToken()
426  {
427  if (commutator_.holdsGroomingToken(thisThread()))
428  return false;
429  while (not commutator_.acquireGoomingToken())
430  std::this_thread::sleep_for (GROOMING_WAIT_CYCLE);
431  return true;
432  }
433 
434  public:
437  : commutator_(layer2)
438  , handledActively_{ensureHoldsToken()}
439  { }
440 
442  {
443  if (handledActively_ and
444  commutator_.holdsGroomingToken(thisThread()))
445  commutator_.dropGroomingToken();
446  }
447  };
448 
449 
465  {
466  return ScopedGroomingGuard(*this);
467  }
468 
469 
470 
471 }} // namespace vault::gear
472 #endif /*SRC_VAULT_GEAR_SCHEDULER_COMMUTATOR_H_*/
bool tendedNext(Time nextHead) const
did we already tend for the indicated next relevant head time?
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
Scheduler resource usage coordination.
activity::Proc scatteredDelay(Time now, Time head, LoadController &loadController, LoadController::Capacity capacity)
A worker asking for work constitutes free capacity, which can be redirected into a focused zone of th...
bool holdsGroomingToken(ThreadID id) noexcept
check if the indicated thread currently holds the right to conduct internal state transitions...
microseconds GROOMING_WAIT_CYCLE
wait-sleep in case a thread must forcibly acquire the Grooming-Token
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
Any copy and copy construction prohibited.
Definition: nocopy.hpp:37
Scheduler Layer-2 : execution of Scheduler Activities.
auto thisThread()
convenient short-notation, also used by SchedulerService
Types marked with this mix-in may be moved but not copied.
Definition: nocopy.hpp:49
Front-end for printf-style string template interpolation.
Capacity markIncomingCapacity(Time head, Time now)
decide how this thread&#39;s capacity shall be used when returning from idle wait and asking for work ...
ActivationEvent findWork(SchedulerInvocation &layer1, Time now)
Look into the queues and possibly retrieve work due by now.
Lumiera&#39;s internal time value datatype.
Definition: timevalue.hpp:299
Controller to coordinate resource usage related to the Scheduler.
void instruct(ActivationEvent actEvent)
Accept an ActivationEvent with an Activity for time-bound execution.
Layer-1 of the Scheduler: queueing and prioritisation of activities.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
bool isOutdated(Time now) const
determine if Activity at scheduler is outdated and should be discarded
Offset scatteredDelayTime(Time now, Capacity capacity)
Generate a time offset to relocate currently unused capacity to a time range where it&#39;s likely to be ...
A language framework to define and interconnect scheduler activity verbs.
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:220
bool maintainQueueHead(SchedulerInvocation &layer1, Time now)
update queue head to discard obsolete content.
void tendNext(Time nextHead)
Mark the indicated next head time as tended.
monad-like step sequence: perform sequence of steps, as long as the result remains activity::PASS ...
Basic set of definitions and includes commonly used together (Vault).
bool isOutOfTime(Time now) const
detect a compulsory Activity at scheduler head with missed deadline
Offset measures a distance in time.
Definition: timevalue.hpp:358
awaiting imminent activities
ScopedGroomingGuard requireGroomingTokenHere()
a scope guard to force acquisition of the GroomingToken
Proc
Result instruction from Activity activation.
Definition: activity.hpp:140
ActivationEvent pullHead()
Retrieve from the scheduling queue the entry with earliest start time.
Capacity markOutgoingCapacity(Time head, Time now)
decide how this thread&#39;s capacity shall be used after it returned from being actively employed ...
a family of time value like entities and their relationships.
void feedPrioritisation()
Pick up all new events from the entrance queue and enqueue them to be retrieved ordered by start time...
Capacity
Allocation of capacity to time horizon of expected work.
void maybeFeed(SchedulerInvocation &layer1)
tend to the input queue if possible
Vault-Layer implementation namespace root.
bool isDue(Time now) const
Determine if there is work to do right now.
bool acquireGoomingToken() noexcept
acquire the right to perform internal state transitions.
Scheduler Layer-1 : time based dispatch.
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor:
Descriptor for a piece of operational logic performed by the scheduler.