root/trunk/dep/include/zthread/Barrier.h @ 2

Revision 2, 9.3 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#ifndef __ZTBARRIER_H__
24#define __ZTBARRIER_H__
25
26#include "zthread/Condition.h"
27#include "zthread/Guard.h"
28#include "zthread/Waitable.h"
29#include "zthread/Runnable.h"
30
31namespace ZThread {
32
33  /**
34   * @class Barrier
35   * @author Eric Crahen <http://www.code-foo.com>
36   * @date <2003-07-16T09:54:01-0400>
37   * @version 2.2.1
38   *
39   * A Barrier is a Waitable object that serves as synchronization points for
40   * a set of threads. A Barrier is constructed for a fixed number (<i>N</i>) of threads.
41   * Threads attempting to wait() on a Barrier (<i> 1 - N</i>) will block until the <i>N</i>th
42   * thread arrives. The <i>N</i>th thread will awaken all the the others.
43   *
44   * An optional Runnable command may be associated with the Barrier. This will be run()
45   * when the <i>N</i>th thread arrives and Barrier is not broken.
46   *
47   * <b>Error Checking</b>
48   *
49   * A Barrier uses an all-or-nothing. All threads involved must successfully
50   * meet at Barrier. If any one of those threads leaves before all the threads
51   * have (as the result of an error or exception) then all threads present at
52   * the Barrier will throw BrokenBarrier_Exception.
53   *
54   * A broken Barrier will cause all threads attempting to wait() on it to
55   * throw a BrokenBarrier_Exception.
56   *
57   * A Barrier will remain 'broken', until it is manually reset().
58   */
59  template <unsigned int Count, class LockType>
60    class Barrier : public Waitable, private NonCopyable {
61
62    //! Broken flag
63    bool _broken;
64    //! Task flag
65    bool _haveTask;
66    //! Thread count
67    unsigned int _count;
68    //! Wait generation
69    unsigned int _generation;
70    //! Serialize access
71    LockType _lock;
72    //! Signaled when all thread arrive
73    Condition _arrived;
74    //! Command to run when all the thread arrive
75    Task _task;
76
77    public:
78
79    //! Create a Barrier
80    Barrier() 
81      : _broken(false), _haveTask(false), _count(Count), _generation(0), _arrived(_lock), _task(0) { }
82
83    /**
84     * Create a Barrier that executes the given task when all threads arrive
85     * without error
86     *
87     * @param task Task to associate with this Barrier
88     */
89    Barrier(const Task& task) 
90      : _broken(false), _haveTask(true), _count(Count), _generation(0), _arrived(_lock), 
91      _task(task) { }
92
93    //! Destroy this Barrier
94    virtual ~Barrier() {}
95
96    /**
97     * Enter barrier and wait for the other threads to arrive. This can block for an indefinite
98     * amount of time.
99     *
100     * @exception BrokenBarrier_Exception thrown when any thread has left a wait on this
101     *            Barrier as a result of an error.
102     * @exception Interrupted_Exception thrown when the calling thread is interrupted.
103     *            A thread may be interrupted at any time, prematurely ending a wait
104     *            for one thread and breaking the barrier for all threads
105     *
106     * @see Waitable::wait()
107     *
108     * @post If no exception was thrown, all threads have successfully arrived
109     * @post If an exception was thrown, the barrier is broken
110     */
111    virtual void wait() {
112
113      Guard<LockType> g(_lock);
114   
115      if(_broken) 
116        throw BrokenBarrier_Exception();
117
118      // Break the barrier if an arriving thread is interrupted
119      if(Thread::interrupted()) {
120     
121        // Release the other waiter, propagate the exception
122        _arrived.broadcast();
123        _broken = true;
124
125        throw Interrupted_Exception();
126
127      }
128   
129      if(--_count == 0) {
130     
131        // Wake the other threads if this was the last 
132        // arriving thread
133        _arrived.broadcast();
134     
135        // Try to run the associated task, if it throws then
136        // break the barrier and propagate the exception
137        try {
138
139          if(_task)
140            _task->run();
141
142          _generation++;
143
144        } catch(Synchronization_Exception&) {
145
146          _broken = true;
147          throw;
148
149        } catch(...) { assert(0); }
150
151      } else {
152
153        int myGeneration = _generation;
154
155        try {
156
157          // Wait for the other threads to arrive
158          _arrived.wait();
159
160        } catch(Interrupted_Exception&) {
161
162          // Its possible for a thread to be interrupted before the
163          // last thread arrives. If the interrupted thread hasn't
164          // resumed, then just propagate the interruption
165
166          if(myGeneration != _generation)
167            Thread().interrupt();
168
169          else _broken = true;
170
171        } catch(Synchronization_Exception&) {
172
173          // Break the barrier and propagate the exception
174          _broken = true;
175          throw;
176
177        }
178     
179        // If the thread woke because it was notified by the thread
180        // that broke the barrier, throw.
181        if(_broken) 
182          throw BrokenBarrier_Exception();
183   
184      } 
185
186    }
187
188    /**
189     * Enter barrier and wait for the other threads to arrive. This can block up to the
190     * amount of time specified with the timeout parameter. The barrier will not break
191     * if a thread leaves this function due to a timeout.
192     *
193     * @param timeout maximum amount of time, in milliseconds, to wait before 
194     *
195     * @return
196     *   - <em>true</em> if the set of tasks being wait for complete before
197     *                   <i>timeout</i> milliseconds elapse.
198     *   - <em>false</em> otherwise.
199     *
200     * @exception BrokenBarrier_Exception thrown when any thread has left a wait on this
201     *            Barrier as a result of an error.
202     * @exception Interrupted_Exception thrown when the calling thread is interrupted.
203     *            A thread may be interrupted at any time, prematurely ending a wait
204     *            for one thread and breaking the barrier for all threads
205     *
206     * @see Waitable::wait(unsigned long timeout)
207     *
208     * @post If no exception was thrown, all threads have successfully arrived
209     * @post If an exception was thrown, the barrier is broken
210     */
211    virtual bool wait(unsigned long timeout) {
212
213      Guard<LockType> g(_lock);
214   
215      if(_broken) 
216        throw BrokenBarrier_Exception();
217
218      // Break the barrier if an arriving thread is interrupted
219      if(Thread::interrupted()) {
220     
221        // Release the other waiter, propagate the exception
222        _arrived.broadcast();
223        _broken = true;
224
225        throw Interrupted_Exception();
226
227      }
228
229   
230      if(--_count == 0) {
231     
232        // Wake the other threads if this was the last 
233        // arriving thread
234        _arrived.broadcast();
235     
236        // Try to run the associated task, if it throws then
237        // break the barrier and propagate the exception
238        try {
239
240          if(_task)
241            _task->run();
242
243          _generation++;
244
245        } catch(Synchronization_Exception&) {
246
247          _broken = true;
248          throw;
249
250        } catch(...) { assert(0); }
251
252      } else {
253
254        int myGeneration = _generation;
255
256        try {
257
258          // Wait for the other threads to arrive
259          if(!_arrived.wait(timeout))
260            _broken = true;
261
262        } catch(Interrupted_Exception&) {
263
264          // Its possible for a thread to be interrupted before the
265          // last thread arrives. If the interrupted thread hasn't
266          // resumed, then just propagate the interruption
267
268          if(myGeneration != _generation)
269            Thread().interrupt();
270
271          else _broken = true;
272
273        } catch(Synchronization_Exception&) {
274
275          // Break the barrier and propagate the exception
276          _broken = true;
277          throw;
278
279        }
280     
281        // If the thread woke because it was notified by the thread
282        // that broke the barrier, throw.
283        if(_broken) 
284          throw BrokenBarrier_Exception();
285   
286      } 
287
288      return true;
289
290    }
291
292    /**
293     * Break the Barrier ending the wait for any threads that were waiting on
294     * the barrier.
295     *
296     * @post the Barrier is broken, all waiting threads will throw the
297     *       BrokenBarrier_Exception
298     */
299    void shatter() {
300     
301      Guard<LockType> g(_lock);   
302
303      _broken = true;
304      _arrived.broadcast();
305
306    }
307
308    /**
309     * Reset the Barrier.
310     *
311     * @post the Barrier is no longer Broken and can be used again.
312     */
313    void reset() {
314     
315      Guard<LockType> g(_lock);   
316
317      _broken = false;
318      _generation++;
319      _count = Count;
320     
321    }
322
323  };
324
325 
326} // namespace ZThread
327
328#endif // __ZTBARRIER_H__
Note: See TracBrowser for help on using the browser.