root/trunk/dep/src/zthread/ThreadedExecutor.cxx @ 2

Revision 2, 11.8 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#include "zthread/ThreadedExecutor.h"
24#include "zthread/Guard.h"
25#include "zthread/FastMutex.h"
26#include "zthread/Time.h"
27
28#include "ThreadImpl.h"
29
30namespace ZThread {
31
32  namespace {
33
34    //!
35    class WaiterQueue {
36
37      typedef std::deque<ThreadImpl*>  ThreadList;
38     
39      typedef struct group_t {
40        size_t     id;
41        size_t     count;
42        ThreadList waiters;
43        group_t(size_t n) : id(n), count(0) {}
44      } Group;
45
46      typedef std::deque<Group>  GroupList;
47
48      //! Predicate to find a specific group
49      struct by_id : public std::unary_function<bool, Group> {
50        size_t id;
51        by_id(size_t n) : id(n) {}
52        bool operator()(const Group& grp) {
53          return grp.id == id;
54        }
55      };
56
57      //! Functor to count groups
58      struct counter : public std::unary_function<void, Group> {
59        size_t count;
60        counter() : count(0) {}
61        void operator()(const Group& grp) { count += grp.count; }
62        operator size_t() { return count; }
63      };
64     
65      FastMutex     _lock;
66      GroupList _list;
67      size_t    _id;
68      size_t    _generation;
69
70    public:
71     
72      WaiterQueue() : _id(0), _generation(0) {
73        // At least one empty-group exists
74        _list.push_back( Group(_id++) );
75      }
76
77      /**
78       * Insert the current thread into the current waiter list
79       *
80       * @pre  At least one empty group exists
81       * @post At least one empty group exists
82       */
83      bool wait(unsigned long timeout) {
84
85        ThreadImpl* self = ThreadImpl::current();
86        Monitor& m = self->getMonitor();
87
88        Monitor::STATE state;
89
90        Guard<Lockable> g1(_lock);
91
92        // At least one empty-group exists
93        assert(!_list.empty());
94
95        // Return w/o waiting if there are no executing tasks
96        if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
97          return true;
98
99        // Update the waiter list for the active group
100        _list.back().waiters.push_back(self);
101        size_t n = _list.back().id;
102
103        m.acquire();
104       
105        {
106         
107          Guard<Lockable, UnlockedScope> g2(g1);         
108          state = timeout == 0 ? m.wait() : m.wait(timeout);
109
110        }
111
112        m.release();
113
114        // If awoke due to a reason other than the last task in the group 'n' completing,
115        // then then find the group 'self' is waiting in
116        GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
117        if(i != _list.end()) {
118
119          // Remove 'self' from that list if it is still a member
120          ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), self);
121          if(j != i->waiters.end())
122            i->waiters.erase(j);
123
124        }
125
126        // At least one empty-group exists
127        assert(!_list.empty());
128
129        switch(state) {
130          case Monitor::SIGNALED:
131            break;
132          case Monitor::TIMEDOUT:
133            return false;
134          case Monitor::INTERRUPTED:
135            throw Interrupted_Exception();
136          default:
137            throw Synchronization_Exception();
138        }
139       
140        return true;
141
142      }
143     
144      /**
145       * Increment the active group count
146       *
147       * @pre at least 1 empty group exists
148       * @post at least 1 non-empty group exists
149       */
150      std::pair<size_t, size_t> increment() {
151       
152        Guard<FastMutex> g(_lock);
153       
154        // At least one empty-group exists
155        assert(!_list.empty());
156
157        GroupList::iterator i = --_list.end();
158        size_t n = i->id;
159
160        if(i == _list.end()) {
161
162          // A group should never have been removed until
163          // the final task in that group completed
164          assert(0);
165
166        }
167
168        i->count++;
169
170        // When the active group is being incremented, insert a new active group
171        // to replace it if there were waiting threads
172        if(i == --_list.end() && !i->waiters.empty())
173          _list.push_back(Group(_id++));
174
175        // At least 1 non-empty group exists
176        assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
177
178        return std::make_pair(n, _generation);
179
180      }
181     
182
183      /**
184       * Decrease the count for the group with the given id.
185       *
186       * @param n group id
187       *
188       * @pre  At least 1 non-empty group exists
189       * @post At least 1 empty group exists
190       */
191      void decrement(size_t n) {
192
193        Guard<FastMutex> g1(_lock);
194
195        // At least 1 non-empty group exists
196        assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
197
198        // Find the requested group
199        GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
200        if(i == _list.end()) {
201         
202          // A group should never have been removed until
203          // the final task in that group completed
204          assert(0);
205
206        }
207
208        // Decrease the count for tasks in this group,
209        if(--i->count == 0 && i == _list.begin()) {
210         
211          do {
212
213            // When the first group completes, wake all waiters for every
214            // group, starting from the first until a group that is not
215            // complete is reached
216
217            /*
218            // Don't remove the empty active group
219            if(i == --_list.end() && i->waiters.empty())
220              break;
221            */
222           
223            if( awaken(*i) ) {
224           
225              // If all waiters were awakened, remove the group 
226              i = _list.erase(i);
227             
228            } else {
229             
230              {
231
232                // Otherwise, unlock and yield allowing the waiter
233                // lists to be updated if other threads are busy
234                Guard<FastLock, UnlockedScope> g2(g1);
235                ThreadImpl::yield();
236             
237              }
238
239              i = _list.begin();
240
241            }
242             
243          } while(i != _list.end() && i->count == 0);
244         
245          // Ensure that an active group exists
246          if(_list.empty())
247            _list.push_back( Group(++_id) );
248
249        }
250
251        // At least one group exists
252        assert(!_list.empty());
253       
254      }
255
256      /**
257       */
258      size_t generation(bool next = false) {
259
260        Guard<FastMutex> g(_lock);
261        return next ? _generation++ : _generation;
262
263      }
264     
265    private:
266     
267      /**
268       * Awaken all the waiters remaining in the given group
269       *
270       * @return
271       *   - true if all the waiting threads were successfully awakened.
272       *   - false if there were one or more threads that could not be awakened.
273       */
274      bool awaken(Group& grp) {
275
276        // Go through the waiter list in the given group;
277        for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
278         
279          ThreadImpl* impl = *i;
280          Monitor& m = impl->getMonitor();
281         
282          // Try the monitor lock, if it cant be locked skip to the next waiter
283          if(m.tryAcquire()) {
284           
285            // Notify the monitor & remove from the waiter list so time isn't
286            // wasted checking it again.
287            i = grp.waiters.erase(i);
288           
289            // Try to wake the waiter, it doesn't matter if this is successful
290            // or not (only fails when the monitor is already going to stop waiting).
291            m.notify();
292            m.release();
293           
294          } else ++i;
295         
296        }
297       
298        return grp.waiters.empty();
299
300      }
301
302    };
303
304    //! Synchronization point for the Executor
305    class ExecutorImpl {
306
307      typedef std::deque<ThreadImpl*> ThreadList;
308
309      bool _canceled;
310      FastMutex _lock;     
311
312      //! Worker threads
313      ThreadList _threads;
314     
315      WaiterQueue _queue;
316
317    public:
318
319      ExecutorImpl() : _canceled(false) {}
320
321      WaiterQueue& getWaiterQueue() {
322        return _queue;
323      }
324
325      void registerThread(size_t generation) {
326               
327        // Interrupt slow starting threads
328        if(getWaiterQueue().generation() != generation)
329          ThreadImpl::current()->interrupt();
330
331        // Enqueue for possible future interrupt()
332        else {
333
334          Guard<FastMutex> g(_lock);
335          _threads.push_back( ThreadImpl::current() );
336
337        }
338
339      }
340
341      void unregisterThread() {
342       
343        Guard<FastMutex> g(_lock);
344        std::remove(_threads.begin(), _threads.end(), ThreadImpl::current() );
345
346      }
347
348      void cancel() {
349
350        Guard<FastMutex> g(_lock);
351        _canceled = true;
352
353      }
354
355      bool isCanceled() {
356
357        if(_canceled)
358          return true;
359
360        Guard<FastMutex> g(_lock);
361        return _canceled;
362
363      }
364
365      void interrupt() {
366
367        Guard<FastMutex> g(_lock);
368
369        // Interrupt all the registered threads
370        for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
371          (*i)->interrupt();
372       
373        // Bump the generation up, ensuring slow starting threads get this interrupt
374        getWaiterQueue().generation( true );
375
376      }     
377
378    }; /* ExecutorImpl */
379
380    //! Wrap a generation and a group around a task
381    class Worker : public Runnable {
382
383      CountedPtr< ExecutorImpl > _impl;
384      Task _task;
385
386      size_t _generation;
387      size_t _group;
388
389    public:
390
391      Worker(const CountedPtr< ExecutorImpl >& impl, const Task& task)
392        : _impl(impl), _task(task) {
393
394        std::pair<size_t, size_t> pr( _impl->getWaiterQueue().increment() );
395   
396        _group      = pr.first;
397        _generation = pr.second;
398
399      }
400
401      size_t group() const {
402        return _group;
403      }
404
405      size_t generation() const {
406        return _generation;
407      }
408     
409      void run() {
410       
411        // Register this thread once its begun; the generation is used to ensure
412        // threads that are slow starting are properly interrupted
413
414        _impl->registerThread( generation() );
415       
416        try {
417          _task->run();         
418        } catch(...) {
419          /* consume the exceptions the work propogates */
420        }
421       
422        _impl->getWaiterQueue().decrement( group() );
423
424        // Unregister this thread
425
426        _impl->unregisterThread();
427
428      }
429
430    }; /* Worker */
431
432  }
433
434  ThreadedExecutor::ThreadedExecutor() : _impl(new ExecutorImpl) {}
435
436  ThreadedExecutor::~ThreadedExecutor() {}
437 
438  void ThreadedExecutor::execute(const Task& task) {
439     
440    Thread t( new Worker(_impl, task) );
441
442  } 
443
444  void ThreadedExecutor::interrupt() {
445    _impl->interrupt();
446  }
447
448  void ThreadedExecutor::cancel() {
449    _impl->cancel();   
450  }
451 
452  bool ThreadedExecutor::isCanceled() {
453    return _impl->isCanceled();
454  }
455 
456  void ThreadedExecutor::wait() {
457    _impl->getWaiterQueue().wait(0);
458  }
459
460  bool ThreadedExecutor::wait(unsigned long timeout) {
461    return _impl->getWaiterQueue().wait(timeout == 0 ? 1 : timeout);
462  }
463
464}
Note: See TracBrowser for help on using the browser.