root/trunk/dep/include/zthread/BoundedQueue.h @ 2

Revision 2, 11.3 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#ifndef __ZTBOUNDEDQUEUE_H__
24#define __ZTBOUNDEDQUEUE_H__
25
26#include "zthread/Condition.h"
27#include "zthread/Guard.h"
28#include "zthread/Queue.h"
29
30#include <deque>
31
32namespace ZThread {
33
34  /**
35   * @class BoundedQueue
36   *
37   * @author Eric Crahen <http://www.code-foo.com>
38   * @date <2003-07-16T13:54:04-0400>
39   * @version 2.3.0
40   *
41   * A BoundedQueue provides serialized access to a set of values. It differs from other
42   * Queues by adding a maximum capacity, giving it the following properties:
43   *
44   * - Threads calling the empty() methods will be blocked until the BoundedQueue becomes empty.
45   * - Threads calling the next() methods will be blocked until the BoundedQueue has a value to
46   *   return.
47   * - Threads calling the add() methods will be blocked until the number of values in the
48   *   Queue drops below the maximum capacity.
49   *
50   * @see Queue
51   */
52  template <class T, class LockType, typename StorageType=std::deque<T> >
53    class BoundedQueue : public Queue<T>, public Lockable {
54
55      //! Maximum capacity for the Queue
56      size_t _capacity;
57
58      //! Serialize access
59      LockType _lock;
60
61      //! Signaled if not full
62      Condition _notFull;
63
64      //! Signaled if not empty
65      Condition _notEmpty;
66
67      //! Signaled if empty
68      Condition _isEmpty;
69
70      //! Storage backing the queue
71      StorageType _queue;
72
73      //! Cancellation flag
74      volatile bool _canceled;
75
76      public:
77
78      /**
79       * Create a BoundedQueue with the given capacity.
80       *
81       * @param capacity maximum number of values to allow in the Queue at
82       *        at any time
83       */
84      BoundedQueue(size_t capacity)
85        : _notFull(_lock), _notEmpty(_lock), _isEmpty(_lock), 
86          _capacity(capacity), _canceled(false) {}
87 
88      //! Destroy this Queue
89      virtual ~BoundedQueue() { }
90   
91      /**
92       * Get the maximum capacity of this Queue.
93       *
94       * @return <i>size_t</i> maximum capacity
95       */
96      size_t capacity() { 
97        return _capacity; 
98      }
99
100      /**
101       * Add a value to this Queue.
102       *
103       * If the number of values in the queue matches the value returned by <i>capacity</i>()
104       * then the calling thread will be blocked until at least one value is removed from
105       * the Queue.
106       *
107       * @param item value to be added to the Queue
108       *
109       * @exception Cancellation_Exception thrown if this Queue has been canceled.
110       * @exception Interrupted_Exception thrown if the thread was interrupted while waiting
111       *            to add a value
112       *
113       * @pre  The Queue should not have been canceled prior to the invocation of this function.
114       * @post If no exception is thrown, a copy of <i>item</i> will have been added to the Queue.
115       *
116       * @see Queue::add(const T& item)
117       */
118      virtual void add(const T& item) {
119       
120        Guard<LockType> g(_lock);
121       
122        // Wait for the capacity of the Queue to drop
123        while ((_queue.size() == _capacity) && !_canceled)
124          _notFull.wait();
125     
126        if(_canceled)
127          throw Cancellation_Exception();
128
129        _queue.push_back(item);
130        _notEmpty.signal(); // Wake any waiters
131     
132   
133      }
134   
135      /**
136       * Add a value to this Queue.
137       *
138       * If the number of values in the queue matches the value returned by <i>capacity</i>()
139       * then the calling thread will be blocked until at least one value is removed from
140       * the Queue.
141       *
142       * @param item value to be added to the Queue
143       * @param timeout maximum amount of time (milliseconds) this method may block
144       *        the calling thread.
145       *
146       * @return
147       *   - <em>true</em> if a copy of <i>item</i> can be added before <i>timeout</i>
148       *     milliseconds elapse.
149       *   - <em>false</em> otherwise.
150       *
151       * @exception Cancellation_Exception thrown if this Queue has been canceled.
152       * @exception Interrupted_Exception thrown if the thread was interrupted while waiting
153       *            to add a value
154       *
155       * @pre  The Queue should not have been canceled prior to the invocation of this function.
156       * @post If no exception is thrown, a copy of <i>item</i> will have been added to the Queue.
157       *
158       * @see Queue::add(const T& item, unsigned long timeout)
159       */
160      virtual bool add(const T& item, unsigned long timeout) {
161   
162        try {
163
164          Guard<LockType> g(_lock, timeout);
165     
166          // Wait for the capacity of the Queue to drop
167          while ((_queue.size() == _capacity) && !_canceled)
168            if(!_notFull.wait(timeout))
169              return false;
170     
171          if(_canceled)
172            throw Cancellation_Exception();
173     
174          _queue.push_back(item);
175          _notEmpty.signal(); // Wake any waiters
176     
177        } catch(Timeout_Exception&) { return false; }
178   
179        return true;
180
181      }
182
183      /**
184       * Retrieve and remove a value from this Queue.
185       *
186       * If invoked when there are no values present to return then the calling thread
187       * will be blocked until a value arrives in the Queue.
188       *
189       * @return <em>T</em> next available value
190       *
191       * @exception Cancellation_Exception thrown if this Queue has been canceled.
192       * @exception Interrupted_Exception thrown if the thread was interrupted while waiting
193       *            to retrieve a value
194       *
195       * @pre  The Queue should not have been canceled prior to the invocation of this function.
196       * @post The value returned will have been removed from the Queue.
197       */
198      virtual T next() {
199   
200        Guard<LockType> g(_lock);
201     
202        while ( _queue.empty() && !_canceled)
203          _notEmpty.wait();
204   
205        if( _queue.empty()) // Queue canceled
206          throw Cancellation_Exception(); 
207
208        T item = _queue.front();
209        _queue.pop_front();
210     
211        _notFull.signal(); // Wake any thread trying to add
212
213        if(_queue.empty()) // Wake empty waiters
214          _isEmpty.broadcast();
215
216        return item;
217
218      }
219
220      /**
221       * Retrieve and remove a value from this Queue.
222       *
223       * If invoked when there are no values present to return then the calling thread
224       * will be blocked until a value arrives in the Queue.
225       *
226       * @param timeout maximum amount of time (milliseconds) this method may block
227       *        the calling thread.
228       *
229       * @return <em>T</em> next available value
230       *
231       * @exception Cancellation_Exception thrown if this Queue has been canceled.
232       * @exception Timeout_Exception thrown if the timeout expires before a value
233       *            can be retrieved.
234       *
235       * @pre  The Queue should not have been canceled prior to the invocation of this function.
236       * @post The value returned will have been removed from the Queue.
237       */
238      virtual T next(unsigned long timeout) {
239     
240        Guard<LockType> g(_lock, timeout);
241   
242        // Wait for items to be added
243        while (_queue.empty() && !_canceled) {
244          if(!_notEmpty.wait(timeout))
245            throw Timeout_Exception();
246        }
247
248        if(_queue.empty())  // Queue canceled
249          throw Cancellation_Exception(); 
250
251        T item = _queue.front();
252        _queue.pop_front();
253
254        _notFull.signal(); // Wake add() waiters
255
256        if(_queue.empty()) // Wake empty() waiters
257          _isEmpty.broadcast();
258   
259        return item;
260   
261      }
262
263      /**
264       * Cancel this queue.
265       *
266       * @post Any threads blocked by an add() function will throw a Cancellation_Exception.
267       * @post Any threads blocked by a next() function will throw a Cancellation_Exception.
268       *
269       * @see Queue::cancel()
270       */
271      virtual void cancel() {
272   
273        Guard<LockType> g(_lock);
274
275        _canceled = true;
276        _notEmpty.broadcast(); // Wake next() waiters
277
278      }
279
280      /**
281       * @see Queue::isCanceled()
282       */
283      virtual bool isCanceled() {
284
285        // Faster check since the Queue will not become un-canceled
286        if(_canceled)
287          return true;
288   
289        Guard<LockType> g(_lock);
290
291        return _canceled;
292
293      }
294
295      /**
296       * @see Queue::size()
297       */
298      virtual size_t size() {
299 
300        Guard<LockType> g(_lock);
301        return _queue.size();
302
303      }
304
305      /**
306       * @see Queue::size(unsigned long timeout)
307       */
308      virtual size_t size(unsigned long timeout) {
309
310        Guard<LockType> g(_lock, timeout);
311        return _queue.size();
312
313      }
314
315      /**
316       * Test whether any values are available in this Queue.
317       *
318       * The calling thread is blocked until there are no values present
319       * in the Queue.
320       *
321       * @return
322       *  - <em>true</em> if there are no values available.
323       *  - <em>false</em> if there <i>are</i> values available.
324       *
325       * @see Queue::empty()
326       */
327      virtual bool empty() {
328
329        Guard<LockType> g(_lock);
330
331        while(!_queue.empty()) // Wait for an empty signal
332          _isEmpty.wait();
333   
334        return true;
335
336
337      }
338
339
340      /**
341       * Test whether any values are available in this Queue.
342       *
343       * The calling thread is blocked until there are no values present
344       * in the Queue.
345       *
346       * @param timeout maximum amount of time (milliseconds) this method may block
347       *        the calling thread.
348       *
349       * @return
350       *  - <em>true</em> if there are no values available.
351       *  - <em>false</em> if there <i>are</i> values available.
352       *
353       * @exception Timeout_Exception thrown if <i>timeout</i> milliseconds
354       *            expire before a value becomes available
355       *
356       * @see Queue::empty()
357       */
358      virtual bool empty(unsigned long timeout) {
359
360        Guard<LockType> g(_lock, timeout);
361
362        while(!_queue.empty()) // Wait for an empty signal
363          _isEmpty.wait(timeout);
364   
365        return true;
366
367      }
368
369      public:
370
371      virtual void acquire() {
372        _lock.acquire();
373      }
374
375      virtual bool tryAcquire(unsigned long timeout) {
376        return _lock.tryAcquire(timeout);
377      }
378
379      virtual void release() {
380        _lock.release();
381      }
382
383    }; /* BoundedQueue */
384
385} // namespace ZThread
386
387#endif // __ZTBOUNDEDQUEUE_H__
Note: See TracBrowser for help on using the browser.