Show
Ignore:
Timestamp:
11/19/08 13:22:12 (17 years ago)
Author:
yumileroy
Message:

[svn] * Added ACE for Linux and Windows (Thanks Derex for Linux part and partial Windows part)
* Updated to 6721 and 676
* Fixed TrinityScript? logo
* Version updated to 0.2.6721.676

Original author: Neo2003
Date: 2008-10-04 06:17:19-05:00

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/src/game/WorldSocketMgr.cpp

    r2 r6  
    1717 */ 
    1818 
    19 /** \file 
    20     \ingroup u2w 
    21 */ 
    22  
     19/** \file WorldSocketMgr.cpp 
     20 *  \ingroup u2w 
     21 *  \author Derex <derex101@gmail.com> 
     22 */ 
     23 
     24#include "WorldSocketMgr.h" 
     25 
     26#include <ace/ACE.h> 
     27#include <ace/Log_Msg.h> 
     28#include <ace/Reactor.h> 
     29#include <ace/Reactor_Impl.h> 
     30#include <ace/TP_Reactor.h> 
     31#include <ace/Dev_Poll_Reactor.h> 
     32#include <ace/Guard_T.h> 
     33#include <ace/Atomic_Op.h> 
     34#include <ace/os_include/arpa/os_inet.h> 
     35#include <ace/os_include/netinet/os_tcp.h> 
     36#include <ace/os_include/sys/os_types.h> 
     37#include <ace/os_include/sys/os_socket.h> 
     38 
     39#include <set> 
     40 
     41#include "Log.h" 
    2342#include "Common.h" 
     43#include "Config/ConfigEnv.h" 
     44#include "Database/DatabaseEnv.h" 
    2445#include "WorldSocket.h" 
    25 #include "WorldSocketMgr.h" 
    26 #include "Policies/SingletonImp.h" 
    27  
    28 INSTANTIATE_SINGLETON_1( WorldSocketMgr ); 
    29  
    30 /// WorldSocketMgr constructor 
    31 WorldSocketMgr::WorldSocketMgr() 
    32 { 
    33 } 
    34  
    35 /// Add a WorldSocket to the set 
    36 void WorldSocketMgr::AddSocket(WorldSocket *s) 
    37 { 
    38     m_sockets.insert(s); 
    39 } 
    40  
    41 /// Remove a WorldSocket to the set 
    42 void WorldSocketMgr::RemoveSocket(WorldSocket *s) 
    43 { 
    44     m_sockets.erase(s); 
    45 } 
    46  
    47 /// Triggers an 'update' to all sockets in the set 
    48 void WorldSocketMgr::Update(time_t diff) 
    49 { 
    50     SocketSet::iterator i; 
    51     for(i = m_sockets.begin(); i != m_sockets.end(); i++) 
    52     { 
    53         (*i)->Update(diff); 
    54     } 
    55 } 
     46 
     47/**  
     48 * This is a helper class to WorldSocketMgr ,that manages  
     49 * network threads, and assigning connections from acceptor thread 
     50 * to other network threads 
     51 */ 
     52class ReactorRunnable : protected ACE_Task_Base 
     53{ 
     54public: 
     55 
     56  ReactorRunnable () : 
     57  m_ThreadId (-1), 
     58  m_Connections (0), 
     59  m_Reactor (0) 
     60  { 
     61    ACE_Reactor_Impl* imp = 0; 
     62 
     63#if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL) 
     64    imp = new ACE_Dev_Poll_Reactor (); 
     65 
     66    imp->max_notify_iterations (128); 
     67    imp->restart (1); 
     68#else 
     69    imp = new ACE_TP_Reactor (); 
     70    imp->max_notify_iterations (128); 
     71#endif 
     72 
     73    m_Reactor = new ACE_Reactor (imp, 1); 
     74  } 
     75 
     76  virtual 
     77  ~ReactorRunnable () 
     78  { 
     79    this->Stop (); 
     80    this->Wait (); 
     81 
     82    if (m_Reactor) 
     83      delete m_Reactor; 
     84  } 
     85 
     86  void 
     87  Stop () 
     88  { 
     89    m_Reactor->end_reactor_event_loop (); 
     90  } 
     91 
     92  int 
     93  Start () 
     94  { 
     95    if (m_ThreadId != -1) 
     96      return -1; 
     97 
     98    return (m_ThreadId = this->activate ()); 
     99  } 
     100 
     101  void 
     102  Wait () 
     103  { 
     104    ACE_Task_Base::wait (); 
     105  } 
     106 
     107  long 
     108  Connections () 
     109  { 
     110    return static_cast<long> (m_Connections.value ()); 
     111  } 
     112 
     113  int 
     114  AddSocket (WorldSocket* sock) 
     115  { 
     116    ACE_GUARD_RETURN (ACE_Thread_Mutex, Guard, m_NewSockets_Lock, -1); 
     117 
     118    ++m_Connections; 
     119    sock->AddReference(); 
     120    sock->reactor (m_Reactor); 
     121    m_NewSockets.insert (sock); 
     122 
     123    return 0; 
     124  } 
     125   
     126  ACE_Reactor* GetReactor () 
     127  { 
     128    return m_Reactor; 
     129  } 
     130   
     131protected: 
     132   
     133  void 
     134  AddNewSockets () 
     135  { 
     136    ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock); 
     137 
     138    if (m_NewSockets.empty ()) 
     139      return; 
     140 
     141    for (SocketSet::iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i) 
     142      { 
     143        WorldSocket* sock = (*i); 
     144 
     145        if (sock->IsClosed ()) 
     146          { 
     147            sock->RemoveReference (); 
     148            --m_Connections; 
     149          } 
     150        else 
     151          m_Sockets.insert (sock); 
     152      } 
     153 
     154    m_NewSockets.clear (); 
     155  } 
     156 
     157  virtual int 
     158  svc (void) 
     159  { 
     160    DEBUG_LOG ("Network Thread Starting"); 
     161 
     162    WorldDatabase.ThreadStart (); 
     163 
     164    ACE_ASSERT (m_Reactor); 
     165 
     166    SocketSet::iterator i, t; 
     167 
     168    while (!m_Reactor->reactor_event_loop_done ()) 
     169      { 
     170        // dont be too smart to move this outside the loop  
     171        // the run_reactor_event_loop will modify interval 
     172        ACE_Time_Value interval (0, 10000); 
     173 
     174        if (m_Reactor->run_reactor_event_loop (interval) == -1) 
     175          break; 
     176 
     177        AddNewSockets (); 
     178 
     179        for (i = m_Sockets.begin (); i != m_Sockets.end ();) 
     180          { 
     181            if ((*i)->Update () == -1) 
     182              { 
     183                t = i; 
     184                i++; 
     185                (*t)->CloseSocket (); 
     186                (*t)->RemoveReference (); 
     187                --m_Connections; 
     188                m_Sockets.erase (t); 
     189              } 
     190            else 
     191              i++; 
     192          } 
     193      } 
     194 
     195    WorldDatabase.ThreadEnd (); 
     196 
     197    DEBUG_LOG ("Network Thread Exitting"); 
     198 
     199    return 0; 
     200  } 
     201 
     202private: 
     203  typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> AtomicInt; 
     204  typedef std::set<WorldSocket*> SocketSet; 
     205 
     206  ACE_Reactor* m_Reactor; 
     207  AtomicInt m_Connections; 
     208  int m_ThreadId; 
     209 
     210  SocketSet m_Sockets; 
     211 
     212  SocketSet m_NewSockets; 
     213  ACE_Thread_Mutex m_NewSockets_Lock; 
     214}; 
     215 
     216 
     217 
     218WorldSocketMgr::WorldSocketMgr () : 
     219m_NetThreadsCount (0), 
     220m_NetThreads (0), 
     221m_SockOutKBuff (-1), 
     222m_SockOutUBuff (65536), 
     223m_UseNoDelay (true), 
     224m_Acceptor (0) {} 
     225 
     226WorldSocketMgr::~WorldSocketMgr () 
     227{ 
     228  if (m_NetThreads) 
     229    delete [] m_NetThreads; 
     230   
     231  if(m_Acceptor) 
     232    delete m_Acceptor; 
     233} 
     234 
     235int 
     236WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address) 
     237{ 
     238  m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true); 
     239 
     240  int num_threads = sConfig.GetIntDefault ("Network.Threads", 1); 
     241 
     242  if (num_threads <= 0) 
     243    { 
     244      sLog.outError ("Network.Threads is wrong in your config file"); 
     245      return -1; 
     246    } 
     247 
     248  m_NetThreadsCount = static_cast<size_t> (num_threads + 1); 
     249 
     250  m_NetThreads = new ReactorRunnable[m_NetThreadsCount]; 
     251 
     252  sLog.outBasic ("Max alowed socket connections %d",ACE::max_handles ()); 
     253 
     254  m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1); // -1 means use default 
     255 
     256  m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536); 
     257 
     258  if ( m_SockOutUBuff <= 0 ) 
     259    { 
     260      sLog.outError ("Network.OutUBuff is wrong in your config file"); 
     261      return -1; 
     262    } 
     263 
     264  WorldSocket::Acceptor *acc = new WorldSocket::Acceptor; 
     265  m_Acceptor = acc; 
     266 
     267  ACE_INET_Addr listen_addr (port, address); 
     268 
     269  if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1) 
     270    { 
     271      sLog.outError ("Failed to open acceptor ,check if the port is free"); 
     272      return -1; 
     273    } 
     274 
     275  for (size_t i = 0; i < m_NetThreadsCount; ++i) 
     276    m_NetThreads[i].Start (); 
     277 
     278  return 0; 
     279} 
     280 
     281int 
     282WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address) 
     283{ 
     284  if (!sLog.IsOutDebug ()) 
     285    ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS); 
     286 
     287  if (this->StartReactiveIO (port, address) == -1) 
     288    return -1; 
     289 
     290  return 0; 
     291} 
     292 
     293void 
     294WorldSocketMgr::StopNetwork () 
     295{ 
     296  if (m_Acceptor) 
     297    { 
     298      WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor); 
     299 
     300      if (acc) 
     301        acc->close (); 
     302    } 
     303 
     304  if (m_NetThreadsCount != 0) 
     305    { 
     306      for (size_t i = 0; i < m_NetThreadsCount; ++i) 
     307        m_NetThreads[i].Stop (); 
     308    } 
     309   
     310  this->Wait (); 
     311} 
     312 
     313void  
     314WorldSocketMgr::Wait () 
     315{ 
     316  if (m_NetThreadsCount != 0) 
     317    { 
     318      for (size_t i = 0; i < m_NetThreadsCount; ++i) 
     319        m_NetThreads[i].Wait (); 
     320    } 
     321} 
     322 
     323int 
     324WorldSocketMgr::OnSocketOpen (WorldSocket* sock) 
     325{ 
     326  // set some options here 
     327  if (m_SockOutKBuff >= 0) 
     328    if (sock->peer ().set_option (SOL_SOCKET, 
     329                                  SO_SNDBUF, 
     330                                  (void*) & m_SockOutKBuff, 
     331                                  sizeof (int)) == -1 && errno != ENOTSUP) 
     332      { 
     333        sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF"); 
     334        return -1; 
     335      } 
     336 
     337  static const int ndoption = 1; 
     338 
     339  // Set TCP_NODELAY. 
     340  if (m_UseNoDelay) 
     341    if (sock->peer ().set_option (ACE_IPPROTO_TCP, 
     342                                  TCP_NODELAY, 
     343                                  (void*) & ndoption, 
     344                                  sizeof (int)) == -1) 
     345      { 
     346        sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno)); 
     347        return -1; 
     348      } 
     349   
     350  sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff); 
     351 
     352  // we skip the Acceptor Thread 
     353  size_t min = 1; 
     354 
     355  ACE_ASSERT (m_NetThreadsCount >= 1); 
     356 
     357  for (size_t i = 1; i < m_NetThreadsCount; ++i) 
     358    if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ()) 
     359      min = i; 
     360 
     361  return m_NetThreads[min].AddSocket (sock); 
     362 
     363  return 0; 
     364} 
     365 
     366WorldSocketMgr* 
     367WorldSocketMgr::Instance () 
     368{ 
     369  return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance(); 
     370}