fw4spl
Aggregator.cpp
1 /* ***** BEGIN LICENSE BLOCK *****
2  * FW4SPL - Copyright (C) IRCAD, 2009-2017.
3  * Distributed under the terms of the GNU Lesser General Public License (LGPL) as
4  * published by the Free Software Foundation.
5  * ****** END LICENSE BLOCK ****** */
6 
7 #include "fwJobs/Aggregator.hpp"
8 
9 #include "fwJobs/exception/Waiting.hpp"
10 #include "fwJobs/Job.hpp"
11 
12 #include <fwCore/spyLog.hpp>
13 
14 #include <fwThread/Worker.hpp>
15 #include <fwThread/Worker.hxx>
16 
17 #include <memory>
18 
19 namespace fwJobs
20 {
21 
22 //------------------------------------------------------------------------------
23 
24 Aggregator::sptr Aggregator::New(const std::string& name)
25 {
26  return std::make_shared<Aggregator>( name );
27 }
28 
29 //------------------------------------------------------------------------------
30 
32 {
33 }
34 
35 //------------------------------------------------------------------------------
36 
37 Aggregator::Aggregator(const std::string& name) :
38  IJob(name)
39 {
40 }
41 
42 //------------------------------------------------------------------------------
43 
45 {
46  decltype(m_jobs)jobs;
47 
48  {
50  jobs = m_jobs;
51  }
52 
53  std::vector< SharedFuture > futures;
54  for(const ::fwJobs::IJob::sptr& iJob : jobs)
55  {
56  futures.push_back(iJob->run());
57  }
58 
59  auto future = std::async(
60  [ = ]() mutable
61  {
62  std::for_each(futures.begin(), futures.end(), std::mem_fn(&::std::shared_future<void>::wait));
63 
64  this->finish();
65 
66  // forwards exceptions that might have been thrown
67  for( SharedFuture f : futures )
68  {
69  f.get();
70  }
71  }
72  );
73 
74  return std::move(future);
75 }
76 
77 //------------------------------------------------------------------------------
78 
79 void Aggregator::add(const ::fwJobs::IJob::sptr& iJob, double weight)
80 {
81  SLM_ASSERT("iJob shall not be null", iJob);
82 
83  SLM_ASSERT("iJob shall not be added to itself", this != iJob.get());
84 
85  if(!iJob)
86  {
87  return;
88  }
89 
91 
92  SLM_ASSERT("Jobs can't be added when Aggregator is running", m_state == WAITING || m_state == RUNNING);
93 
94  const auto normValue = std::uint64_t(weight*100);
95 
96  if( m_state == WAITING || m_state == RUNNING )
97  {
98  m_jobInfo[iJob.get()] = JobInfo( *iJob );
99  auto& jobInfo = m_jobInfo[iJob.get()];
100 
101  this->setTotalWorkUnitsUpgradeLock( m_totalWorkUnits + (jobInfo.totalWork ? normValue : 0), lock);
102  lock.lock();
103  // doneWork call after setTotalWorkUnitsUpgradeLock, because
104  // doneWork value can be thresholded by setTotalWorkUnitsUpgradeLock
105  // call
106  jobInfo.lastValue = std::uint64_t(jobInfo.progress() * normValue);
107  {
108  ::fwCore::mt::UpgradeToWriteLock writeLock(lock);
109  m_jobs.push_back(iJob);
110  }
111  // take care : doneWork unlocks 'lock'
112  this->doneWork(jobInfo.lastValue, lock);
113 
114  // TODO : add a way to disconnect on aggregator destruction
115  iJob->addDoneWorkHook(
116  [ =, &jobInfo](IJob& subJob, std::uint64_t oldDoneWork)
117  {
119 
120  auto oldInfo = jobInfo;
121  jobInfo = Aggregator::JobInfo( subJob );
122 
123  jobInfo.lastValue = std::uint64_t(normValue * jobInfo.progress());
124 
125  auto doneWork = m_doneWorkUnits + jobInfo.lastValue;
126  // minimize numerical uncertainty by substracting in a second time :
127  doneWork -= oldInfo.lastValue;
128 
129  this->doneWork( static_cast<std::uint64_t>(doneWork), sublock );
130  }
131  );
132 
133  iJob->addTotalWorkUnitsHook(
134  [ = ](IJob& subJob, std::uint64_t oldTotalWorkUnits)
135  {
137 
138  auto workUnits = m_totalWorkUnits;
139  auto newTotalWorkUnits = subJob.getTotalWorkUnits();
140 
141  if( oldTotalWorkUnits != newTotalWorkUnits)
142  {
143  if( oldTotalWorkUnits && 0 == newTotalWorkUnits)
144  {
145  workUnits -= normValue;
146  }
147  else if( 0 == oldTotalWorkUnits && newTotalWorkUnits)
148  {
149  workUnits += normValue;
150  }
151  }
152 
153  this->setTotalWorkUnitsUpgradeLock( workUnits, sublock );
154  }
155  );
156 
157  this->addCancelHookNoLock( [iJob]( IJob& /* cancelingJob */ )
158  {
159  iJob->cancel();
160  } );
161 
162  auto iJobName = iJob->getName();
163  iJobName = iJobName.empty() ? "" : "[" + iJobName + "] ";
164  iJob->addLogHook( [ = ]( IJob& /* job */, const std::string& message)
165  {
166  this->log( iJobName + message);
167  } );
168 
169  auto iJobLogs = iJob->getLogs();
170  std::for_each(iJobLogs.begin(), iJobLogs.end(),
171  [&]( const Logs::value_type& message )
172  {
173  this->logNoLock(iJobName + message);
174  }
175  );
176  }
177 
178 }
179 
180 //------------------------------------------------------------------------------
181 
183 {
185  return m_jobs;
186 }
187 
188 } //namespace fwJobs
FWJOBS_API Aggregator()
Default constructor. The name is initialized with an empty string.
Definition: Aggregator.cpp:31
std::uint64_t m_totalWorkUnits
Number of work units to reach to complete the job.
Definition: IJob.hpp:376
FWJOBS_API void log(const std::string &message)
Log a message.
Definition: IJob.cpp:305
::boost::upgrade_lock< ReadWriteMutex > ReadToWriteLock
Defines an upgradable lock type for read/write mutex.
FWJOBS_API SharedFuture runImpl()
Run all the jobs of the Aggregator.
Definition: Aggregator.cpp:44
State m_state
Job&#39;s state.
Definition: IJob.hpp:403
::boost::upgrade_to_unique_lock< ReadWriteMutex > UpgradeToWriteLock
Defines a write lock upgraded from ReadToWriteLock.
FWJOBS_API void doneWork(std::uint64_t units)
Setter on done work units.
Definition: IJob.cpp:379
FWJOBS_API IJobSeq getSubJobs()
Retrieve sub job sequence of the Aggregator.
Definition: Aggregator.cpp:182
This class is an interface for class managing job.
Definition: IJob.hpp:28
std::shared_future< void > SharedFuture
Future type.
Definition: IJob.hpp:109
::boost::multi_index_container< ::fwJobs::IJob::sptr,::boost::multi_index::indexed_by< ::boost::multi_index::random_access<>,::boost::multi_index::hashed_unique< ::boost::multi_index::identity< ::fwJobs::IJob::sptr > > > > IJobSeq
Aggregator container type.
Definition: Aggregator.hpp:50
std::shared_ptr< ::fwJobs::Aggregator > sptr
Aggregator container type.
Definition: Aggregator.hpp:39
mutable::fwCore::mt::ReadWriteMutex m_mutex
Mutex to protect object access.
Definition: IJob.hpp:364
#define SLM_ASSERT(message, cond)
work like &#39;assert&#39; from &#39;cassert&#39;, with in addition a message logged by spylog (with FATAL loglevel) ...
Definition: spyLog.hpp:308
::boost::shared_lock< ReadWriteMutex > ReadLock
Defines a lock of read type for read/write mutex.
FWJOBS_API void addCancelHookNoLock(JobCancelHook callback)
Add job cancel callback to sequence without mutex lock for cancel hook.
Definition: IJob.cpp:335
FWJOBS_API void add(const ::fwJobs::IJob::sptr &iJob, double weight=1.)
Add an IJob to the Aggregator.
Definition: Aggregator.cpp:79
FWJOBS_API std::uint64_t getTotalWorkUnits() const
Getter on the total number of work units.
Definition: IJob.cpp:79
This file defines SpyLog macros. These macros are used to log messages to a file or to the console du...
FWJOBS_API void setTotalWorkUnitsUpgradeLock(std::uint64_t units,::fwCore::mt::ReadToWriteLock &lock)
Setter on total work units.
Definition: IJob.cpp:425
static FWJOBS_API sptr New(const std::string &name="")
Create a new Aggregator smart pointer.
Definition: Aggregator.cpp:24
FWJOBS_API void logNoLock(const std::string &message)
Add a message to thelog sequence.
Definition: IJob.cpp:313
This namespace fwJobs provides jobs management.
std::uint64_t m_doneWorkUnits
Number of work units already reached.
Definition: IJob.hpp:373
virtual FWJOBS_API void finish()
Finish the job: set state to finished or canceled.
Definition: IJob.cpp:263