root/trunk/dep/include/zthread/BlockingQueue.h @ 194

Revision 2, 6.3 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#ifndef __ZTBLOCKINGQUEUE_H__
24#define __ZTBLOCKINGQUEUE_H__
25
26#include "zthread/Guard.h"
27#include "zthread/Condition.h"
28#include "zthread/Queue.h"
29
30#include <deque>
31
32namespace ZThread {
33 
34  /**
35   * @class BlockingQueue
36   * @author Eric Crahen <http://www.code-foo.com>
37   * @date <2003-07-16T12:01:43-0400>
38   * @version 2.3.0
39   *
40   * Like a LockedQueue, a BlockingQueue is a Queue implementation that provides
41   * serialized access to the items added to it. It differs by causing threads
42   * accessing the next() methods to block until a value becomes available.
43   */
44  template <class T, class LockType, typename StorageType = std::deque<T> >
45    class BlockingQueue : public Queue<T>, public Lockable {
46
47      //! Serialize access
48      LockType _lock;
49
50      //! Signaled when empty
51      Condition _notEmpty;
52
53      //! Storage backing the queue
54      StorageType _queue;
55
56      //! Cancellation flag
57      volatile bool _canceled;
58
59      public:
60
61      //! Create a new BlockingQueue
62      BlockingQueue() : _notEmpty(_lock), _canceled(false) {}
63
64      //! Destroy this BlockingQueue
65      virtual ~BlockingQueue() { }
66
67      /**
68       * @see Queue::add(const T& item)
69       */
70      virtual void add(const T& item) {
71
72        Guard<LockType> g(_lock);
73   
74        if(_canceled)
75          throw Cancellation_Exception();
76       
77        _queue.push_back(item);
78       
79        _notEmpty.signal();
80
81      }
82
83      /**
84       * @see Queue::add(const T& item, unsigned long timeout)
85       */
86      virtual bool add(T item, unsigned long timeout) {
87
88        try {
89
90          Guard<LockType> g(_lock, timeout);
91     
92          if(_canceled)
93            throw Cancellation_Exception();
94     
95          _queue.push_back(item);
96
97          _notEmpty.signal();
98
99        } catch(Timeout_Exception&) { return false; }
100 
101        return true;   
102
103      }
104
105      /**
106       * Get a value from this Queue. The calling thread may block indefinitely.
107       *
108       * @return <em>T</em> next available value
109       *
110       * @exception Cancellation_Exception thrown if this Queue has been canceled.
111       *
112       * @exception Interrupted_Exception thrown if the calling thread is interrupted
113       *            before a value becomes available.
114       *
115       * @pre  The Queue should not have been canceled prior to the invocation of this function.
116       * @post The value returned will have been removed from the Queue.
117       *
118       * @see Queue::next()
119       */
120      virtual T next() {
121
122        Guard<LockType> g(_lock);
123
124        while(_queue.empty() && !_canceled)
125          _notEmpty.wait();
126
127        if( _queue.empty() )
128          throw Cancellation_Exception();
129       
130        T item = _queue.front();
131        _queue.pop_front();
132   
133        return item;
134
135      }
136
137
138      /**
139       * Get a value from this Queue. The calling thread may block indefinitely.
140       *
141       * @param timeout maximum amount of time (milliseconds) this method may block
142       *        the calling thread.
143       *
144       * @return <em>T</em> next available value
145       *
146       * @exception Cancellation_Exception thrown if this Queue has been canceled.
147       * @exception Timeout_Exception thrown if the timeout expires before a value
148       *            can be retrieved.
149       * @exception Interrupted_Exception thrown if the calling thread is interrupted
150       *            before a value becomes available.
151       *
152       * @pre  The Queue should not have been canceled prior to the invocation of this function.
153       * @post The value returned will have been removed from the Queue.
154       *
155       * @see Queue::next(unsigned long timeout)
156       */
157      virtual T next(unsigned long timeout) {
158
159        Guard<LockType> g(_lock, timeout);
160
161        while(_queue.empty() && !_canceled) {
162          if(!_notEmpty.wait(timeout))
163            throw Timeout_Exception();
164        }
165
166        if(_queue.empty() )
167          throw Cancellation_Exception();
168 
169        T item = _queue.front();
170        _queue.pop_front();
171   
172        return item;
173
174      }
175
176
177      /**
178       * @see Queue::cancel()
179       *
180       * @post If threads are blocked on one of the next() functions then
181       *       they will be awakened with a Cancellation_Exception.
182       */
183      virtual void cancel() {
184
185        Guard<LockType> g(_lock);
186
187        _notEmpty.broadcast();
188        _canceled = true;
189
190      }
191
192      /**
193       * @see Queue::isCanceled()
194       */
195      virtual bool isCanceled() {
196
197        // Faster check since the queue will not become un-canceled
198        if(_canceled)
199          return true;
200       
201        Guard<LockType> g(_lock);
202
203        return _canceled;
204
205      }
206
207      /**
208       * @see Queue::size()
209       */
210      virtual size_t size() {
211
212        Guard<LockType> g(_lock);
213        return _queue.size();
214
215      }
216
217      /**
218       * @see Queue::size(unsigned long timeout)
219       */
220      virtual size_t size(unsigned long timeout) {
221
222        Guard<LockType> g(_lock, timeout);
223        return _queue.size();
224
225      }
226
227      public:
228
229      virtual void acquire() {
230        _lock.acquire();
231      }
232
233      virtual bool tryAcquire(unsigned long timeout) {
234        return _lock.tryAcquire(timeout);
235      }
236
237      virtual void release() {
238        _lock.release();
239      }
240
241    }; /* BlockingQueue */
242
243} // namespace ZThread
244
245#endif // __ZTBLOCKINGQUEUE_H__
Note: See TracBrowser for help on using the browser.