root/trunk/src/game/WorldSocketMgr.cpp @ 149

Revision 149, 7.5 kB (checked in by yumileroy, 17 years ago)

[svn] *Implement new player conditions CONDITION_NO_AURA, CONDITION_ACTIVE_EVENT
* Default behaviour of pets for creatures changed to REACT_DEFENSIVE
* Disallowed sending wrapped items as COD
* Prevent loading and saving single target auras for pet in same way as already implemented for player
* Correctly limit use some flask types to zones.
* Fixed extracting common.MPQ under *nix
* Many small xleanups and fixes.
** mangos merge rev.

TEST REV so be careful of creepy crawly bugs!

Original author: KingPin?
Date: 2008-11-02 16:53:46-06:00

Line 
1/*
2* Copyright (C) 2005-2008 MaNGOS <http://www.mangosproject.org/>
3*
4* Copyright (C) 2008 Trinity <http://www.trinitycore.org/>
5*
6* This program is free software; you can redistribute it and/or modify
7* it under the terms of the GNU General Public License as published by
8* the Free Software Foundation; either version 2 of the License, or
9* (at your option) any later version.
10*
11* This program is distributed in the hope that it will be useful,
12* but WITHOUT ANY WARRANTY; without even the implied warranty of
13* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14* GNU General Public License for more details.
15*
16* You should have received a copy of the GNU General Public License
17* along with this program; if not, write to the Free Software
18* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19*/
20
21/** \file WorldSocketMgr.cpp
22*  \ingroup u2w
23*  \author Derex <derex101@gmail.com>
24*/
25
26#include "WorldSocketMgr.h"
27
28#include <ace/ACE.h>
29#include <ace/Log_Msg.h>
30#include <ace/Reactor.h>
31#include <ace/Reactor_Impl.h>
32#include <ace/TP_Reactor.h>
33#include <ace/Dev_Poll_Reactor.h>
34#include <ace/Guard_T.h>
35#include <ace/Atomic_Op.h>
36#include <ace/os_include/arpa/os_inet.h>
37#include <ace/os_include/netinet/os_tcp.h>
38#include <ace/os_include/sys/os_types.h>
39#include <ace/os_include/sys/os_socket.h>
40
41#include <set>
42
43#include "Log.h"
44#include "Common.h"
45#include "Config/ConfigEnv.h"
46#include "Database/DatabaseEnv.h"
47#include "WorldSocket.h"
48
49/**
50* This is a helper class to WorldSocketMgr ,that manages
51* network threads, and assigning connections from acceptor thread
52* to other network threads
53*/
54class ReactorRunnable : protected ACE_Task_Base
55{
56public:
57
58        ReactorRunnable () :
59          m_ThreadId (-1),
60                  m_Connections (0),
61                  m_Reactor (0)
62          {
63                  ACE_Reactor_Impl* imp = 0;
64
65#if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
66
67                  imp = new ACE_Dev_Poll_Reactor ();
68
69                  imp->max_notify_iterations (128);
70                  imp->restart (1);
71
72#else
73
74                  imp = new ACE_TP_Reactor ();
75                  imp->max_notify_iterations (128);
76
77#endif
78
79                  m_Reactor = new ACE_Reactor (imp, 1);
80          }
81
82          virtual ~ReactorRunnable ()
83          {
84                  this->Stop ();
85                  this->Wait ();
86
87                  if (m_Reactor)
88                          delete m_Reactor;
89          }
90
91          void Stop ()
92          {
93                  m_Reactor->end_reactor_event_loop ();
94          }
95
96          int Start ()
97          {
98                  if (m_ThreadId != -1)
99                          return -1;
100
101                  return (m_ThreadId = this->activate ());
102          }
103
104          void Wait ()
105          {
106                  ACE_Task_Base::wait ();
107          }
108
109          long Connections ()
110          {
111                  return static_cast<long> (m_Connections.value ());
112          }
113
114          int AddSocket (WorldSocket* sock)
115          {
116                  ACE_GUARD_RETURN (ACE_Thread_Mutex, Guard, m_NewSockets_Lock, -1);
117
118                  ++m_Connections;
119                  sock->AddReference();
120                  sock->reactor (m_Reactor);
121                  m_NewSockets.insert (sock);
122
123                  return 0;
124          }
125
126          ACE_Reactor* GetReactor ()
127          {
128                  return m_Reactor;
129          }
130
131protected:
132
133        void AddNewSockets ()
134        {
135                ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock);
136
137                if (m_NewSockets.empty ())
138                        return;
139
140                for (SocketSet::iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i)
141                {
142                        WorldSocket* sock = (*i);
143
144                        if (sock->IsClosed ())
145                        {
146                                sock->RemoveReference ();
147                                --m_Connections;
148                        }
149                        else
150                                m_Sockets.insert (sock);
151                }
152
153                m_NewSockets.clear ();
154        }
155
156        virtual int svc (void)
157        {
158                DEBUG_LOG ("Network Thread Starting");
159
160                WorldDatabase.ThreadStart ();
161
162                ACE_ASSERT (m_Reactor);
163
164                SocketSet::iterator i, t;
165
166                while (!m_Reactor->reactor_event_loop_done ())
167                {
168                        // dont be too smart to move this outside the loop
169                        // the run_reactor_event_loop will modify interval
170                        ACE_Time_Value interval (0, 10000);
171
172                        if (m_Reactor->run_reactor_event_loop (interval) == -1)
173                                break;
174
175                        AddNewSockets ();
176
177                        for (i = m_Sockets.begin (); i != m_Sockets.end ();)
178                        {
179                                if ((*i)->Update () == -1)
180                                {
181                                        t = i;
182                                        i++;
183                                        (*t)->CloseSocket ();
184                                        (*t)->RemoveReference ();
185                                        --m_Connections;
186                                        m_Sockets.erase (t);
187                                }
188                                else
189                                        i++;
190                        }
191                }
192
193                WorldDatabase.ThreadEnd ();
194
195                DEBUG_LOG ("Network Thread Exitting");
196
197                return 0;
198        }
199
200private:
201        typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> AtomicInt;
202        typedef std::set<WorldSocket*> SocketSet;
203
204        ACE_Reactor* m_Reactor;
205        AtomicInt m_Connections;
206        int m_ThreadId;
207
208        SocketSet m_Sockets;
209
210        SocketSet m_NewSockets;
211        ACE_Thread_Mutex m_NewSockets_Lock;
212};
213
214
215
216WorldSocketMgr::WorldSocketMgr () :
217m_NetThreadsCount (0),
218m_NetThreads (0),
219m_SockOutKBuff (-1),
220m_SockOutUBuff (65536),
221m_UseNoDelay (true),
222m_Acceptor (0) {}
223
224WorldSocketMgr::~WorldSocketMgr ()
225{
226        if (m_NetThreads)
227                delete [] m_NetThreads;
228
229        if(m_Acceptor)
230                delete m_Acceptor;
231}
232
233int WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
234{
235        m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true);
236
237        int num_threads = sConfig.GetIntDefault ("Network.Threads", 1);
238
239        if (num_threads <= 0)
240        {
241                sLog.outError ("Network.Threads is wrong in your config file");
242                return -1;
243        }
244
245        m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
246
247        m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
248
249        sLog.outBasic ("Max alowed socket connections %d",ACE::max_handles ());
250
251        m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1); // -1 means use default
252
253        m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
254
255        if ( m_SockOutUBuff <= 0 )
256        {
257                sLog.outError ("Network.OutUBuff is wrong in your config file");
258                return -1;
259        }
260
261        WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
262        m_Acceptor = acc;
263
264        ACE_INET_Addr listen_addr (port, address);
265
266        if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
267        {
268                sLog.outError ("Failed to open acceptor ,check if the port is free");
269                return -1;
270        }
271
272        for (size_t i = 0; i < m_NetThreadsCount; ++i)
273                m_NetThreads[i].Start ();
274
275        return 0;
276}
277
278int WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address)
279{
280        if (!sLog.IsOutDebug ())
281                ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
282
283        if (this->StartReactiveIO (port, address) == -1)
284                return -1;
285
286        return 0;
287}
288
289void WorldSocketMgr::StopNetwork ()
290{
291        if (m_Acceptor)
292        {
293                WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor);
294
295                if (acc)
296                        acc->close ();
297        }
298
299        if (m_NetThreadsCount != 0)
300        {
301                for (size_t i = 0; i < m_NetThreadsCount; ++i)
302                        m_NetThreads[i].Stop ();
303        }
304
305        this->Wait ();
306}
307
308void WorldSocketMgr::Wait ()
309{
310        if (m_NetThreadsCount != 0)
311        {
312                for (size_t i = 0; i < m_NetThreadsCount; ++i)
313                        m_NetThreads[i].Wait ();
314        }
315}
316
317int WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
318{
319        // set some options here
320        if (m_SockOutKBuff >= 0)
321                if (sock->peer ().set_option (SOL_SOCKET,
322                        SO_SNDBUF,
323                        (void*) & m_SockOutKBuff,
324                        sizeof (int)) == -1 && errno != ENOTSUP)
325                {
326                        sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF");
327                        return -1;
328                }
329
330                static const int ndoption = 1;
331
332                // Set TCP_NODELAY.
333                if (m_UseNoDelay)
334                        if (sock->peer ().set_option (ACE_IPPROTO_TCP,
335                                TCP_NODELAY,
336                                (void*) & ndoption,
337                                sizeof (int)) == -1)
338                        {
339                                sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno));
340                                return -1;
341                        }
342
343                        sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff);
344
345                        // we skip the Acceptor Thread
346                        size_t min = 1;
347
348                        ACE_ASSERT (m_NetThreadsCount >= 1);
349
350                        for (size_t i = 1; i < m_NetThreadsCount; ++i)
351                                if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
352                                        min = i;
353
354                        return m_NetThreads[min].AddSocket (sock);
355}
356
357WorldSocketMgr* WorldSocketMgr::Instance ()
358{
359        return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance();
360}
Note: See TracBrowser for help on using the browser.