root/trunk/dep/include/zthread/MonitoredQueue.h @ 25

Revision 2, 9.6 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#ifndef __ZTMONITOREDQUEUE_H__
24#define __ZTMONITOREDQUEUE_H__
25
26#include "zthread/Condition.h"
27#include "zthread/Guard.h"
28#include "zthread/Queue.h"
29
30#include <deque>
31
32namespace ZThread {
33
34  /**
35   * @class MonitoredQueue
36   * @author Eric Crahen <http://www.code-foo.com>
37   * @date <2003-07-16T20:23:28-0400>
38   * @version 2.3.0
39   *
40   * A MonitoredQueue is a Queue implementation that provides  serialized access to the
41   * items added to it.
42   *
43   * - Threads calling the empty() methods will be blocked until the BoundedQueue becomes empty.
44   * - Threads calling the next() methods will be blocked until the BoundedQueue has a value to
45   *   return.
46   *
47   * @see Queue
48   */
49  template <class T, class LockType, typename StorageType=std::deque<T> >
50    class MonitoredQueue : public Queue<T>, public Lockable {
51
52      //! Serialize access
53      LockType _lock;
54
55      //! Signaled on not empty
56      Condition _notEmpty;
57
58      //! Signaled on empty
59      Condition _isEmpty;
60
61      //! Storage backing the queue
62      StorageType _queue;
63
64      //! Cancellation flag
65      volatile bool _canceled;
66
67      public:
68
69      //! Create a new MonitoredQueue
70      MonitoredQueue() 
71        : _notEmpty(_lock), _isEmpty(_lock), _canceled(false) {}
72
73      //! Destroy a MonitoredQueue, delete remaining items
74      virtual ~MonitoredQueue() { }
75
76      /**
77       * Add a value to this Queue.
78       *
79       * @param item value to be added to the Queue
80       *
81       * @exception Cancellation_Exception thrown if this Queue has been canceled.
82       * @exception Interrupted_Exception thrown if the thread was interrupted while waiting
83       *            to add a value
84       *
85       * @pre  The Queue should not have been canceled prior to the invocation of this function.
86       * @post If no exception is thrown, a copy of <i>item</i> will have been added to the Queue.
87       *
88       * @see Queue::add(const T& item)
89       */
90      virtual void add(const T& item) {
91
92        Guard<LockType> g(_lock);
93   
94        // Allow no further additions in the canceled state
95        if(_canceled)
96          throw Cancellation_Exception();
97
98        _queue.push_back( item );
99
100        _notEmpty.signal(); // Wake one waiter
101
102      }
103
104      /**
105       * Add a value to this Queue.
106       *
107       * @param item value to be added to the Queue
108       * @param timeout maximum amount of time (milliseconds) this method may block
109       *        the calling thread.
110       *
111       * @return
112       *   - <em>true</em> if a copy of <i>item</i> can be added before <i>timeout</i>
113       *     milliseconds elapse.
114       *   - <em>false</em> otherwise.
115       *
116       * @exception Cancellation_Exception thrown if this Queue has been canceled.
117       * @exception Interrupted_Exception thrown if the thread was interrupted while waiting
118       *            to add a value
119       *
120       * @pre  The Queue should not have been canceled prior to the invocation of this function.
121       * @post If no exception is thrown, a copy of <i>item</i> will have been added to the Queue.
122       *
123       * @see Queue::add(const T& item, unsigned long timeout)
124       */
125      virtual bool add(const T& item, unsigned long timeout) {
126 
127        try {
128
129          Guard<LockType> g(_lock, timeout);
130     
131          if(_canceled)
132            throw Cancellation_Exception();
133     
134          _queue.push_back(item);
135
136          _notEmpty.signal();
137
138        } catch(Timeout_Exception&) { return false; }
139 
140        return true;   
141
142      }
143
144      /**
145       * Retrieve and remove a value from this Queue.
146       *
147       * If invoked when there are no values present to return then the calling thread
148       * will be blocked until a value arrives in the Queue.
149       *
150       * @return <em>T</em> next available value
151       *
152       * @exception Cancellation_Exception thrown if this Queue has been canceled.
153       * @exception Interrupted_Exception thrown if the thread was interrupted while waiting
154       *            to retrieve a value
155       *
156       * @pre  The Queue should not have been canceled prior to the invocation of this function.
157       * @post The value returned will have been removed from the Queue.
158       */
159      virtual T next() {
160     
161        Guard<LockType> g(_lock);
162     
163        while (_queue.empty() && !_canceled) 
164          _notEmpty.wait();
165   
166        if(_queue.empty()) // Queue canceled
167          throw Cancellation_Exception(); 
168     
169        T item = _queue.front();
170        _queue.pop_front();
171
172        if(_queue.empty()) // Wake empty waiters
173          _isEmpty.broadcast();
174
175        return item;
176
177      }
178
179      /**
180       * Retrieve and remove a value from this Queue.
181       *
182       * If invoked when there are no values present to return then the calling thread
183       * will be blocked until a value arrives in the Queue.
184       *
185       * @param timeout maximum amount of time (milliseconds) this method may block
186       *        the calling thread.
187       *
188       * @return <em>T</em> next available value
189       *
190       * @exception Cancellation_Exception thrown if this Queue has been canceled.
191       * @exception Timeout_Exception thrown if the timeout expires before a value
192       *            can be retrieved.
193       *
194       * @pre  The Queue should not have been canceled prior to the invocation of this function.
195       * @post The value returned will have been removed from the Queue.
196       */
197      virtual T next(unsigned long timeout) {
198 
199        Guard<LockType> g(_lock, timeout);
200     
201        while(_queue.empty() && !_canceled) {
202          if(!_notEmpty.wait(timeout))
203            throw Timeout_Exception();
204        }
205
206        if( _queue.empty()) // Queue canceled
207          throw Cancellation_Exception(); 
208
209        T item = _queue.front();
210        _queue.pop_front();
211
212        if(_queue.empty()) // Wake empty waiters
213          _isEmpty.broadcast();
214
215        return item;
216
217      }
218
219
220      /**
221       * Cancel this queue.
222       *
223       * @post Any threads blocked by a next() function will throw a Cancellation_Exception.
224       *
225       * @see Queue::cancel()
226       */
227      virtual void cancel() {
228
229        Guard<LockType> g(_lock);
230
231        _canceled = true;
232        _notEmpty.broadcast(); // Wake next() waiters
233
234      }
235
236      /**
237       * @see Queue::isCanceled()
238       */
239      virtual bool isCanceled() {
240 
241        // Faster check since the queue will not become un-canceled
242        if(_canceled)
243          return true;
244   
245        Guard<LockType> g(_lock);
246
247        return _canceled;
248
249      }
250
251
252      /**
253       * @see Queue::size()
254       */
255      virtual size_t size() {
256
257        Guard<LockType> g(_lock);
258        return _queue.size();
259
260      }
261
262      /**
263       * @see Queue::size(unsigned long timeout)
264       */
265      virtual size_t size(unsigned long timeout) {
266
267        Guard<LockType> g(_lock, timeout);
268        return _queue.size();
269
270      }
271
272      /**
273       * Test whether any values are available in this Queue.
274       *
275       * The calling thread is blocked until there are no values present
276       * in the Queue.
277       *
278       * @return
279       *  - <em>true</em> if there are no values available.
280       *  - <em>false</em> if there <i>are</i> values available.
281       *
282       * @see Queue::empty()
283       */
284      virtual bool empty() {
285
286        Guard<LockType> g(_lock);
287
288        while(!_queue.empty()) // Wait for an empty signal
289          _isEmpty.wait();
290   
291        return true;
292
293      }
294
295      /**
296       * Test whether any values are available in this Queue.
297       *
298       * The calling thread is blocked until there are no values present
299       * in the Queue.
300       *
301       * @param timeout maximum amount of time (milliseconds) this method may block
302       *        the calling thread.
303       *
304       * @return
305       *  - <em>true</em> if there are no values available.
306       *  - <em>false</em> if there <i>are</i> values available.
307       *
308       * @exception Timeout_Exception thrown if <i>timeout</i> milliseconds
309       *            expire before a value becomes available
310       *
311       * @see Queue::empty()
312       */
313      virtual bool empty(unsigned long timeout) {
314 
315        Guard<LockType> g(_lock, timeout);
316
317        while(!_queue.empty()) // Wait for an empty signal
318          _isEmpty.wait(timeout);
319   
320        return true;
321
322      }
323
324      public:
325
326      virtual void acquire() {
327        _lock.acquire();
328      }
329
330      virtual bool tryAcquire(unsigned long timeout) {
331        return _lock.tryAcquire(timeout);
332      }
333
334      virtual void release() {
335        _lock.release();
336      }
337
338
339    }; /* MonitoredQueue */
340
341
342} // namespace ZThread
343
344#endif // __ZTMONITOREDQUEUE_H__
345
Note: See TracBrowser for help on using the browser.