root/trunk/src/game/WorldSocketMgr.cpp @ 111

Revision 102, 8.1 kB (checked in by yumileroy, 17 years ago)

[svn] Fixed copyright notices to comply with GPL.

Original author: w12x
Date: 2008-10-23 03:29:52-05:00

Line 
1/*
2 * Copyright (C) 2005-2008 MaNGOS <http://www.mangosproject.org/>
3 *
4 * Copyright (C) 2008 Trinity <http://www.trinitycore.org/>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21/** \file WorldSocketMgr.cpp
22 *  \ingroup u2w
23 *  \author Derex <derex101@gmail.com>
24 */
25
26#include "WorldSocketMgr.h"
27
28#include <ace/ACE.h>
29#include <ace/Log_Msg.h>
30#include <ace/Reactor.h>
31#include <ace/Reactor_Impl.h>
32#include <ace/TP_Reactor.h>
33#include <ace/Dev_Poll_Reactor.h>
34#include <ace/Guard_T.h>
35#include <ace/Atomic_Op.h>
36#include <ace/os_include/arpa/os_inet.h>
37#include <ace/os_include/netinet/os_tcp.h>
38#include <ace/os_include/sys/os_types.h>
39#include <ace/os_include/sys/os_socket.h>
40
41#include <set>
42
43#include "Log.h"
44#include "Common.h"
45#include "Config/ConfigEnv.h"
46#include "Database/DatabaseEnv.h"
47#include "WorldSocket.h"
48
49/**
50 * This is a helper class to WorldSocketMgr ,that manages
51 * network threads, and assigning connections from acceptor thread
52 * to other network threads
53 */
54class ReactorRunnable : protected ACE_Task_Base
55{
56public:
57
58  ReactorRunnable () :
59  m_ThreadId (-1),
60  m_Connections (0),
61  m_Reactor (0)
62  {
63    ACE_Reactor_Impl* imp = 0;
64
65#if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
66    imp = new ACE_Dev_Poll_Reactor ();
67
68    imp->max_notify_iterations (128);
69    imp->restart (1);
70#else
71    imp = new ACE_TP_Reactor ();
72    imp->max_notify_iterations (128);
73#endif
74
75    m_Reactor = new ACE_Reactor (imp, 1);
76  }
77
78  virtual
79  ~ReactorRunnable ()
80  {
81    this->Stop ();
82    this->Wait ();
83
84    if (m_Reactor)
85      delete m_Reactor;
86  }
87
88  void
89  Stop ()
90  {
91    m_Reactor->end_reactor_event_loop ();
92  }
93
94  int
95  Start ()
96  {
97    if (m_ThreadId != -1)
98      return -1;
99
100    return (m_ThreadId = this->activate ());
101  }
102
103  void
104  Wait ()
105  {
106    ACE_Task_Base::wait ();
107  }
108
109  long
110  Connections ()
111  {
112    return static_cast<long> (m_Connections.value ());
113  }
114
115  int
116  AddSocket (WorldSocket* sock)
117  {
118    ACE_GUARD_RETURN (ACE_Thread_Mutex, Guard, m_NewSockets_Lock, -1);
119
120    ++m_Connections;
121    sock->AddReference();
122    sock->reactor (m_Reactor);
123    m_NewSockets.insert (sock);
124
125    return 0;
126  }
127 
128  ACE_Reactor* GetReactor ()
129  {
130    return m_Reactor;
131  }
132 
133protected:
134 
135  void
136  AddNewSockets ()
137  {
138    ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock);
139
140    if (m_NewSockets.empty ())
141      return;
142
143    for (SocketSet::iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i)
144      {
145        WorldSocket* sock = (*i);
146
147        if (sock->IsClosed ())
148          {
149            sock->RemoveReference ();
150            --m_Connections;
151          }
152        else
153          m_Sockets.insert (sock);
154      }
155
156    m_NewSockets.clear ();
157  }
158
159  virtual int
160  svc (void)
161  {
162    DEBUG_LOG ("Network Thread Starting");
163
164    WorldDatabase.ThreadStart ();
165
166    ACE_ASSERT (m_Reactor);
167
168    SocketSet::iterator i, t;
169
170    while (!m_Reactor->reactor_event_loop_done ())
171      {
172        // dont be too smart to move this outside the loop
173        // the run_reactor_event_loop will modify interval
174        ACE_Time_Value interval (0, 10000);
175
176        if (m_Reactor->run_reactor_event_loop (interval) == -1)
177          break;
178
179        AddNewSockets ();
180
181        for (i = m_Sockets.begin (); i != m_Sockets.end ();)
182          {
183            if ((*i)->Update () == -1)
184              {
185                t = i;
186                i++;
187                (*t)->CloseSocket ();
188                (*t)->RemoveReference ();
189                --m_Connections;
190                m_Sockets.erase (t);
191              }
192            else
193              i++;
194          }
195      }
196
197    WorldDatabase.ThreadEnd ();
198
199    DEBUG_LOG ("Network Thread Exitting");
200
201    return 0;
202  }
203
204private:
205  typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> AtomicInt;
206  typedef std::set<WorldSocket*> SocketSet;
207
208  ACE_Reactor* m_Reactor;
209  AtomicInt m_Connections;
210  int m_ThreadId;
211
212  SocketSet m_Sockets;
213
214  SocketSet m_NewSockets;
215  ACE_Thread_Mutex m_NewSockets_Lock;
216};
217
218
219
220WorldSocketMgr::WorldSocketMgr () :
221m_NetThreadsCount (0),
222m_NetThreads (0),
223m_SockOutKBuff (-1),
224m_SockOutUBuff (65536),
225m_UseNoDelay (true),
226m_Acceptor (0) {}
227
228WorldSocketMgr::~WorldSocketMgr ()
229{
230  if (m_NetThreads)
231    delete [] m_NetThreads;
232 
233  if(m_Acceptor)
234    delete m_Acceptor;
235}
236
237int
238WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
239{
240  m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true);
241
242  int num_threads = sConfig.GetIntDefault ("Network.Threads", 1);
243
244  if (num_threads <= 0)
245    {
246      sLog.outError ("Network.Threads is wrong in your config file");
247      return -1;
248    }
249
250  m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
251
252  m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
253
254  sLog.outBasic ("Max alowed socket connections %d",ACE::max_handles ());
255
256  m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1); // -1 means use default
257
258  m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
259
260  if ( m_SockOutUBuff <= 0 )
261    {
262      sLog.outError ("Network.OutUBuff is wrong in your config file");
263      return -1;
264    }
265
266  WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
267  m_Acceptor = acc;
268
269  ACE_INET_Addr listen_addr (port, address);
270
271  if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
272    {
273      sLog.outError ("Failed to open acceptor ,check if the port is free");
274      return -1;
275    }
276
277  for (size_t i = 0; i < m_NetThreadsCount; ++i)
278    m_NetThreads[i].Start ();
279
280  return 0;
281}
282
283int
284WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address)
285{
286  if (!sLog.IsOutDebug ())
287    ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
288
289  if (this->StartReactiveIO (port, address) == -1)
290    return -1;
291
292  return 0;
293}
294
295void
296WorldSocketMgr::StopNetwork ()
297{
298  if (m_Acceptor)
299    {
300      WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor);
301
302      if (acc)
303        acc->close ();
304    }
305
306  if (m_NetThreadsCount != 0)
307    {
308      for (size_t i = 0; i < m_NetThreadsCount; ++i)
309        m_NetThreads[i].Stop ();
310    }
311 
312  this->Wait ();
313}
314
315void 
316WorldSocketMgr::Wait ()
317{
318  if (m_NetThreadsCount != 0)
319    {
320      for (size_t i = 0; i < m_NetThreadsCount; ++i)
321        m_NetThreads[i].Wait ();
322    }
323}
324
325int
326WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
327{
328  // set some options here
329  if (m_SockOutKBuff >= 0)
330    if (sock->peer ().set_option (SOL_SOCKET,
331                                  SO_SNDBUF,
332                                  (void*) & m_SockOutKBuff,
333                                  sizeof (int)) == -1 && errno != ENOTSUP)
334      {
335        sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF");
336        return -1;
337      }
338
339  static const int ndoption = 1;
340
341  // Set TCP_NODELAY.
342  if (m_UseNoDelay)
343    if (sock->peer ().set_option (ACE_IPPROTO_TCP,
344                                  TCP_NODELAY,
345                                  (void*) & ndoption,
346                                  sizeof (int)) == -1)
347      {
348        sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno));
349        return -1;
350      }
351 
352  sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff);
353
354  // we skip the Acceptor Thread
355  size_t min = 1;
356
357  ACE_ASSERT (m_NetThreadsCount >= 1);
358
359  for (size_t i = 1; i < m_NetThreadsCount; ++i)
360    if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
361      min = i;
362
363  return m_NetThreads[min].AddSocket (sock);
364
365}
366
367WorldSocketMgr*
368WorldSocketMgr::Instance ()
369{
370  return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance();
371}
Note: See TracBrowser for help on using the browser.