Lumiera 0.pre.04
»edit your freedom«
Loading...
Searching...
No Matches
work-force.hpp
Go to the documentation of this file.
1/*
2 WORK-FORCE.hpp - actively coordinated pool of workers for rendering
3
4 Copyright (C)
5 2023, Hermann Vosseler <Ichthyostega@web.de>
6
7  **Lumiera** is free software; you can redistribute it and/or modify it
8  under the terms of the GNU General Public License as published by the
9  Free Software Foundation; either version 2 of the License, or (at your
10  option) any later version. See the file COPYING for further details.
11
12*/
13
14
43#ifndef SRC_VAULT_GEAR_WORK_FORCE_H_
44#define SRC_VAULT_GEAR_WORK_FORCE_H_
45
46
47#include "vault/common.hpp"
49#include "lib/meta/function.hpp"
50#include "lib/thread.hpp"
51#include "lib/nocopy.hpp"
52#include "lib/util.hpp"
53
54#include <utility>
55#include <chrono>
56#include <atomic>
57#include <list>
58
59
60namespace vault{
61namespace gear {
62
63 using std::move;
64 using std::atomic;
65 using util::unConst;
66 using std::chrono::milliseconds;
67 using std::chrono::microseconds;
68 using std::chrono_literals::operator ""ms;
69 using std::chrono_literals::operator ""us;
70 using std::this_thread::sleep_for;
71
72
// Anonymous namespace: tuning parameters for pool sizing and for the
// anti-contention back-off performed by the workers, plus a helper
// producing a hash value for the calling thread.
// NOTE(review): CONTEND_STARK_LIMIT and CONTEND_SATURATION are used by the
// contention handling further down, yet their definitions (listing lines 77-78)
// are not visible in this listing -- presumably dropped by the extraction;
// confirm against the original header.
73 namespace {
74 const double MAX_OVERPROVISIONING = 3.0; ///< safety guard to prevent catastrophic over-provisioning
75
76 const size_t CONTEND_SOFT_LIMIT = 3; ///< zone for soft anti-contention measures, counting continued contention events
79 const size_t CONTEND_SOFT_FACTOR = 100; ///< base counter for a spinning wait loop
80 const size_t CONTEND_RANDOM_STEP = 11; ///< stepping for randomisation of anti-contention measures
81 const microseconds CONTEND_WAIT = 100us; ///< base time unit for the exponentially stepped-up sleep delay in case of contention
82
// Returns std::hash applied to the std::thread::id of the calling thread.
// NOTE(review): the line carrying the function name (listing line 84) is missing
// here; the contention handling below calls thisThreadHash(), which is presumably
// this function -- confirm.
83 inline size_t
85 {
86 return std::hash<std::thread::id>{} (std::this_thread::get_id());
87 }
88 }
89
90 namespace work {
91
// NOTE(review): the companion alias SIG_WorkFun, referenced by the Worker's
// compile-time checks, is not visible in this listing (line 92 is missing).
93 using SIG_FinalHook = void(bool); ///< config should define callable invoked at exit (argument: isFailure)
94
// Base for configuration of the worker pool.
// The Worker template inherits from its CONF parameter and reads IDLE_WAIT and
// DISMISS_CYCLES through it; the pool reads COMPUTATION_CAPACITY and IDLE_WAIT
// from its stored configuration instance.
104 struct Config
105 {
106 static size_t COMPUTATION_CAPACITY; ///< nominal "full size" of a pool of concurrent workers (only declared here; defined elsewhere)
107
108 const milliseconds IDLE_WAIT = 20ms; ///< wait period when a worker falls idle
109 const size_t DISMISS_CYCLES = 100; ///< number of idle cycles after which the worker terminates
110
111 static size_t getDefaultComputationCapacity(); ///< default value for full computing capacity is to use all (virtual) cores
112 };
113
114
115
// Helpers for the anti-contention measures; only declared in this header.
// performRandomisedSpin: part of the weak level of anti-contention measures.
// NOTE(review): no call site is visible in this listing, so the meaning of
// the two size_t arguments cannot be confirmed from here.
116 void performRandomisedSpin (size_t,size_t);
// steppedRandDelay: calculates the delay time for a stronger anti-contention wait.
// The worker invokes it as steppedRandDelay(stepping, randFact_) and sleeps
// for the returned duration.
117 microseconds steppedRandDelay(size_t,size_t);
118
119
// Shorthand for the thread launch configuration used when starting a worker thread.
120 using Launch = lib::Thread::Launch;
121
122 /*************************************/
// Individual worker thread: repeatedly pulls the doWork functor.
// CONF is used as a (private) base class and must provide `doWork` and
// `finalHook`; both are verified at compile time inside the thread function.
126 template<class CONF>
127 class Worker
128 : CONF
130 {
131 public:
// Moves the given configuration into the CONF base and immediately launches
// a thread (thread ID "Worker") executing pullWork() on this object.
// NOTE(review): the thread is started from the member-initialiser list and is
// handed `this`; the declaration of thread_ is not visible in this listing
// (line 147 is missing). If thread_ is declared ahead of idleCycles_ /
// kickLevel_ / randFact_, those members would be initialised only after the
// thread is already running -- confirm the declaration order.
132 Worker (CONF config)
133 : CONF{move (config)}
134 , thread_{Launch{&Worker::pullWork, this}
135 .threadID("Worker")
136 .decorateCounter()}
137 { }
138
140 std::atomic<bool> emergency{false}; ///< emergency break to trigger cooperative halt
141
// This Worker starts out active, but may terminate;
// reports true once thread_ evaluates to false.
143 bool isDead() const { return not thread_; }
144
145
146 private:
148
// Thread function (bound as &Worker::pullWork in the constructor): invokes
// CONF::doWork() in a loop and reacts on the returned activity::Proc value.
//  - emergency flag set  => leave the loop (still counted as regular exit)
//  - KICK                => contentionWait(), which hands back PASS
//  - anything else       => halve the contention level kickLevel_
//  - WAIT                => idleWait(), which yields PASS or HALT
//  - anything else       => reset the idle cycle counter
//  - result is not PASS  => leave the loop
// Afterwards CONF::finalHook() is invoked with `true` only when the loop was
// left through an exception; errors are logged and consumed in both stages.
// NOTE(review): the line carrying the function name (listing line 150)
// is missing from this listing.
149 void
151 {
152 ASSERT_MEMBER_FUNCTOR (&CONF::doWork, SIG_WorkFun);
153 ASSERT_MEMBER_FUNCTOR (&CONF::finalHook, SIG_FinalHook);
154
155 bool regularExit{false};
156 try /* ================ pull work ===================== */
157 {
158 while (true)
159 {
160 activity::Proc res = CONF::doWork();
161 if (emergency.load (std::memory_order_relaxed))
162 break;
163 if (res == activity::KICK)
164 res = contentionWait();
165 else
166 if (kickLevel_)
167 kickLevel_ /= 2;
168 if (res == activity::WAIT)
169 res = idleWait();
170 else
171 idleCycles_ = 0;
172 if (res != activity::PASS)
173 break;
174 }
175 regularExit = true; // reached on any `break`, but skipped when an exception escapes the loop
176 }
177 ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread")
178
179 try /* ================ thread-exit hook ============== */
180 {
181 CONF::finalHook (not regularExit);
182 }
183 ERROR_LOG_AND_IGNORE (threadpool, "failure in thread-exit hook")
184 }// Thread will terminate....
185
// Idle handling -- presumably `activity::Proc idleWait()`; the signature line
// (listing line 187) is missing here. Counts consecutive idle cycles: below
// CONF::DISMISS_CYCLES the thread sleeps for CONF::IDLE_WAIT and the result
// is PASS (keep pulling), otherwise HALT is returned.
188 {
189 ++idleCycles_;
190 if (idleCycles_ < CONF::DISMISS_CYCLES)
191 {
192 sleep_for (CONF::IDLE_WAIT);
193 return activity::PASS;
194 }
195 else // idle beyond threshold => terminate worker
196 return activity::HALT;
197 }
198 size_t idleCycles_{0}; ///< consecutive idle cycles; reset by pullWork whenever the result is not WAIT
199
200
// Contention handling -- presumably `activity::Proc contentionWait()`; the
// signature line (listing line 202) is missing here. Up to CONTEND_SOFT_LIMIT
// consecutive contention events are answered with a loop of kickLevel_ rounds
// ending in std::this_thread::yield(); beyond that the thread sleeps for the
// duration given by steppedRandDelay(). Always returns PASS.
203 {
// randFact_ is derived from the thread hash; a result of 0 is recomputed on the next call
204 if (not randFact_)
205 randFact_ = thisThreadHash() % CONTEND_RANDOM_STEP;
206
207 if (kickLevel_ <= CONTEND_SOFT_LIMIT)
208 for (uint i=0; i<kickLevel_; ++i)
209 {
// NOTE(review): listing line 210 is missing here -- presumably the call
// to performRandomisedSpin(); confirm against the original header.
211 std::this_thread::yield();
212 }
213 else
214 {
215 auto stepping = util::min (kickLevel_, CONTEND_STARK_LIMIT) - CONTEND_SOFT_LIMIT;
216 std::this_thread::sleep_for (steppedRandDelay(stepping,randFact_));
217 }
218
// the contention level only grows while below CONTEND_SATURATION
219 if (kickLevel_ < CONTEND_SATURATION)
220 ++kickLevel_;
221 return activity::PASS;
222 }
223 size_t kickLevel_{0}; ///< count of continued contention events; halved by pullWork on each non-KICK result
224 size_t randFact_{0}; ///< per-thread randomisation factor (thread hash modulo CONTEND_RANDOM_STEP)
225 };
226 }//(End)namespace work
227
228
229
230
231 /***********************************************************/
// Pool of worker threads for rendering.
// NOTE(review): the class header (listing lines 240-241) and the declaration
// of the workers_ member (line 246) are missing from this listing; the
// constructor below names the class WorkForce and initialises workers_.
239 template<class CONF>
242 {
243 using Pool = std::list<work::Worker<CONF>>;
244
245 CONF setup_; ///< configuration; a copy is handed to each Worker on creation
247
248
249 public:
// Stores the configuration; the pool starts out empty, no thread is launched yet.
250 WorkForce (CONF config)
251 : setup_{move (config)}
252 , workers_{}
253 { }
254
// NOTE(review): the declaration line (255) and the statement inside the try
// block (258) are missing from this listing -- presumably the destructor
// waiting for all workers to shut down; confirm against the original header.
// Any error raised here is logged and consumed.
256 {
257 try {
259 }
260 ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread")
261 }
262
263
// Activate or scale up the worker pool.
// @param degree fraction of COMPUTATION_CAPACITY to provide; the resulting
//        target is limited to the range 0 ... capacity * MAX_OVERPROVISIONING.
// Only adds workers until the list holds `scale` entries; never removes any.
// NOTE(review): workers_.size() also counts terminated workers not yet purged
// by size(), so dead entries are not replaced here -- confirm this is intended.
270 void
271 activate (double degree =1.0)
272 {
273 size_t scale{setup_.COMPUTATION_CAPACITY};
274 scale = size_t(util::limited (0.0, degree*scale, scale*MAX_OVERPROVISIONING));
275 for (uint i = workers_.size(); i < scale; ++i)
276 workers_.emplace_back (setup_);
277 }
278
// Adds up to `step` further workers, with the total capped at
// COMPUTATION_CAPACITY.
// NOTE(review): this cap differs from activate(), which allows
// over-provisioning up to MAX_OVERPROVISIONING -- confirm this is intended.
279 void
280 incScale(uint step =+1)
281 {
282 uint i = workers_.size();
283 uint target = util::min (i+step, setup_.COMPUTATION_CAPACITY);
284 for ( ; i < target; ++i)
285 workers_.emplace_back (setup_);
286 }
287
// Raises the emergency flag on every worker, then polls size() in intervals
// of IDLE_WAIT until no worker is left in the pool.
// NOTE(review): the line carrying the function name (listing line 289)
// is missing from this listing.
288 void
290 {
291 for (auto& w : workers_)
292 w.emergency.store (true, std::memory_order_relaxed);
293 while (0 < size())
294 sleep_for (setup_.IDLE_WAIT);
295 }
296
// Number of workers in the pool. Despite being const, this first removes all
// entries reporting isDead() from the list (via unConst) and then returns the
// remaining count.
297 size_t
298 size() const
299 {
300 unConst(workers_).remove_if([](auto& w){ return w.isDead(); });
301 return workers_.size();
302 }
303 };
304
305
306
307}} // namespace vault::gear
308#endif /*SRC_VAULT_GEAR_WORK_FORCE_H_*/
Descriptor for a piece of operational logic performed by the scheduler.
A thin convenience wrapper to simplify thread-handling.
Definition thread.hpp:650
Any copy and copy construction prohibited.
Definition nocopy.hpp:38
Pool of worker threads for rendering.
void incScale(uint step=+1)
std::list< work::Worker< CONF > > Pool
void activate(double degree=1.0)
Activate or scale up the worker pool.
Individual worker thread: repeatedly pulls the doWork functor.
std::atomic< bool > emergency
emergency break to trigger cooperative halt
activity::Proc idleWait()
activity::Proc contentionWait()
bool isDead() const
this Worker starts out active, but may terminate
#define ERROR_LOG_AND_IGNORE(_FLAG_, _OP_DESCR_)
convenience shortcut for a sequence of catch blocks just logging and consuming an error.
Definition error.hpp:267
Metaprogramming tools for detecting and transforming function types.
#define ASSERT_MEMBER_FUNCTOR(_EXPR_, _SIG_)
Macro for a compile-time check to verify some member is present and comprises something invokable with the given signature.
Definition function.hpp:342
unsigned int uint
Definition integral.hpp:29
constexpr NUM limited(NB lowerBound, NUM val, NB upperBound)
force a numeric to be within bounds, inclusively
Definition util.hpp:91
OBJ * unConst(const OBJ *)
shortcut to save some typing when having to define const and non-const variants of member functions
Definition util.hpp:358
auto min(IT &&elms)
Proc
Result instruction from Activity activation.
Definition activity.hpp:140
@ PASS
pass on the activation down the chain
Definition activity.hpp:140
@ KICK
back pressure; get out of the way but be back soon
Definition activity.hpp:143
@ WAIT
nothing to do; wait and re-check for work later
Definition activity.hpp:142
@ HALT
abandon this play / render process
Definition activity.hpp:144
const double MAX_OVERPROVISIONING
safety guard to prevent catastrophic over-provisioning
const microseconds CONTEND_WAIT
base time unit for the exponentially stepped-up sleep delay in case of contention
const size_t CONTEND_SATURATION
upper limit for the contention event count
const size_t CONTEND_SOFT_LIMIT
zone for soft anti-contention measures, counting continued contention events
const size_t CONTEND_STARK_LIMIT
zone for stark measures, performing a sleep with exponential stepping
const size_t CONTEND_RANDOM_STEP
stepping for randomisation of anti-contention measures
const size_t CONTEND_SOFT_FACTOR
base counter for a spinning wait loop
void(bool) SIG_FinalHook
config should define callable invoked at exit (argument: isFailure)
lib::Thread::Launch Launch
activity::Proc(void) SIG_WorkFun
config should define a callable with this signature to perform work
void performRandomisedSpin(size_t, size_t)
This is part of the weak level of anti-contention measures.
microseconds steppedRandDelay(size_t, size_t)
Calculate the delay time for a stronger anti-contention wait.
Vault-Layer implementation namespace root.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
Primary class template for std::hash.
Base for configuration of the worker pool.
static size_t getDefaultComputationCapacity()
default value for full computing capacity is to use all (virtual) cores.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
const size_t DISMISS_CYCLES
number of idle cycles after which the worker terminates
const milliseconds IDLE_WAIT
wait period when a worker falls idle
Convenience front-end to simplify and codify basic thread handling.
Tiny helper functions and shortcuts to be used everywhere. Consider this header to be effectively incl...
Basic set of definitions and includes commonly used together (Vault).