root/trunk/dep/src/zthread/ThreadQueue.cxx @ 2

Revision 2, 7.4 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (c) 2005, Eric Crahen
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 *
21 */
22
23#include "DeferredInterruptionScope.h"
24#include "Debug.h"
25#include "ThreadImpl.h"
26#include "ThreadQueue.h"
27
28#include <algorithm>
29#include <deque>
30
31namespace ZThread {
32 
33  ThreadQueue::ThreadQueue()
34    : _waiter(0) {
35
36    ZTDEBUG("ThreadQueue created\n");
37
38  }
39
40  ThreadQueue::~ThreadQueue() {
41
42    ZTDEBUG("ThreadQueue waiting on remaining threads...\n");
43
44    // Ensure the current thread is mapped.
45    ThreadImpl* impl = ThreadImpl::current();
46
47    bool threadsWaiting = false;
48    bool waitRequired = false;
49
50    {
51
52      TaskList shutdownTasks;
53
54      { // Check the queue to for pending user threads
55       
56        Guard<FastLock> g(_lock);
57       
58        waitRequired = (_waiter != (ThreadImpl*)1);
59        _waiter = impl;
60       
61        threadsWaiting = !_userThreads.empty() || !_pendingThreads.empty();
62       
63        //ZTDEBUG("Wait required:   %d\n", waitRequired);
64        //ZTDEBUG("Threads waiting: %d\n", threadsWaiting);
65       
66        // Auto-cancel any active threads at the time main() goes out of scope
67        // "force" a gentle exit from the executing tasks; eventually the user-
68        // threads will transition into pending-threads
69        pollUserThreads();
70       
71        // Remove all the tasks about to be run from the task list so an indication
72        // can be given to threads calling removeShutdownTask() too late.
73        std::remove_copy(_shutdownTasks.begin(),
74                         _shutdownTasks.end(),
75                         std::back_inserter(shutdownTasks),
76                         Task((Runnable*)0));
77       
78        //ZTDEBUG("Threads waiting: %d\n", threadsWaiting);
79       
80      }
81
82      // Execute the shutdown tasks
83      for(TaskList::iterator i = shutdownTasks.begin(); i != shutdownTasks.end(); ++i) {
84        try {
85          (*i)->run();
86        } catch(...) { }
87      }
88
89    }
90
91    // Wait for all the users threads to get into the appropriate state
92    if(threadsWaiting) {
93
94
95      Monitor& m = _waiter->getMonitor();
96     
97      // Defer interruption while this thread waits for a signal from
98      // the last pending user thread
99      Guard<Monitor, CompoundScope<DeferredInterruptionScope, LockedScope> > g(m);
100      //ZTDEBUG("Threads waiting: %d %d\n", _userThreads.size(), _pendingThreads.size());
101
102      // Avoid race-condition where the last threads are done with thier tasks, but
103      // only begin the final part of the clean up phase after this destructor begins
104      // to run. Takes advantage of the fact that if all remaining threads have transitioned
105      // into a pending state by the time execution reaches this point, then there is no
106      // need to wait.
107      waitRequired = waitRequired && !(_userThreads.empty() && !_pendingThreads.empty());
108
109      // Reference threads can't be interrupted or otherwise
110      // manipulated. The only signal this monitor will receive
111      // at this point will be from the last pending thread.
112      if(waitRequired && m.wait() != Monitor::SIGNALED) {
113        assert(0);
114      }
115
116      // Join those pending threads
117      pollPendingThreads();
118     
119    }
120     
121    // Clean up the reference threads
122    pollReferenceThreads();
123   
124    ZTDEBUG("ThreadQueue destroyed\n");
125
126  }
127 
128
129  void ThreadQueue::insertPendingThread(ThreadImpl* impl) {
130    ZTDEBUG("insertPendingThread()\n");
131    Guard<FastLock> g(_lock);
132
133    // Move from the user-thread list to the pending-thread list
134    ThreadList::iterator i = std::find(_userThreads.begin(), _userThreads.end(), impl);
135    if(i != _userThreads.end())
136      _userThreads.erase(i);
137
138    _pendingThreads.push_back(impl);
139   
140    // Wake the main thread,if its waiting, when the last pending-thread becomes available;
141    // Otherwise, take note that no wait for pending threads to finish is needed
142    if(_userThreads.empty())
143      if(_waiter && _waiter != (ThreadImpl*)1)
144        _waiter->getMonitor().notify();
145      else
146        _waiter = (ThreadImpl*)!_waiter;
147
148    ZTDEBUG("1 pending-thread added.\n");
149
150  }
151
152  void ThreadQueue::insertReferenceThread(ThreadImpl* impl) {
153
154    Guard<FastLock> g(_lock);
155    _referenceThreads.push_back(impl);
156
157    ZTDEBUG("1 reference-thread added.\n");
158
159  }
160
161  void ThreadQueue::insertUserThread(ThreadImpl* impl) {
162
163    Guard<FastLock> g(_lock);
164    _userThreads.push_back(impl);
165
166    // Reclaim pending-threads
167    pollPendingThreads();
168
169    // Auto-cancel threads that are started when main() is out of scope
170    if(_waiter)
171      impl->cancel(true);
172
173    ZTDEBUG("1 user-thread added.\n");
174   
175  }
176
177
178  void ThreadQueue::pollPendingThreads() {
179
180    ZTDEBUG("pollPendingThreads()\n");
181
182    for(ThreadList::iterator i = _pendingThreads.begin(); i != _pendingThreads.end();) {
183
184      ThreadImpl* impl = (ThreadImpl*)*i;
185      ThreadOps::join(impl);
186     
187      impl->delReference();
188           
189      i = _pendingThreads.erase(i);
190
191      ZTDEBUG("1 pending-thread reclaimed.\n");
192
193    }
194
195  }
196
197  void ThreadQueue::pollReferenceThreads() {
198
199    ZTDEBUG("pollReferenceThreads()\n");
200
201    for(ThreadList::iterator i = _referenceThreads.begin(); i != _referenceThreads.end(); ++i) {
202     
203      ThreadImpl* impl = (ThreadImpl*)*i;
204      impl->delReference();
205     
206      ZTDEBUG("1 reference-thread reclaimed.\n");
207
208    }
209   
210  }
211
212  void ThreadQueue::pollUserThreads() {
213
214    ZTDEBUG("pollUserThreads()\n");
215
216    for(ThreadList::iterator i = _userThreads.begin(); i != _userThreads.end(); ++i) {
217
218      ThreadImpl* impl = *i;
219      impl->cancel(true);
220
221      ZTDEBUG("1 user-thread reclaimed.\n");
222     
223    }
224
225  }
226 
227  void ThreadQueue::insertShutdownTask(Task& task) {
228
229    bool hasWaiter = false;
230
231    {
232
233      Guard<FastLock> g(_lock);
234   
235      // Execute later when the ThreadQueue is destroyed 
236      if( !(hasWaiter = (_waiter != 0)) ) {
237
238        _shutdownTasks.push_back(task);
239        //ZTDEBUG("1 shutdown task added. %d\n", _shutdownTasks.size());
240       
241      }
242
243    }
244   
245    // Execute immediately if things are shutting down
246    if(hasWaiter)
247      task->run();
248   
249  }
250 
251  bool ThreadQueue::removeShutdownTask(const Task& task) {
252   
253    Guard<FastLock> g(_lock);
254   
255    TaskList::iterator i = std::find(_shutdownTasks.begin(), _shutdownTasks.end(), task);
256    bool removed = (i != _shutdownTasks.end());
257    if(removed)
258      _shutdownTasks.erase(i);
259
260    //ZTDEBUG("1 shutdown task removed (%d)-%d\n", removed, _shutdownTasks.size());
261   
262    return removed;
263
264  }
265
266};
Note: See TracBrowser for help on using the browser.