libtdepim

weaver.cpp
1 /*
2  This file implements the Weaver, Job and Thread classes.
3 
4  $ Author: Mirko Boehm $
5  $ Copyright: (C) 2004, Mirko Boehm $
6  $ Contact: mirko@kde.org
7  http://www.kde.org
8  http://www.hackerbuero.org $
9  $ License: LGPL with the following explicit clarification:
10  This code may be linked against any version of the TQt toolkit
11  from Troll Tech, Norway. $
12 
13 */
14 
15 extern "C" {
16 #include <signal.h>
17 }
18 
19 #include <tqevent.h>
20 #include <tqapplication.h>
21 
22 #include "weaver.h"
23 
24 namespace KPIM {
25 namespace ThreadWeaver {
26 
// Module-wide debug switch and verbosity level.
// NOTE(review): presumably consulted by the debug() helper that is called throughout this file - confirm in weaver.h.
27  bool Debug = true;
28  int DebugLevel = 2;
29 
30  Job::Job (TQObject* parent, const char* name)
31  : TQObject (parent, name),
32  m_finished (false),
33  m_mutex (new TQMutex (true) ),
34  m_thread (0)
35  {
36  }
37 
39  {
40  }
41 
42  void Job::lock()
43  {
44  m_mutex->lock();
45  }
46 
47  void Job::unlock()
48  {
49  m_mutex->unlock();
50  }
51 
52  void Job::execute(Thread *th)
53  {
54  m_mutex->lock();
55  m_thread = th;
56  m_mutex->unlock();
57 
58  run ();
59 
60  m_mutex->lock();
61  setFinished (true);
62  m_thread = 0;
63  m_mutex->unlock();
64  }
65 
67  {
68  TQMutexLocker l (m_mutex);
69  return m_thread;
70  }
71 
72  bool Job::isFinished() const
73  {
74  TQMutexLocker l (m_mutex);
75  return m_finished;
76  }
77 
78  void Job::setFinished(bool status)
79  {
80  TQMutexLocker l (m_mutex);
81  m_finished = status;
82  }
83 
85  {
86  switch ( e->action() )
87  {
88  case Event::JobStarted:
89  emit ( started() );
90  break;
91  case Event::JobFinished:
92  emit ( done() );
93  break;
94  case Event::JobSPR:
95  emit ( SPR () );
96  m_wc->wakeOne ();
97  break;
98  case Event::JobAPR:
99  emit ( APR () );
100  // no wake here !
101  break;
102  default:
103  break;
104  }
105  }
106 
108  {
109  m_mutex->lock ();
110  m_wc = new TQWaitCondition;
111  m_mutex->unlock ();
112 
113  thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
114  m_wc->wait ();
115 
116  m_mutex->lock ();
117  delete m_wc;
118  m_wc = 0;
119  m_mutex->unlock ();
120  }
121 
123  {
124  m_mutex->lock ();
125  m_wc = new TQWaitCondition;
126  m_mutex->unlock ();
127 
129  m_wc->wait ();
130  }
131 
// Wake one waiter on the job's wait condition, if there is one, then
// destroy the condition and reset the pointer. Everything happens with the
// job mutex held.
132  void Job::wakeAPR ()
133  {
134  TQMutexLocker l(m_mutex);
// m_wc == 0 means no wait condition currently exists; nothing to wake.
135  if ( m_wc!=0 )
136  {
137  m_wc->wakeOne ();
// NOTE(review): the condition is deleted right after wakeOne(); the woken
// thread may still be returning from wait() on it - confirm this is safe.
138  delete m_wc;
139  m_wc = 0;
140  }
141  }
142 
// Custom event type id for weaver events: 1000 above TQEvent::User.
143  const int Event::Type = TQEvent::User + 1000;
144 
145  Event::Event ( Action action, Thread *thread, Job *job)
146  : TQCustomEvent ( type () ),
147  m_action (action),
148  m_thread (thread),
149  m_job (job)
150  {
151  }
152 
153  int Event::type ()
154  {
155  return Type;
156  }
157 
159  {
160  if ( m_thread != 0)
161  {
162  return m_thread;
163  } else {
164  return 0;
165  }
166  }
167 
168  Job* Event::job () const
169  {
170  return m_job;
171  }
172 
174  {
175  return m_action;
176  }
177 
// Definition of the static id counter that makeId() increments.
178  unsigned int Thread::sm_Id;
179 
181  : TQThread (),
182  m_parent ( parent ),
183  m_id ( makeId() )
184  {
185  }
186 
188  {
189  }
190 
191  unsigned int Thread::makeId()
192  {
193  static TQMutex mutex;
194  TQMutexLocker l (&mutex);
195 
196  return ++sm_Id;
197  }
198 
199  unsigned int Thread::id() const
200  {
201  return m_id;
202  }
203 
// Worker loop: repeatedly asks the parent weaver for a job, executes it and
// posts JobStarted / JobFinished around the execution. The loop ends when
// applyForWork() returns 0; ThreadExiting is posted before returning.
204  void Thread::run()
205  {
// The previously executed job is handed back to applyForWork() on the next
// iteration; 0 on the first pass.
206  Job *job = 0;
207 
// NOTE(review): the listing skips a source line here (208) - check the real file.
209 
210  while (true)
211  {
212  debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
213 
214  job = m_parent->applyForWork ( this, job );
215 
216  if (job == 0)
217  {
// No job was handed out: leave the loop and let the thread finish.
218  break;
219  } else {
220  post ( Event::JobStarted, job );
221  job->execute (this);
222  post ( Event::JobFinished, job );
223  }
224  }
225 
226  post ( Event::ThreadExiting );
227  }
228 
230  {
231  m_parent->post ( a, this, j);
232  }
233 
234  void Thread::msleep(unsigned long msec)
235  {
236  TQThread::msleep(msec);
237  }
238 
239  Weaver::Weaver(TQObject* parent, const char* name,
240  int inventoryMin, int inventoryMax)
241  : TQObject(parent, name),
242  m_active(0),
243  m_inventoryMin(inventoryMin),
244  m_inventoryMax(inventoryMax),
245  m_shuttingDown(false),
246  m_running (false),
247  m_suspend (false),
248  m_mutex ( new TQMutex(true) )
249  {
250  lock();
251 
252  for ( int count = 0; count < m_inventoryMin; ++count)
253  {
254  Thread *th = new Thread(this);
255  m_inventory.append(th);
256  // this will idle the thread, waiting for a job
257  th->start();
258 
259  emit (threadCreated (th) );
260  }
261 
262  unlock();
263  }
264 
// Destructor: flags shutdown, wakes the worker threads, waits for each one
// that has not finished yet, deletes every thread in the inventory and
// finally deletes the weaver mutex.
265  Weaver::~Weaver()
266  {
267  lock();
268 
269  debug ( 1, "Weaver dtor: destroying inventory.\n" );
270 
// Set while holding the weaver mutex.
271  m_shuttingDown = true;
272 
273  unlock();
274 
// First wake-up for all threads currently waiting for a job.
275  m_jobAvailable.wakeAll();
276 
277  // problem: Some threads might not be asleep yet, just finding
278  // out if a job is available. Those threads will suspend
279  // waiting for their next job (a rare case, but not impossible).
280  // Therefore, if we encounter a thread that has not exited, we
281  // have to wake it again (which we do in the following for
282  // loop).
283 
284  for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
285  {
286  if ( !th->finished() )
287  {
// Wake again, then block until this particular thread has exited.
288  m_jobAvailable.wakeAll();
289  th->wait();
290  }
291 
// Emitted before the delete, so receivers still get a live pointer.
292  emit (threadDestroyed (th) );
293  delete th;
294 
295  }
296 
297  m_inventory.clear();
298 
299  delete m_mutex;
300 
301  debug ( 1, "Weaver dtor: done\n" );
302 
303  }
304 
306  {
307  debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
308  ( m_mutex->locked() ? "locked" : "not locked" ) );
309  m_mutex->lock();
310  }
311 
313  {
314  m_mutex->unlock();
315 
316  debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
317  ( m_mutex->locked() ? "locked" : "not locked" ) );
318  }
319 
320  int Weaver::threads () const
321  {
322  TQMutexLocker l (m_mutex);
323  return m_inventory.count ();
324  }
325 
326  void Weaver::enqueue(Job* job)
327  {
328  lock();
329 
330  m_assignments.append(job);
331  m_running = true;
332 
333  unlock();
334 
335  assignJobs();
336  }
337 
338  void Weaver::enqueue (TQPtrList <Job> jobs)
339  {
340  lock();
341 
342  for ( Job * job = jobs.first(); job; job = jobs.next() )
343  {
344  m_assignments.append (job);
345  }
346 
347  unlock();
348 
349  assignJobs();
350  }
351 
352  bool Weaver::dequeue ( Job* job )
353  {
354  TQMutexLocker l (m_mutex);
355  return m_assignments.remove (job);
356  }
357 
359  {
360  TQMutexLocker l (m_mutex);
361  m_assignments.clear();
362  }
363 
// Switch job assignment off (state == true) or back on (state == false).
// The flag is changed with the weaver locked; resuming also calls
// assignJobs().
364  void Weaver::suspend (bool state)
365  {
366  lock();
367 
368  if (state)
369  {
370  // no need to wake any threads here
371  m_suspend = true;
372  if ( m_active == 0 && isEmpty() )
373  { // instead of waking up threads:
// NOTE(review): the listing skips a source line here (374), so this branch
// appears empty - check the real file for the statement that belongs here.
375  }
376  } else {
377  m_suspend = false;
378  // make sure we emit suspended () even if all threads are sleeping:
379  assignJobs ();
380  debug (2, "Weaver::suspend: queueing resumed.\n" );
381  }
382 
383  unlock();
384  }
385 
387  {
388  m_jobAvailable.wakeAll();
389  }
390 
391  bool Weaver::event (TQEvent *e )
392  {
393  if ( e->type() >= TQEvent::User )
394  {
395 
396  if ( e->type() == Event::type() )
397  {
398  Event *event = (Event*) e;
399 
400  switch (event->action() )
401  {
402  case Event::JobFinished:
403  if ( event->job() !=0 )
404  {
405  emit (jobDone (event->job() ) );
406  }
407  break;
408  case Event::Finished:
409  emit ( finished() );
410  break;
411  case Event::Suspended:
412  emit ( suspended() );
413  break;
414  case Event::ThreadSuspended:
415  if (!m_shuttingDown )
416  {
417  emit (threadSuspended ( event->thread() ) );
418  }
419  break;
420  case Event::ThreadBusy:
421  if (!m_shuttingDown )
422  {
423  emit (threadBusy (event->thread() ) );
424  }
425  break;
426  default:
427  break;
428  }
429 
430  if ( event->job() !=0 )
431  {
432  event->job()->processEvent (event);
433  }
434  } else {
435  debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
436  }
437  return true;
438  } else {
439  // others - please make sure we are a TQObject!
440  return TQObject::event ( e );
441  }
442  }
443 
445  {
446  Event *e = new Event ( a, t, j);
447  TQApplication::postEvent (this, e);
448  }
449 
450  bool Weaver::isEmpty() const
451  {
452  TQMutexLocker l (m_mutex);
453  return m_assignments.count()==0;
454  }
455 
457  {
458  Job *rc = 0;
459  bool lastjob = false;
460  bool suspended = false;
461 
462  while (true)
463  {
464  lock();
465 
466  if (previous != 0)
467  { // cleanup and send events:
468  --m_active;
469 
470  debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
471  "%i active jobs left.\n",
472  queueLength(), m_active );
473 
474  if ( m_active == 0 && isEmpty() )
475  {
476  lastjob = true;
477  m_running = false;
478  post (Event::Finished);
479  debug ( 3, "Weaver::applyForWork: last job.\n" );
480  }
481 
482  if (m_active == 0 && m_suspend == true)
483  {
484  suspended = true;
486  debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
487  }
488 
489  m_jobFinished.wakeOne();
490  }
491 
492  previous = 0;
493 
494  if (m_shuttingDown == true)
495  {
496  unlock();
497 
498  return 0;
499  } else {
500  if ( !isEmpty() && m_suspend == false )
501  {
502  rc = m_assignments.getFirst();
503  m_assignments.removeFirst ();
504  ++m_active;
505 
506  debug ( 3, "Weaver::applyForWork: job assigned, "
507  "%i jobs in queue (%i active).\n",
508  m_assignments.count(), m_active );
509  unlock();
510 
511  post (Event::ThreadBusy, th);
512 
513  return rc;
514  } else {
515  unlock();
516 
517  post (Event::ThreadSuspended, th);
518  m_jobAvailable.wait();
519  }
520  }
521  }
522  }
523 
525  {
526  TQMutexLocker l (m_mutex);
527  return m_assignments.count();
528  }
529 
530  bool Weaver::isIdle () const
531  {
532  TQMutexLocker l (m_mutex);
533  return isEmpty() && m_active == 0;
534  }
535 
537  {
538  while ( !isIdle() )
539  {
540  debug (2, "Weaver::finish: not done, waiting.\n" );
541  m_jobFinished.wait();
542  }
543  debug (1, "Weaver::finish: done.\n\n\n" );
544  }
545 
546 }
547 }
548 
549 #include "weaver.moc"
void post(Event::Action, Thread *=0, Job *=0)
Post an event that is handled by this object, but in the main (GUI) thread.
Definition: weaver.cpp:444
int m_active
The number of jobs that are assigned to the worker threads, but not finished.
Definition: weaver.h:421
int queueLength()
Returns the number of pending jobs.
Definition: weaver.cpp:524
bool m_running
m_running is set to true when a job is enqueued and set to false when the job finishes that was the l...
Definition: weaver.h:437
TQWaitCondition m_jobFinished
Wait for a job to finish.
Definition: weaver.h:429
The class Thread is used to represent the worker threads in the weaver's inventory.
Definition: weaver.h:249
virtual void dequeue()
Remove all queued jobs.
Definition: weaver.cpp:358
Thread(Weaver *parent)
Create a thread.
Definition: weaver.cpp:180
A class to represent the events threads generate and send to the Weaver object.
Definition: weaver.h:99
void unlock()
Unlock.
Definition: weaver.cpp:312
virtual ~Job()
Destructor.
Definition: weaver.cpp:38
virtual void processEvent(Event *)
Process events related to this job (created by the processing thread or the weaver or whoever).
Definition: weaver.cpp:84
Job(TQObject *parent=0, const char *name=0)
Construct a Job object.
Definition: weaver.cpp:30
void jobDone(Job *)
This signal is emitted when a job is done.
void triggerSPR()
Trigger a SPR.
Definition: weaver.cpp:107
void suspended()
Thread queueing has been suspended.
virtual void execute(Thread *)
Perform the job.
Definition: weaver.cpp:52
@ JobAPR
Asynchronous Process Request.
Definition: weaver.h:113
bool isEmpty() const
Is the queue empty?
Definition: weaver.cpp:450
virtual void finish()
Wait until the weaver is idle, i.e. until no jobs are queued and none are being processed.
Definition: weaver.cpp:536
void assignJobs()
Schedule enqueued jobs to be executed by idle threads.
Definition: weaver.cpp:386
void done()
This signal is emitted when a job has been finished.
Job * job() const
The associated job.
Definition: weaver.cpp:168
void unlock()
Unlock this Job's mutex.
Definition: weaver.cpp:47
virtual void suspend(bool state)
Suspend job execution if state = true, otherwise resume job execution if it was suspended.
Definition: weaver.cpp:364
void run()
Overloaded to execute the assigned job.
Definition: weaver.cpp:204
@ Suspended
Thread queueing halted.
Definition: weaver.h:105
void finished()
This signal is emitted when the Weaver has finished ALL currently queued jobs.
void lock()
Lock this Job's mutex.
Definition: weaver.cpp:42
virtual Job * applyForWork(Thread *thread, Job *previous)
Assign a job to the calling thread.
Definition: weaver.cpp:456
TQWaitCondition m_jobAvailable
Wait condition all idle or done threads wait for.
Definition: weaver.h:427
void triggerAPR()
Trigger an APR.
Definition: weaver.cpp:122
@ ThreadStarted
Thread queueing halted.
Definition: weaver.h:106
virtual bool isFinished() const
Returns true if the job's execute method finished.
Definition: weaver.cpp:72
unsigned int id() const
Returns the thread id.
Definition: weaver.cpp:199
bool m_shuttingDown
Indicates if the weaver is shutting down and exiting its threads.
Definition: weaver.h:432
TQPtrList< Thread > m_inventory
The thread inventory.
Definition: weaver.h:416
void APR()
Perform an Asynchronous Process Request.
Thread * thread()
Return the thread that executes this job.
Definition: weaver.cpp:66
static int type()
Return the (custom defined) event type.
Definition: weaver.cpp:153
int threads() const
Returns the current number of threads in the inventory.
Definition: weaver.cpp:320
void started()
This signal is emitted when a thread starts to process a job.
bool isIdle() const
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition: weaver.cpp:530
virtual void run()=0
The method that actually performs the job.
void SPR()
This signal is emitted when the job needs some operation done by the main thread (usually the creator...
TDEPIM classes for drag and drop of mails.
TQPtrList< Job > m_assignments
The job queue.
Definition: weaver.h:418
~Thread()
The destructor.
Definition: weaver.cpp:187
void wakeAPR()
Wake the thread after an APR has been processed.
Definition: weaver.cpp:132
Action action() const
The action.
Definition: weaver.cpp:173
A weaver is the manager of worker threads (Thread objects) to which it assigns jobs from its queue.
Definition: weaver.h:296
virtual void enqueue(Job *)
Add a job to be executed.
Definition: weaver.cpp:326
A Job is a simple abstraction of an action that is to be executed in a thread context.
Definition: weaver.h:163
bool event(TQEvent *)
Check incoming events for user defined ones.
Definition: weaver.cpp:391
void lock()
Lock the mutex for this weaver.
Definition: weaver.cpp:305
bool m_suspend
If m_suspend is true, no new jobs will be assigned to threads.
Definition: weaver.h:442
void post(Event::Action, Job *=0)
Post an event, will be received and processed by the Weaver.
Definition: weaver.cpp:229
virtual void setFinished(bool status)
Call with status = true to mark this job as done.
Definition: weaver.cpp:78
Thread * thread() const
The sender thread (0 if the event carries none).
Definition: weaver.cpp:158