fw4spl
IJob.cpp
1 /* ***** BEGIN LICENSE BLOCK *****
2  * FW4SPL - Copyright (C) IRCAD, 2009-2017.
3  * Distributed under the terms of the GNU Lesser General Public License (LGPL) as
4  * published by the Free Software Foundation.
5  * ****** END LICENSE BLOCK ****** */
6 
7 #include "fwJobs/IJob.hpp"
8 
9 #include "fwJobs/exception/Waiting.hpp"
10 
11 #include <fwCom/Signal.hpp>
12 #include <fwCom/Signal.hxx>
13 #include <fwCom/Signals.hpp>
14 
15 #include <fwThread/Worker.hpp>
16 #include <fwThread/Worker.hxx>
17 
18 #include <algorithm>
19 
20 namespace fwJobs
21 {
22 
23 IJob::IJob(const std::string& name) :
24  m_sigCancelRequested(CancelRequestedSignal::New()),
25  m_sigCanceled(StateSignal::New()),
26  m_sigStarted(StateSignal::New()),
27  m_sigFinished(StateSignal::New()),
28  m_sigDoneWork(DoneWorkSignal::New()),
29  m_sigLogged(LogSignal::New()),
30  m_name(name),
31  m_doneWorkUnits(0),
32  m_totalWorkUnits(0),
33  m_cancelRequested(false),
34  m_cancelable(true),
35  m_state(WAITING)
36 {
37 }
38 
39 //------------------------------------------------------------------------------
40 
42 {
43  std::for_each(m_stateHooks.begin(), m_stateHooks.end(),
44  [&]( const StateHookSeq::value_type& f )
45  {
46  f(m_state);
47  } );
48 }
49 
50 //------------------------------------------------------------------------------
51 
52 const bool& IJob::cancelRequested() const
53 {
54  // ::fwCore::mt::ReadLock lock(m_mutex);
55  return m_cancelRequested;
56 }
57 
58 //------------------------------------------------------------------------------
59 
61 {
62  return [this]()->bool
63  {
65  return m_cancelRequested;
66  };
67 }
68 
69 //------------------------------------------------------------------------------
70 
71 std::uint64_t IJob::getDoneWorkUnits() const
72 {
74  return m_doneWorkUnits;
75 }
76 
77 //------------------------------------------------------------------------------
78 
79 std::uint64_t IJob::getTotalWorkUnits() const
80 {
82  return m_totalWorkUnits;
83 }
84 
85 //------------------------------------------------------------------------------
86 
88 {
90  if( m_cancelable && (m_state == WAITING || m_state == RUNNING) )
91  {
92  State nextState = (m_state == WAITING) ? CANCELED : CANCELING;
93 
94  {
95  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
96  m_cancelRequested = true;
97  this->setStateNoLock(CANCELING);
98  }
99 
100  // unlock mutex for cancel callbacks sanity
101  lock.unlock();
102  // m_cancelHooks can not be changed when m_state != WAITING or
103  // RUNNING, no need to lock m_mutex
104  for(auto cancel : m_cancelHooks)
105  {
106  (cancel)(*this);
107  }
108  lock.lock();
109 
110  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
111 
112  SLM_ASSERT("State shall be only CANCELING or CANCELED, not " << m_state,
113  m_state == CANCELED || m_state == CANCELING);
114 
115  if(m_state == CANCELING)
116  {
117  this->setStateNoLock(nextState);
118 
119  if(nextState == CANCELED)
120  {
121  // If we use the default constructor, the future is not valid and we will not be able to wait for it
122  // Thus we build a dummy future to get a valid one
123  m_runFuture = std::async( []() {} );
124  }
125  }
126  }
127  // else if (m_state == CANCELING)
128  // NOP
129  return m_runFuture;
130 }
131 
132 //------------------------------------------------------------------------------
133 
135 {
137  this->addCancelHookNoLock(callback);
138 }
139 
140 //------------------------------------------------------------------------------
141 
143 {
144  this->addCancelHook(JobCancelHook([ = ](IJob& /*job*/)
145  {
146  callback();
147  }));
148 }
149 
150 //------------------------------------------------------------------------------
151 
153 {
155  this->addDoneWorkHookNoLock(callback);
156 }
157 
158 //------------------------------------------------------------------------------
159 
161 {
163  this->addTotalWorkUnitsHookNoLock(callback);
164 }
165 
166 //------------------------------------------------------------------------------
167 
168 void IJob::addLogHook(LogHook callback)
169 {
171  this->addLogHookNoLock(callback);
172 }
173 
174 //------------------------------------------------------------------------------
175 
177 {
179  this->addStateHookNoLock(callback);
180 }
181 
182 //------------------------------------------------------------------------------
183 
185 {
187  return m_state;
188 }
189 
190 //------------------------------------------------------------------------------
191 
192 const std::string& IJob::getName() const
193 {
194  return m_name; //Does not need to lock mutex, the name can't be changed
195 }
196 
197 //------------------------------------------------------------------------------
198 
200 {
202 
203  if( m_state == WAITING )
204  {
205  {
206  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
207  this->setStateNoLock(RUNNING);
208  }
209 
210  lock.unlock();
211  auto future = this->runImpl();
212  lock.lock();
213 
214  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
215  m_runFuture = future;
216  }
217 
218  return m_runFuture;
219 }
220 
221 //------------------------------------------------------------------------------
222 
224 {
226  this->setStateNoLock(state);
227 }
228 
229 //------------------------------------------------------------------------------
230 
232 {
233  m_state = state;
234  switch(state)
235  {
236  case WAITING:
237  break;
238  case RUNNING:
239  m_sigStarted->asyncEmit();
240  break;
241  case CANCELING:
242  m_sigCancelRequested->asyncEmit();
243  break;
244  case CANCELED:
245  m_sigCanceled->asyncEmit();
246  break;
247  case FINISHED:
248  m_sigFinished->asyncEmit();
249  break;
250  default:
251  SLM_ASSERT("You shall not pass !", 0);
252  }
253 
254  std::for_each(m_stateHooks.begin(), m_stateHooks.end(),
255  [&]( const StateHookSeq::value_type& f )
256  {
257  f(m_state);
258  } );
259 }
260 
261 //------------------------------------------------------------------------------
262 
264 {
266  this->finishNoLock();
267 }
268 
269 //------------------------------------------------------------------------------
270 
272 {
273  this->setStateNoLock( ( m_state == CANCELING ) ? CANCELED : FINISHED );
274 }
275 
276 //------------------------------------------------------------------------------
277 
278 std::function< void() > IJob::finishCallback()
279 {
280  return [ = ] { this->finish(); };
281 }
282 
283 //------------------------------------------------------------------------------
284 
286 {
287  decltype (m_runFuture)runFuture;
288  {
290  runFuture = m_runFuture;
291  }
292 
293  try
294  {
295  runFuture.wait();
296  }
297  catch( std::future_error& )
298  {
299  FW_RAISE_EXCEPTION( ::fwJobs::exception::Waiting("Job has not been started") );
300  }
301 }
302 
303 //------------------------------------------------------------------------------
304 
305 void IJob::log(const std::string& message)
306 {
308  this->logNoLock(message);
309 }
310 
311 //------------------------------------------------------------------------------
312 
313 void IJob::logNoLock(const std::string& message)
314 {
315  m_logs.push_back(message);
316 
317  m_sigLogged->asyncEmit(message);
318 
319  std::for_each(m_logHooks.begin(), m_logHooks.end(),
320  [&]( const LogHookSeq::value_type& f )
321  {
322  f(*this, message);
323  } );
324 }
325 
326 //------------------------------------------------------------------------------
327 
329 {
330  return m_state;
331 }
332 
333 //------------------------------------------------------------------------------
334 
336 {
337  if(m_state == WAITING || m_state == RUNNING)
338  {
339  m_cancelHooks.push_back(callback);
340  }
341 }
342 
343 //------------------------------------------------------------------------------
344 
346 {
347  if(m_state == WAITING || m_state == RUNNING)
348  {
349  m_doneWorkHooks.push_back(callback);
350  }
351 }
352 
353 //------------------------------------------------------------------------------
354 
356 {
357  if(m_state == WAITING || m_state == RUNNING)
358  {
359  m_totalWorkUnitsHooks.push_back(callback);
360  }
361 }
362 
363 //------------------------------------------------------------------------------
364 
366 {
367  m_logHooks.push_back(callback);
368 }
369 
370 //------------------------------------------------------------------------------
371 
373 {
374  m_stateHooks.push_back(callback);
375 }
376 
377 //------------------------------------------------------------------------------
378 
379 void IJob::doneWork( std::uint64_t units )
380 {
382  this->doneWork( units, lock );
383 }
384 
385 //------------------------------------------------------------------------------
386 
387 void IJob::doneWork( std::uint64_t units, ::fwCore::mt::ReadToWriteLock& lock )
388 {
389  auto oldDoneWork = m_doneWorkUnits;
390  decltype(m_doneWorkHooks)doneWorkHooks;
391 
392  if(m_doneWorkUnits == units)
393  {
394  return;
395  }
396 
397  doneWorkHooks = m_doneWorkHooks;
398 
399  {
400  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
401  m_doneWorkUnits = units;
402  }
403 
405 
406  lock.unlock();
407 
408  std::for_each(doneWorkHooks.begin(), doneWorkHooks.end(),
409  [&]( const DoneWorkHookSeq::value_type& f )
410  {
411  f(*this, oldDoneWork);
412  } );
413 }
414 
415 //------------------------------------------------------------------------------
416 
417 void IJob::setTotalWorkUnits(std::uint64_t units)
418 {
420  this->setTotalWorkUnitsUpgradeLock( units, lock );
421 }
422 
423 //------------------------------------------------------------------------------
424 
426 {
427  auto oldTotalWorkUnits = m_totalWorkUnits;
428  decltype(m_totalWorkUnitsHooks)totalWorkUnitsHook;
429 
430  totalWorkUnitsHook = m_totalWorkUnitsHooks;
431 
432  if(m_doneWorkUnits > units)
433  {
434  this->doneWork( units, lock );
435  }
436 
437  {
438  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
439  m_totalWorkUnits = units;
440  }
441 
442  lock.unlock();
443  std::for_each(totalWorkUnitsHook.begin(), totalWorkUnitsHook.end(),
444  [&]( const TotalWorkUnitsHookSeq::value_type& f )
445  {
446  f(*this, oldTotalWorkUnits);
447  } );
448 }
449 
450 //------------------------------------------------------------------------------
451 
453 {
455  return m_logs;
456 }
457 
458 //------------------------------------------------------------------------------
459 
461 {
464 }
465 
466 //------------------------------------------------------------------------------
467 
468 bool IJob::isCancelable() const
469 {
471  return m_cancelable;
472 }
473 
474 //------------------------------------------------------------------------------
475 
477 {
478  this->doneWork(m_totalWorkUnits);
479 }
480 
481 //------------------------------------------------------------------------------
482 
483 } //namespace fwJobs
CancelHookSeq m_cancelHooks
Container of cancel callbacks. Cancel callbacks will be run when job is canceling.
Definition: IJob.hpp:385
std::uint64_t m_totalWorkUnits
Number of work units to reach to complete the job.
Definition: IJob.hpp:376
FWJOBS_API IJob(const std::string &name="")
Default constructor.
Definition: IJob.cpp:23
FWJOBS_API void addLogHook(LogHook callback)
Add job log callback to sequence for log hook.
Definition: IJob.cpp:168
FWJOBS_API void log(const std::string &message)
Log a message.
Definition: IJob.cpp:305
FWJOBS_API void addStateHookNoLock(StateHook callback)
Add job state callback to sequence without mutex lock for state hook.
Definition: IJob.cpp:372
::boost::upgrade_lock< ReadWriteMutex > ReadToWriteLock
Defines an upgradable lock type for read/write mutex.
State m_state
Job's state.
Definition: IJob.hpp:403
FWJOBS_API void addTotalWorkUnitsHook(TotalWorkUnitsHook callback)
Add job work unit callback to sequence for total work unit hook.
Definition: IJob.cpp:160
bool m_cancelRequested
Determines if cancellation has been requested. The job is/will be canceling if true.
Definition: IJob.hpp:379
::boost::upgrade_to_unique_lock< ReadWriteMutex > UpgradeToWriteLock
Defines a write lock upgraded from ReadToWriteLock.
std::string m_name
Job's name.
Definition: IJob.hpp:367
FWJOBS_API void doneWork(std::uint64_t units)
Setter on done work units.
Definition: IJob.cpp:379
TotalWorkUnitsHookSeq m_totalWorkUnitsHooks
Container of total work unit callbacks. These callbacks take a IJob as parameter. ...
Definition: IJob.hpp:391
std::shared_ptr< StateSignal > m_sigStarted
Signal emitted when the job has been started.
Definition: IJob.hpp:352
FWJOBS_API std::function< void() > finishCallback()
Return callback to finish the job.
Definition: IJob.cpp:278
This class is an interface for class managing job.
Definition: IJob.hpp:28
DoneWorkHookSeq m_doneWorkHooks
Container of done work callbacks. These callbacks take nothing or a IJob as parameter.
Definition: IJob.hpp:388
virtual FWJOBS_API SharedFuture cancel()
Cancel the current job and call all available cancel callbacks.
Definition: IJob.cpp:87
std::shared_future< void > SharedFuture
Future type.
Definition: IJob.hpp:109
LogHookSeq m_logHooks
Container of log callbacks. These callbacks take a std::string as parameter.
Definition: IJob.hpp:394
FWJOBS_API State getState() const
Getter on the current State.
Definition: IJob.cpp:184
virtual FWJOBS_API SharedFuture runImpl()=0
Run an instantiated job.
FWJOBS_API void addLogHookNoLock(LogHook callback)
Add job log callback to sequence without mutex lock for log hook.
Definition: IJob.cpp:365
SharedFuture m_runFuture
Job's future returned by 'run'.
Definition: IJob.hpp:400
std::shared_ptr< DoneWorkSignal > m_sigDoneWork
Signal emitted when done work units have been changed. Takes a std::uint64_t as parameter.
Definition: IJob.hpp:358
bool m_cancelable
Determines if the job can be cancelled. The job is cancelable if true.
Definition: IJob.hpp:382
std::shared_ptr< CancelRequestedSignal > m_sigCancelRequested
Signal emitted when cancel has been requested.
Definition: IJob.hpp:346
::boost::unique_lock< ReadWriteMutex > WriteLock
Defines a lock of write type for read/write mutex.
FWJOBS_API void setCancelable(bool cancel)
Setter on cancelable.
Definition: IJob.cpp:460
FWJOBS_API const bool & cancelRequested() const
Returns the job canceling status.
Definition: IJob.cpp:52
std::shared_ptr< StateSignal > m_sigCanceled
Signal emitted when the job has been canceled.
Definition: IJob.hpp:349
virtual FWJOBS_API ~IJob()
Default destructor.
Definition: IJob.cpp:41
FWJOBS_API void addSimpleCancelHook(CancelHook callback)
Add cancel callback to sequence for cancel hook.
Definition: IJob.cpp:142
FWJOBS_API std::uint64_t getDoneWorkUnits() const
Getter on the number of done work units.
Definition: IJob.cpp:71
State
Job's states.
Definition: IJob.hpp:42
FWJOBS_API void setTotalWorkUnits(std::uint64_t units)
Setter on total work units.
Definition: IJob.cpp:417
std::shared_ptr< StateSignal > m_sigFinished
Signal emitted when the job has been finished.
Definition: IJob.hpp:355
No worker exception.
Definition: Waiting.hpp:19
mutable ::fwCore::mt::ReadWriteMutex m_mutex
Mutex to protect object access.
Definition: IJob.hpp:364
std::function< void(IJob &, const std::string &) > LogHook
Log callback type for log hook.
Definition: IJob.hpp:64
#define SLM_ASSERT(message, cond)
Works like 'assert' from 'cassert', and additionally logs a message through spylog (with FATAL log level) ...
Definition: spyLog.hpp:308
FWJOBS_API void wait()
Wait job execution ending.
Definition: IJob.cpp:285
FWJOBS_API void addTotalWorkUnitsHookNoLock(TotalWorkUnitsHook callback)
Add job work unit callback to sequence without mutex lock for total work unit hook.
Definition: IJob.cpp:355
::boost::shared_lock< ReadWriteMutex > ReadLock
Defines a lock of read type for read/write mutex.
std::function< void(State) > StateHook
State callback type for state hook.
Definition: IJob.hpp:76
std::function< void(IJob &, std::uint64_t) > TotalWorkUnitsHook
Work units callback type for total work unit hook.
Definition: IJob.hpp:70
std::function< void(IJob &) > JobCancelHook
Job cancel callback type for cancel hook.
Definition: IJob.hpp:61
FWJOBS_API void finishNoLock()
Finish the job without mutex lock: set the state to finished or canceled.
Definition: IJob.cpp:271
std::function< void(IJob &, std::uint64_t) > DoneWorkHook
Done work callback type for done work hook.
Definition: IJob.hpp:67
std::function< bool() > CancelRequestCallback
Cancel request callback type.
Definition: IJob.hpp:58
FWJOBS_API void addStateHook(StateHook callback)
Add job state callback to sequence for state hook.
Definition: IJob.cpp:176
FWJOBS_API void addCancelHook(JobCancelHook callback)
Add cancel callback to sequence for cancel hook.
Definition: IJob.cpp:134
FWJOBS_API void addCancelHookNoLock(JobCancelHook callback)
Add job cancel callback to sequence without mutex lock for cancel hook.
Definition: IJob.cpp:335
FWJOBS_API bool isCancelable() const
Getter on cancelable, returns whether the job is cancelable.
Definition: IJob.cpp:468
FWJOBS_API SharedFuture run()
Run the current job.
Definition: IJob.cpp:199
FWJOBS_API Logs getLogs() const
Getter on the log container.
Definition: IJob.cpp:452
FWJOBS_API void addDoneWorkHookNoLock(DoneWorkHook callback)
Add job done work unit callback to sequence without mutex lock for done work hook.
Definition: IJob.cpp:345
Logs m_logs
Logs container.
Definition: IJob.hpp:370
FWJOBS_API void setState(State state)
Setter on the state.
Definition: IJob.cpp:223
FWJOBS_API CancelRequestCallback cancelRequestedCallback() const
Returns a callback on job canceling status. This callback can only be used if the job is still instan...
Definition: IJob.cpp:60
FWJOBS_API State getStateNoLock() const
Getter on the state without mutex lock.
Definition: IJob.cpp:328
FWJOBS_API const std::string & getName() const
Getter on the name of the job.
Definition: IJob.cpp:192
FWJOBS_API void done()
Set done work units to total work units.
Definition: IJob.cpp:476
StateHookSeq m_stateHooks
Container of state callbacks. These callbacks take a State as parameter.
Definition: IJob.hpp:397
FWJOBS_API std::uint64_t getTotalWorkUnits() const
Getter on the total number of work units.
Definition: IJob.cpp:79
FWJOBS_API void setTotalWorkUnitsUpgradeLock(std::uint64_t units,::fwCore::mt::ReadToWriteLock &lock)
Setter on total work units.
Definition: IJob.cpp:425
std::function< void() > CancelHook
Cancel callback type for cancel hook.
Definition: IJob.hpp:73
std::vector< std::string > Logs
Log container type.
Definition: IJob.hpp:94
FWJOBS_API void logNoLock(const std::string &message)
Add a message to the log sequence.
Definition: IJob.cpp:313
This namespace fwJobs provides jobs management.
std::shared_ptr< LogSignal > m_sigLogged
Signal emitted when a message has been added to logs. Takes a std::string as parameter.
Definition: IJob.hpp:361
std::uint64_t m_doneWorkUnits
Number of work units already reached.
Definition: IJob.hpp:373
FWJOBS_API void setStateNoLock(State state)
Setter on the state without mutex lock.
Definition: IJob.cpp:231
virtual FWJOBS_API void finish()
Finish the job: set state to finished or canceled.
Definition: IJob.cpp:263
FWJOBS_API void addDoneWorkHook(DoneWorkHook callback)
Add job done work unit callback to sequence for done work hook.
Definition: IJob.cpp:152