Lumiera  0.pre.03
»edit your freedom«
steam-dispatcher.cpp
Go to the documentation of this file.
1 /*
2  SteamDispatcher - Steam-Layer command dispatch and execution
3 
4  Copyright (C) Lumiera.org
5  2008, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 * *****************************************************/
22 
23 
83 #include "lib/error.hpp"
84 #include "include/logging.h"
88 #include "steam/control/looper.hpp"
91 #include "lib/depend-inject.hpp"
92 #include "lib/sync-barrier.hpp"
93 #include "lib/thread.hpp"
94 #include "lib/util.hpp"
95 
96 #include <utility>
97 #include <memory>
98 
99 using lib::Sync;
100 using lib::SyncBarrier;
101 using lib::ThreadHookable;
102 using lib::RecursiveLock_Waitable;
103 using std::make_unique;
104 using std::move;
105 
106 namespace steam {
107 namespace control {
108 
109  namespace error = lumiera::error;
110 
111 
112  /********************************************************************/
120  : public CommandDispatch
121  , public Sync<RecursiveLock_Waitable>
122  {
124  using Launch = lib::ThreadHookable::Launch;
125 
128 
129  SyncBarrier init_;
130  CommandQueue queue_;
131  string error_;
132  Looper looper_;
133  ThreadHookable thread_;
134 
135  public:
144  template<typename FUN>
145  DispatcherLoop (FUN&& atExit)
146  : commandService_{ServiceHandle::NOT_YET_STARTED}
147  , queue_{}
148  , error_{}
149  , looper_([&]() -> bool
150  {
151  return not queue_.empty();
152  })
153  , thread_{ //----the-Session-Thread---------------
154  Launch{&DispatcherLoop::runSessionThread, this}
155  .threadID("Session")
156  .atExit([this, signalEndOfThread = move(atExit)]
158  {
160  signalEndOfThread (&error_);
161  })}
162  {
163  init_.sync(); // done with setup; loop may run now....
164  INFO (session, "Steam-Dispatcher running...");
165  {
166  Lock sync{this}; // open public session interface:
167  commandService_.createInstance(*this);
168  }
169  }
170 
171  ~DispatcherLoop()
172  {
173  try {
174  commandService_.shutdown(); // redundant call, to ensure session interface is closed reliably
175  INFO (session, "Steam-Dispatcher stopped.");
176  if (thread_)
177  ALERT (session, "Dispatcher destroyed while Session thread is running. The rest is silence.");
178  }
179  ERROR_LOG_AND_IGNORE(session, "Stopping the Steam-Dispatcher");
180  }
181 
182  void
183  activateCommandProecssing()
184  {
185  Lock sync{this};
186  looper_.enableProcessing(true);
187  INFO (command, "Session command processing activated.");
188  sync.notify_all();
189  }
190 
191  void
192  deactivateCommandProecssing()
193  {
194  Lock sync{this};
195  looper_.enableProcessing(false);
196  INFO (command, "Session command interface closed.");
197  sync.notify_all();
198  }
199 
200  void
201  requestStop() noexcept
202  {
203  Lock sync{this};
204  commandService_.shutdown(); // closes Session interface
205  looper_.triggerShutdown();
206  sync.notify_all();
207  }
208 
209  void
210  awaitStateProcessed() const
211  {
212  Lock{this, [&]{ return isStateSynched(); }};
213  // wake-up typically by updateState()
214  }
215 
216  size_t
217  size() const
218  {
219  Lock sync{this};
220  return queue_.size();
221  }
222 
223 
224  /* === CommandDispatch interface === */
225 
226  void
227  enqueue (Command&& cmd) override
228  {
229  Lock sync{this};
230  queue_.feed (move (cmd));
231  sync.notify_all();
232  }
233 
234  void
235  clear() override
236  {
237  Lock sync{this};
238  queue_.clear();
239  sync.notify_all();
240  }
241 
242  private:
250  void
252  {
253  init_.sync();
254  try
255  {
256  while (looper_.shallLoop())
257  {
258  awaitAction();
259  if (looper_.isDying())
260  break;
261  if (looper_.runBuild())
262  startBuilder();
263  else
264  if (looper_.isWorking())
265  processCommands();
266  updateState();
267  }
268  }
269  catch (std::exception& problem)
270  { // could also be lumiera::Error
271  error_ = problem.what();
272  lumiera_error(); // clear error flag
273  }
274  catch (...)
275  {
276  error_ = string{lumiera_error()};
277  }
278  // Session thread terminates...
279  } // atExit-λ will invoke ~DispatcherLoop()
280 
281  void
282  awaitAction()
283  {
284  Lock{this}.wait_for (looper_.getTimeout()
285  ,[&]{ return looper_.requireAction(); });
286  }
287 
288  void
289  updateState()
290  {
291  looper_.markStateProcessed();
292  if (looper_.isDisabled()) // otherwise wake-up would not be safe
293  getMonitor(this).notify_all();
294  }
295 
296  bool
297  isStateSynched() const
298  {
299  if (thread_.invokedWithinThread())
300  throw error::Fatal("Possible Deadlock. "
301  "Attempt to synchronise to a command processing check point "
302  "from within the (single) session thread."
303  , error::LUMIERA_ERROR_LIFECYCLE);
304  return not looper_.hasPendingChanges();
305  }
306 
307  void
308  processCommands()
309  {
310  Command cmd;
311  {
312  Lock sync{this};
313  if (not queue_.empty())
314  cmd = queue_.pop();
315  }
316  if (cmd)
317  {
318  INFO (command, "+++ dispatch %s", cStr(cmd));
319 
321  if (util::startsWith (string(cmd.getID()), "test"))
322  {
323  INFO (command, "+++ -------->>> bang!");
324  auto resultState = cmd();
325  resultState.maybeThrow();
326  }
328  }
329  }
330 
331  void
332  startBuilder()
333  {
334  INFO (builder, "+++ start the Steam-Builder...");
335  }
336  };
337 
338 
339 
340 
341 
342 
345 
346 
347  /* ======== SteamDispatcher implementation ======== */
348 
349  SteamDispatcher::SteamDispatcher() { }
350  SteamDispatcher::~SteamDispatcher() { }
351 
360  bool
361  SteamDispatcher::start (Subsys::SigTerm termNotification)
362  {
363  Lock sync{this};
364  if (runningLoop_) return false;
365 
366  runningLoop_ =
367  make_unique<DispatcherLoop>(
368  [=](string* problemIndicator)
369  { // when the Session thread ends....
371  termNotification (problemIndicator);
372  });
373 
374  if (active_)
375  runningLoop_->activateCommandProecssing();
376  return true;
377  }
378 
388  void
390  {
391  Lock sync{this};
392  if (runningLoop_)
393  runningLoop_.reset(); // delete DispatcherLoop object
394  else
395  WARN (command, "clean-up of DispatcherLoop invoked, "
396  "while SteamDispatcher is not marked as 'running'. "
397  "Likely an error in lifecycle logic, as the only one "
398  "intended to delete this object is the loop thread itself.");
399  }
400 
401 
406  bool
408  {
409  Lock sync{this};
410  return bool(runningLoop_);
411  }
412 
413 
417  void
419  {
420  try {
421  Lock sync{this};
422  if (runningLoop_)
423  runningLoop_->requestStop();
424  }
425  ERROR_LOG_AND_IGNORE (command, "Request for Session Loop Thread to terminate");
426  }
427 
428 
429 
438  void
440  {
441  Lock sync{this};
442  active_ = true;
443  if (runningLoop_)
444  runningLoop_->activateCommandProecssing();
445  }
446 
447 
454  void
456  {
457  Lock sync{this};
458  active_ = false;
459  if (runningLoop_)
460  runningLoop_->deactivateCommandProecssing();
461  }
462 
463 
470  void
472  {
473  Lock sync{this};
474  if (runningLoop_)
475  runningLoop_->awaitStateProcessed();
476  }
477 
478 
480  void
482  {
483  Lock sync{this};
484  if (not empty())
485  {
486  WARN (command, "DISCARDING pending Session commands.");
487  REQUIRE (runningLoop_);
488  runningLoop_->clear();
489  }
490  }
491 
492 
493  bool
494  SteamDispatcher::empty() const
495  {
496  Lock sync{this};
497  return not runningLoop_
498  or 0 == runningLoop_->size();
499  }
500 
501 
502 
503 
504 }} // namespace steam::control
Facility for monitor object based locking.
Definition: sync.hpp:217
DispatcherLoop(FUN &&atExit)
start the session loop thread
static lib::Depend< SteamDispatcher > instance
storage for Singleton access
void awaitDeactivation()
block until the dispatcher has actually reached disabled state.
void runSessionThread()
any operation running in the Session thread is started from here.
bool invokedWithinThread() const
detect if the currently executing code runs within this thread
Definition: thread.cpp:74
CStr cStr(std::string const &rendered)
convenience shortcut: forced conversion to c-String via string.
Definition: symbol.hpp:68
#define ERROR_LOG_AND_IGNORE(_FLAG_, _OP_DESCR_)
convenience shortcut for a sequence of catch blocks just logging and consuming an error...
Definition: error.hpp:275
Configuration handle to expose a service implementation through the Depend<SRV> front-end.
bool hasPendingChanges() const
"check point"
Definition: looper.hpp:182
Interface to abstract the SteamDispatcher's ability to handle command messages.
Per type specific configuration of instances created as service dependencies.
Dispatch and execute mutation operations on the High-level model.
void clear()
discard any commands waiting in the dispatcher queue
A public service offered by the Session, implementing the SessionCommand facade interface.
This header is for including and configuring NoBug.
Steam-Layer implementation namespace root.
Implementation building block of SteamDispatcher to organise commands.
Namespace of Session and user visible high-level objects.
Definition: sequence.hpp:74
Access point to singletons and other kinds of dependencies designated by type.
Definition: depend.hpp:289
ServiceHandle commandService_
manage the primary public Session interface
bool shallLoop() const
state fusion to control looping
Definition: looper.hpp:206
bool requireAction()
state fusion to control (timed) wait
Definition: looper.hpp:189
Implementation building block of SteamDispatcher to control waiting and timing.
bool start(Subsys::SigTerm)
starting the SteamDispatcher means to start the session subsystem.
Derived specific exceptions within Lumiera's exception hierarchy.
Definition: error.hpp:199
Interface of a service to perform Commands on the session.
Tiny helper functions and shortcuts to be used everywhere. Consider this header to be effectively incl...
void markStateProcessed()
invoking this function signals that all consequences of past state changes have been processed and ar...
Definition: looper.hpp:174
lumiera_err lumiera_error(void)
Get and clear current error state.
Definition: error-state.c:124
void requestStop() noexcept
signal to the loop thread that it needs to terminate.
Convenience front-end to simplify and codify basic thread handling.
Lumiera error handling (C++ interface).
Implementation of the Session's command queue.
void deactivate()
halt further processing of session commands
A one time N-fold mutual synchronisation barrier.
Handle object representing a single Command instance to be used by client code.
Definition: command.hpp:124
Extended variant of the standard case, allowing to install callbacks (hook functions) to be invoked d...
Definition: thread.hpp:724
PImpl within SteamDispatcher to implement the Session Loop Thread. During the lifetime of this object...
Primary Interface to the current Session.
Encapsulated control logic for the session thread loop.
Definition: looper.hpp:114
void activate()
activate processing of enqueued session commands.
bool isRunning()
whether the »session subsystem« is operational.
void detach_thread_from_wrapper()
allows detaching explicitly — independent of the thread-function's state.
Definition: thread.hpp:239
A N-fold synchronisation latch using yield-wait until fulfilment.