root/trunk/dep/src/zthread/PoolExecutor.cxx @ 7

Revision 2, 15.2 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#include "ThreadImpl.h"
24#include "zthread/PoolExecutor.h"
25#include "zthread/MonitoredQueue.h"
26#include "zthread/FastMutex.h"
27#include "ThreadImpl.h"
28#include "ThreadQueue.h"
29
30#include <algorithm>
31#include <deque>
32#include <utility>
33
34using namespace ZThread;
35
36namespace ZThread {
37
38  namespace {
39
40    /**
41     */
42    class WaiterQueue {
43
44      typedef std::deque<ThreadImpl*>  ThreadList;
45     
46      typedef struct group_t {
47        size_t     id;
48        size_t     count;
49        ThreadList waiters;
50        group_t(size_t n) : id(n), count(0) {}
51      } Group;
52
53      typedef std::deque<Group>  GroupList;
54
55      //! Predicate to find a specific group
56      struct by_id : public std::unary_function<bool, Group> {
57        size_t id;
58        by_id(size_t n) : id(n) {}
59        bool operator()(const Group& grp) {
60          return grp.id == id;
61        }
62      };
63
64      //! Functor to count groups
65      struct counter : public std::unary_function<void, Group> {
66        size_t count;
67        counter() : count(0) {}
68        void operator()(const Group& grp) { count += grp.count; }
69        operator size_t() { return count; }
70      };
71     
72      FastMutex     _lock;
73      GroupList _list;
74      size_t    _id;
75      size_t    _generation;
76
77    public:
78     
79      WaiterQueue() : _id(0), _generation(0) {
80        // At least one empty-group exists
81        _list.push_back( Group(_id++) );
82      }
83
84      /**
85       * Insert the current thread into the current waiter list
86       *
87       * @pre  At least one empty group exists
88       * @post At least one empty group exists
89       */
90      bool wait(unsigned long timeout) {
91
92                ThreadImpl* current = ThreadImpl::current();
93        Monitor& m = current->getMonitor();
94
95        Monitor::STATE state;
96
97        Guard<FastMutex> g1(_lock);
98
99        // At least one empty-group exists
100        assert(!_list.empty());
101
102        // Return w/o waiting if there are no executing tasks
103        if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
104          return true;
105
106        // Update the waiter list for the active group
107        _list.back().waiters.push_back(current);
108        size_t n = _list.back().id;
109
110        m.acquire();
111       
112        {
113
114          Guard<FastMutex, UnlockedScope> g2(g1);         
115          state = timeout == 0 ? m.wait() : m.wait(timeout);
116
117        }
118
119        m.release();
120
121        // If awoke due to a reason other than the last task in the group 'n' completing,
122        // then then find the group 'current' is waiting in
123        GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
124        if(i != _list.end()) {
125
126          // Remove 'current' from that list if it is still a member
127          ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), current);
128          if(j != i->waiters.end())
129            i->waiters.erase(j);
130
131        }
132
133        // At least one empty-group exists
134        assert(!_list.empty());
135
136        switch(state) {
137          case Monitor::SIGNALED:
138            break;
139          case Monitor::TIMEDOUT:
140            return false;
141          case Monitor::INTERRUPTED:
142            throw Interrupted_Exception();
143          default:
144            throw Synchronization_Exception();
145        }
146       
147        return true;
148
149      }
150     
151      /**
152       * Increment the active group count
153       *
154       * @pre at least 1 empty group exists
155       * @post at least 1 non-empty group exists
156       */
157      std::pair<size_t, size_t> increment() {
158       
159        Guard<FastMutex> g(_lock);
160       
161        // At least one empty-group exists
162        assert(!_list.empty());
163
164        GroupList::iterator i = --_list.end();
165        size_t n = i->id;
166
167        if(i == _list.end()) {
168
169          // A group should never have been removed until
170          // the final task in that group completed
171          assert(0);
172
173        }
174
175        i->count++;
176
177        // When the active group is being incremented, insert a new active group
178        // to replace it if there were waiting threads
179        if(i == --_list.end() && !i->waiters.empty())
180          _list.push_back(Group(_id++));
181
182        // At least 1 non-empty group exists
183        assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
184
185        return std::make_pair(n, _generation);
186
187      }
188     
189
190      /**
191       * Decrease the count for the group with the given id.
192       *
193       * @param n group id
194       *
195       * @pre  At least 1 non-empty group exists
196       * @post At least 1 empty group exists
197       */
198      void decrement(size_t n) {
199       
200        Guard<FastMutex> g1(_lock);
201       
202        // At least 1 non-empty group exists
203        assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
204
205        // Find the requested group
206        GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
207        if(i == _list.end()) {
208         
209          // A group should never have been removed until
210          // the final task in that group completed
211          assert(0);
212
213        }
214
215        // Decrease the count for tasks in this group,
216        if(--i->count == 0 && i == _list.begin()) {
217       
218          do {
219
220            // When the first group completes, wake all waiters for every
221            // group, starting from the first until a group that is not
222            // complete is reached
223
224            /*
225            // Don't remove the empty active group
226            if(i == --_list.end() && i->waiters.empty())
227              break;
228            */
229           
230            if( awaken(*i) ) {
231           
232              // If all waiters were awakened, remove the group 
233              i = _list.erase(i);
234             
235            } else {
236             
237              {
238
239                // Otherwise, unlock and yield allowing the waiter
240                // lists to be updated if other threads are busy
241                Guard<FastLock, UnlockedScope> g2(g1);
242                ThreadImpl::yield();
243             
244              }
245
246              i = _list.begin();
247
248            }
249             
250          } while(i != _list.end() && i->count == 0);
251         
252          // Ensure that an active group exists
253          if(_list.empty())
254            _list.push_back( Group(++_id) );
255
256        }
257
258        // At least one group exists
259        assert(!_list.empty());
260       
261      }
262
263      /**
264       */
265      size_t generation(bool next = false) {
266
267        Guard<FastMutex> g(_lock);
268        return next ? _generation++ : _generation;
269
270      }
271     
272    private:
273     
274      /**
275       * Awaken all the waiters remaining in the given group
276       *
277       * @return
278       *   - true if all the waiting threads were successfully awakened.
279       *   - false if there were one or more threads that could not be awakened.
280       */
281      bool awaken(Group& grp) {
282
283        // Go through the waiter list in the given group;
284        for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
285         
286          ThreadImpl* impl = *i;
287          Monitor& m = impl->getMonitor();
288         
289          // Try the monitor lock, if it cant be locked skip to the next waiter
290          if(m.tryAcquire()) {
291           
292            // Notify the monitor & remove from the waiter list so time isn't
293            // wasted checking it again.
294            i = grp.waiters.erase(i);
295           
296            // Try to wake the waiter, it doesn't matter if this is successful
297            // or not (only fails when the monitor is already going to stop waiting).
298            m.notify();
299            m.release();
300           
301          } else ++i;
302         
303        }
304       
305        return grp.waiters.empty();
306
307      }
308
309    };
310
311    /**
312     * @class GroupedRunnable
313     *
314     * Wrap a task with group and generation information.
315     *
316     * - 'group' allows tasks to be grouped together so that lists of waiting
317     *   threads can be managed.
318     *
319     * - 'generation' allows tasks to be interrupted 
320     */
321    class GroupedRunnable : public Runnable {
322
323      Task _task;
324      WaiterQueue& _queue;
325
326      size_t _group;
327      size_t _generation;
328
329    public:
330
331      GroupedRunnable(const Task& task, WaiterQueue& queue)
332        : _task(task), _queue(queue) {
333       
334        std::pair<size_t, size_t> pr( _queue.increment() );
335   
336        _group      = pr.first;
337        _generation = pr.second;
338
339      }
340
341      size_t group() const {
342        return _group;
343      }
344
345      size_t generation() const {
346        return _generation;
347      }
348
349      void run() {
350
351        try {
352
353          _task->run();
354
355        } catch(...) {
356
357        }
358
359        _queue.decrement( group() );
360
361      }
362
363    };
364
365    typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask;
366
367    /**
368     *
369     */
370    class ExecutorImpl {
371     
372      typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
373      typedef std::deque<ThreadImpl*> ThreadList;
374     
375      TaskQueue   _taskQueue;
376      WaiterQueue _waitingQueue;
377
378      ThreadList      _threads;
379      volatile size_t _size;
380
381
382    public:
383     
384      ExecutorImpl() : _size(0) {}
385
386
387      void registerThread() {
388       
389        Guard<TaskQueue> g(_taskQueue);
390
391        ThreadImpl* impl = ThreadImpl::current();
392        _threads.push_back(impl);
393
394        // current cancel if too many threads are being created
395        if(_threads.size() > _size)
396          impl->cancel();
397
398      }
399
400      void unregisterThread() {
401
402        Guard<TaskQueue> g(_taskQueue);
403        std::remove(_threads.begin(), _threads.end(), ThreadImpl::current());
404
405      }
406
407      void execute(const Task& task) {
408
409        // Wrap the task with a grouped task
410        GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
411 
412        try {
413         
414          _taskQueue.add( ExecutorTask(runnable) );
415
416        } catch(...) {
417
418          // Incase the queue is canceled between the time the WaiterQueue is
419          // updated and the task is added to the TaskQueue
420          _waitingQueue.decrement( runnable->group() );
421          throw;
422
423        }
424
425      }
426
427      void interrupt() {
428
429        // Bump the generation number
430        _waitingQueue.generation(true);
431
432        Guard<TaskQueue> g(_taskQueue);
433       
434        // Interrupt all threads currently running, thier tasks would be
435        // from an older generation
436        for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
437          (*i)->interrupt();
438       
439      }
440
441      //! Adjust the number of desired workers and return the number of Threads needed
442      size_t workers(size_t n) {
443       
444        Guard<TaskQueue> g(_taskQueue);
445
446        size_t m = (_size < n) ? (n - _size) : 0;
447        _size = n;
448       
449        return m;
450
451      }
452     
453      size_t workers() {
454       
455        Guard<TaskQueue> g(_taskQueue);
456        return _size;
457       
458      }
459     
460      ExecutorTask next() {
461       
462        ExecutorTask task;
463       
464        // Draw the task from the queue
465        for(;;) {
466
467          try {
468
469            task = _taskQueue.next();
470            break;
471
472          } catch(Interrupted_Exception&) {
473
474            // Ignore interruption here, it can only come from
475            // another thread interrupt()ing the executor. The
476            // thread was interrupted in the hopes it was busy
477            // with a task
478
479          }
480
481        }
482       
483        // Interrupt the thread running the tasks when the generation
484        // does not match the current generation
485        if( task->generation() != _waitingQueue.generation() )
486          ThreadImpl::current()->interrupt();
487
488        // Otherwise, clear the interrupted status for the thread and
489        // give it a clean slate to start with
490        else
491          ThreadImpl::current()->isInterrupted();
492
493        return task;
494
495      }
496
497      bool isCanceled() {
498        return _taskQueue.isCanceled();
499      }
500
501      void cancel() {
502        _taskQueue.cancel();
503      }
504
505      bool wait(unsigned long timeout) {
506        return _waitingQueue.wait(timeout);
507      }
508
509    };
510
511    //! Executor job
512    class Worker : public Runnable {
513
514      CountedPtr< ExecutorImpl > _impl;
515     
516    public:
517     
518      //! Create a Worker that draws upon the given Queue
519      Worker(const CountedPtr< ExecutorImpl >& impl)
520        : _impl(impl) { }
521   
522      //! Run until Thread or Queue are canceled
523      void run() {
524       
525        _impl->registerThread();
526       
527        // Run until the Queue is canceled
528        while(!Thread::canceled()) {
529         
530          // Draw tasks from the queue
531          ExecutorTask task( _impl->next() );
532          task->run();
533                   
534        }
535       
536        _impl->unregisterThread();
537   
538      }
539     
540    }; /* Worker */
541
542
543    //! Helper
544    class Shutdown : public Runnable {
545
546      CountedPtr< ExecutorImpl > _impl;
547
548    public:
549
550      Shutdown(const CountedPtr< ExecutorImpl >& impl)
551        : _impl(impl) { }
552     
553      void run() {       
554        _impl->cancel();
555      }
556
557    }; /* Shutdown */
558
559  }
560
561  PoolExecutor::PoolExecutor(size_t n)
562    : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) {
563   
564    size(n);
565   
566    // Request cancelation when main() exits
567    ThreadQueue::instance()->insertShutdownTask(_shutdown);
568
569  }
570
571  PoolExecutor::~PoolExecutor() {
572
573    try {
574     
575      /**
576       * If the shutdown task for this executor has not already been
577       * selected to run, then run it locally
578       */
579      if(ThreadQueue::instance()->removeShutdownTask(_shutdown))
580        _shutdown->run();
581       
582    } catch(...) { }
583
584  }
585
586  void PoolExecutor::interrupt() {
587    _impl->interrupt();
588  }
589
590  void PoolExecutor::size(size_t n) {
591   
592    if(n < 1)
593      throw InvalidOp_Exception();
594
595    for(size_t m = _impl->workers(n); m > 0; --m)
596      Thread t(new Worker(_impl));
597
598  }
599
600  size_t PoolExecutor::size() {
601    return _impl->workers();
602  }
603
604
605  void PoolExecutor::execute(const Task& task) {
606
607    // Enqueue the task, the Queue will reject it with a
608    // Cancelation_Exception if the Executor has been canceled
609    _impl->execute(task);
610
611  }
612
613  void PoolExecutor::cancel() {
614    _impl->cancel();
615  }
616
617  bool PoolExecutor::isCanceled() {
618    return _impl->isCanceled();
619  }
620 
621  void PoolExecutor::wait() {   
622    _impl->wait(0);
623  }
624
625  bool PoolExecutor::wait(unsigned long timeout) {
626    return _impl->wait(timeout == 0 ? 1 : timeout);
627  }
628
629}
Note: See TracBrowser for help on using the browser.