root/trunk/dep/src/sockets/SocketHandler.cpp @ 2

Revision 2, 31.6 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/** \file SocketHandler.cpp
2 **     \date  2004-02-13
3 **     \author grymse@alhem.net
4**/
5/*
6Copyright (C) 2004-2007  Anders Hedstrom
7
8This library is made available under the terms of the GNU GPL.
9
10If you would like to use this library in a closed-source application,
11a separate license agreement is available. For information about
12the closed-source license agreement for the C++ sockets library,
13please visit http://www.alhem.net/Sockets/license.html and/or
14email license@alhem.net.
15
16This program is free software; you can redistribute it and/or
17modify it under the terms of the GNU General Public License
18as published by the Free Software Foundation; either version 2
19of the License, or (at your option) any later version.
20
21This program is distributed in the hope that it will be useful,
22but WITHOUT ANY WARRANTY; without even the implied warranty of
23MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24GNU General Public License for more details.
25
26You should have received a copy of the GNU General Public License
27along with this program; if not, write to the Free Software
28Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
29*/
30#ifdef _WIN32
31#ifdef _MSC_VER
32#pragma warning(disable:4786)
33#endif
34#endif
35#include <stdlib.h>
36#include <errno.h>
37
38#include "SocketHandler.h"
39#include "UdpSocket.h"
40#include "ResolvSocket.h"
41#include "ResolvServer.h"
42#include "TcpSocket.h"
43#include "Mutex.h"
44#include "Utility.h"
45#include "SocketAddress.h"
46
47#ifdef SOCKETS_NAMESPACE
48namespace SOCKETS_NAMESPACE {
49#endif
50
51
52//#ifdef _DEBUG
53//#define DEB(x) x; fflush(stderr);
54//#else
55#define DEB(x)
56//#endif
57
58
59SocketHandler::SocketHandler(StdLog *p)
60:m_stdlog(p)
61,m_mutex(m_mutex)
62,m_b_use_mutex(false)
63,m_maxsock(0)
64,m_preverror(-1)
65,m_errcnt(0)
66,m_tlast(0)
67#ifdef ENABLE_SOCKS4
68,m_socks4_host(0)
69,m_socks4_port(0)
70,m_bTryDirect(false)
71#endif
72#ifdef ENABLE_RESOLVER
73,m_resolv_id(0)
74,m_resolver(NULL)
75#endif
76#ifdef ENABLE_POOL
77,m_b_enable_pool(false)
78#endif
79#ifdef ENABLE_TRIGGERS
80,m_next_trigger_id(0)
81#endif
82#ifdef ENABLE_DETACH
83,m_slave(false)
84#endif
85{
86        FD_ZERO(&m_rfds);
87        FD_ZERO(&m_wfds);
88        FD_ZERO(&m_efds);
89}
90
91
92SocketHandler::SocketHandler(Mutex& mutex,StdLog *p)
93:m_stdlog(p)
94,m_mutex(mutex)
95,m_b_use_mutex(true)
96,m_maxsock(0)
97,m_preverror(-1)
98,m_errcnt(0)
99,m_tlast(0)
100#ifdef ENABLE_SOCKS4
101,m_socks4_host(0)
102,m_socks4_port(0)
103,m_bTryDirect(false)
104#endif
105#ifdef ENABLE_RESOLVER
106,m_resolv_id(0)
107,m_resolver(NULL)
108#endif
109#ifdef ENABLE_POOL
110,m_b_enable_pool(false)
111#endif
112#ifdef ENABLE_TRIGGERS
113,m_next_trigger_id(0)
114#endif
115#ifdef ENABLE_DETACH
116,m_slave(false)
117#endif
118{
119        m_mutex.Lock();
120        FD_ZERO(&m_rfds);
121        FD_ZERO(&m_wfds);
122        FD_ZERO(&m_efds);
123}
124
125
126SocketHandler::~SocketHandler()
127{
128#ifdef ENABLE_RESOLVER
129        if (m_resolver)
130        {
131                m_resolver -> Quit();
132        }
133#endif
134        {
135                while (m_sockets.size())
136                {
137DEB(                    fprintf(stderr, "Emptying sockets list in SocketHandler destructor, %d instances\n", (int)m_sockets.size());)
138                        socket_m::iterator it = m_sockets.begin();
139                        Socket *p = it -> second;
140                        if (p)
141                        {
142DEB(                            fprintf(stderr, "  fd %d\n", p -> GetSocket());)
143                                p -> Close();
144DEB(                            fprintf(stderr, "  fd closed %d\n", p -> GetSocket());)
145//                              p -> OnDelete(); // hey, I turn this back on. what's the worst that could happen??!!
146                                // MinionSocket breaks, calling MinderHandler methods in OnDelete -
147                                // MinderHandler is already gone when that happens...
148
149                                // only delete socket when controlled
150                                // ie master sockethandler can delete non-detached sockets
151                                // and a slave sockethandler can only delete a detach socket
152                                if (p -> DeleteByHandler()
153#ifdef ENABLE_DETACH
154                                        && !(m_slave ^ p -> IsDetached()) 
155#endif
156                                        )
157                                {
158                                        p -> SetErasedByHandler();
159                                        delete p;
160                                }
161                                m_sockets.erase(it);
162                        }
163                        else
164                        {
165                                m_sockets.erase(it);
166                        }
167DEB(                    fprintf(stderr, "next\n");)
168                }
169DEB(            fprintf(stderr, "/Emptying sockets list in SocketHandler destructor, %d instances\n", (int)m_sockets.size());)
170        }
171#ifdef ENABLE_RESOLVER
172        if (m_resolver)
173        {
174                delete m_resolver;
175        }
176#endif
177        if (m_b_use_mutex)
178        {
179                m_mutex.Unlock();
180        }
181}
182
183
184Mutex& SocketHandler::GetMutex() const
185{
186        return m_mutex; 
187}
188
189
190#ifdef ENABLE_DETACH
191void SocketHandler::SetSlave(bool x)
192{
193        m_slave = x;
194}
195
196
197bool SocketHandler::IsSlave()
198{
199        return m_slave;
200}
201#endif
202
203
204void SocketHandler::RegStdLog(StdLog *log)
205{
206        m_stdlog = log;
207}
208
209
210void SocketHandler::LogError(Socket *p,const std::string& user_text,int err,const std::string& sys_err,loglevel_t t)
211{
212        if (m_stdlog)
213        {
214                m_stdlog -> error(this, p, user_text, err, sys_err, t);
215        }
216}
217
218
219void SocketHandler::Add(Socket *p)
220{
221        if (p -> GetSocket() == INVALID_SOCKET)
222        {
223                LogError(p, "Add", -1, "Invalid socket", LOG_LEVEL_WARNING);
224                if (p -> CloseAndDelete())
225                {
226                        m_delete.push_back(p);
227                }
228                return;
229        }
230        if (m_add.find(p -> GetSocket()) != m_add.end())
231        {
232                LogError(p, "Add", (int)p -> GetSocket(), "Attempt to add socket already in add queue", LOG_LEVEL_FATAL);
233                m_delete.push_back(p);
234                return;
235        }
236        m_add[p -> GetSocket()] = p;
237}
238
239
240void SocketHandler::Get(SOCKET s,bool& r,bool& w,bool& e)
241{
242        if (s >= 0)
243        {
244                r = FD_ISSET(s, &m_rfds) ? true : false;
245                w = FD_ISSET(s, &m_wfds) ? true : false;
246                e = FD_ISSET(s, &m_efds) ? true : false;
247        }
248}
249
250
251void SocketHandler::Set(SOCKET s,bool bRead,bool bWrite,bool bException)
252{
253DEB(    fprintf(stderr, "Set(%d, %s, %s, %s)\n", s, bRead ? "true" : "false", bWrite ? "true" : "false", bException ? "true" : "false");)
254        if (s >= 0)
255        {
256                if (bRead)
257                {
258                        if (!FD_ISSET(s, &m_rfds))
259                        {
260                                FD_SET(s, &m_rfds);
261                        }
262                }
263                else
264                {
265                        FD_CLR(s, &m_rfds);
266                }
267                if (bWrite)
268                {
269                        if (!FD_ISSET(s, &m_wfds))
270                        {
271                                FD_SET(s, &m_wfds);
272                        }
273                }
274                else
275                {
276                        FD_CLR(s, &m_wfds);
277                }
278                if (bException)
279                {
280                        if (!FD_ISSET(s, &m_efds))
281                        {
282                                FD_SET(s, &m_efds);
283                        }
284                }
285                else
286                {
287                        FD_CLR(s, &m_efds);
288                }
289        }
290}
291
292
293int SocketHandler::Select(long sec,long usec)
294{
295        struct timeval tv;
296        tv.tv_sec = sec;
297        tv.tv_usec = usec;
298        return Select(&tv);
299}
300
301
302int SocketHandler::Select()
303{
304        if (!m_fds_callonconnect.empty() ||
305#ifdef ENABLE_DETACH
306                (!m_slave && !m_fds_detach.empty()) ||
307#endif
308                !m_fds_timeout.empty() ||
309                !m_fds_retry.empty() ||
310                !m_fds_close.empty() ||
311                !m_fds_erase.empty())
312        {
313                return Select(0, 200000);
314        }
315        return Select(NULL);
316}
317
318
319int SocketHandler::Select(struct timeval *tsel)
320{
321        size_t ignore = 0;
322        while (m_add.size() > ignore)
323        {
324                if (m_sockets.size() >= FD_SETSIZE)
325                {
326                        LogError(NULL, "Select", (int)m_sockets.size(), "FD_SETSIZE reached", LOG_LEVEL_WARNING);
327                        break;
328                }
329                socket_m::iterator it = m_add.begin();
330                SOCKET s = it -> first;
331                Socket *p = it -> second;
332DEB(            fprintf(stderr, "Trying to add fd %d,  m_add.size() %d,  ignore %d\n", (int)s, (int)m_add.size(), (int)ignore);)
333                //
334                if (m_sockets.find(p -> GetSocket()) != m_sockets.end())
335                {
336                        LogError(p, "Add", (int)p -> GetSocket(), "Attempt to add socket already in controlled queue", LOG_LEVEL_FATAL);
337                        // %! it's a dup, don't add to delete queue, just ignore it
338                        m_delete.push_back(p);
339                        m_add.erase(it);
340//                      ignore++;
341                        continue;
342                }
343                if (!p -> CloseAndDelete())
344                {
345                        StreamSocket *scp = dynamic_cast<StreamSocket *>(p);
346                        if (scp && scp -> Connecting()) // 'Open' called before adding socket
347                        {
348                                Set(s,false,true);
349                        }
350                        else
351                        {
352                                TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
353                                bool bWrite = tcp ? tcp -> GetOutputLength() != 0 : false;
354                                if (p -> IsDisableRead())
355                                {
356                                        Set(s, false, bWrite);
357                                }
358                                else
359                                {
360                                        Set(s, true, bWrite);
361                                }
362                        }
363                        m_maxsock = (s > m_maxsock) ? s : m_maxsock;
364                }
365                else
366                {
367                        LogError(p, "Add", (int)p -> GetSocket(), "Trying to add socket with SetCloseAndDelete() true", LOG_LEVEL_WARNING);
368                }
369                // only add to m_fds (process fd_set events) if
370                //  slave handler and detached/detaching socket
371                //  master handler and non-detached socket
372#ifdef ENABLE_DETACH
373                if (!(m_slave ^ p -> IsDetach()))
374#endif
375                {
376                        m_fds.push_back(s);
377                }
378                m_sockets[s] = p;
379                //
380                m_add.erase(it);
381        }
382#ifdef MACOSX
383        fd_set rfds;
384        fd_set wfds;
385        fd_set efds;
386        FD_COPY(&m_rfds, &rfds);
387        FD_COPY(&m_wfds, &wfds);
388        FD_COPY(&m_efds, &efds);
389#else
390        fd_set rfds = m_rfds;
391        fd_set wfds = m_wfds;
392        fd_set efds = m_efds;
393#endif
394        int n;
395        if (m_b_use_mutex)
396        {
397                m_mutex.Unlock();
398                n = select( (int)(m_maxsock + 1),&rfds,&wfds,&efds,tsel);
399                m_mutex.Lock();
400        }
401        else
402        {
403                n = select( (int)(m_maxsock + 1),&rfds,&wfds,&efds,tsel);
404        }
405        if (n == -1)
406        {
407                /*
408                        EBADF  An invalid file descriptor was given in one of the sets.
409                        EINTR  A non blocked signal was caught.
410                        EINVAL n is negative. Or struct timeval contains bad time values (<0).
411                        ENOMEM select was unable to allocate memory for internal tables.
412                */
413                if (Errno != m_preverror || m_errcnt++ % 10000 == 0)
414                {
415                        LogError(NULL, "select", Errno, StrError(Errno));
416DEB(                    fprintf(stderr, "m_maxsock: %d\n", m_maxsock);
417                        fprintf(stderr, "%s\n", Errno == EINVAL ? "EINVAL" :
418                                Errno == EINTR ? "EINTR" :
419                                Errno == EBADF ? "EBADF" :
420                                Errno == ENOMEM ? "ENOMEM" : "<another>");
421                        // test bad fd
422                        for (SOCKET i = 0; i <= m_maxsock; i++)
423                        {
424                                bool t = false;
425                                FD_ZERO(&rfds);
426                                FD_ZERO(&wfds);
427                                FD_ZERO(&efds);
428                                if (FD_ISSET(i, &m_rfds))
429                                {
430                                        FD_SET(i, &rfds);
431                                        t = true;
432                                }
433                                if (FD_ISSET(i, &m_wfds))
434                                {
435                                        FD_SET(i, &wfds);
436                                        t = true;
437                                }
438                                if (FD_ISSET(i, &m_efds))
439                                {
440                                        FD_SET(i, &efds);
441                                        t = true;
442                                }
443                                if (t && m_sockets.find(i) == m_sockets.end())
444                                {
445                                        fprintf(stderr, "Bad fd in fd_set: %d\n", i);
446                                }
447                        }
448) // DEB
449                        m_preverror = Errno;
450                }
451                /// \todo rebuild fd_set's from active sockets list (m_sockets) here
452        }
453        else
454        if (!n)
455        {
456                m_preverror = -1;
457        }
458        else
459        if (n > 0)
460        {
461                for (socket_v::iterator it2 = m_fds.begin(); it2 != m_fds.end() && n; it2++)
462                {
463                        SOCKET i = *it2;
464                        if (FD_ISSET(i, &rfds))
465                        {
466                                socket_m::iterator itmp = m_sockets.find(i);
467                                if (itmp != m_sockets.end()) // found
468                                {
469                                        Socket *p = itmp -> second;
470                                        // new SSL negotiate method
471#ifdef HAVE_OPENSSL
472                                        if (p -> IsSSLNegotiate())
473                                        {
474                                                p -> SSLNegotiate();
475                                        }
476                                        else
477#endif
478                                        {
479                                                p -> OnRead();
480                                        }
481                                }
482                                else
483                                {
484                                        LogError(NULL, "GetSocket/handler/1", (int)i, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
485                                }
486                                n--;
487                        }
488                        if (FD_ISSET(i, &wfds))
489                        {
490                                socket_m::iterator itmp = m_sockets.find(i);
491                                if (itmp != m_sockets.end()) // found
492                                {
493                                        Socket *p = itmp -> second;
494                                        // new SSL negotiate method
495#ifdef HAVE_OPENSSL
496                                        if (p -> IsSSLNegotiate())
497                                        {
498                                                p -> SSLNegotiate();
499                                        }
500                                        else
501#endif
502                                        {
503                                                p -> OnWrite();
504                                        }
505                                }
506                                else
507                                {
508                                        LogError(NULL, "GetSocket/handler/2", (int)i, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
509                                }
510                                n--;
511                        }
512                        if (FD_ISSET(i, &efds))
513                        {
514                                socket_m::iterator itmp = m_sockets.find(i);
515                                if (itmp != m_sockets.end()) // found
516                                {
517                                        Socket *p = itmp -> second;
518                                        p -> OnException();
519                                }
520                                else
521                                {
522                                        LogError(NULL, "GetSocket/handler/3", (int)i, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
523                                }
524                                n--;
525                        }
526                } // m_fds loop
527                m_preverror = -1;
528        } // if (n > 0)
529
530        // check CallOnConnect - EVENT
531        if (!m_fds_callonconnect.empty())
532        {
533                socket_v tmp = m_fds_callonconnect;
534                for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++)
535                {
536                        Socket *p = NULL;
537                        {
538                                socket_m::iterator itmp = m_sockets.find(*it);
539                                if (itmp != m_sockets.end()) // found
540                                {
541                                        p = itmp -> second;
542                                }
543                                else
544                                {
545                                        LogError(NULL, "GetSocket/handler/4", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
546                                }
547                        }
548                        if (p)
549                        {
550//                              if (p -> CallOnConnect() && p -> Ready() )
551                                {
552                                        p -> SetConnected(); // moved here from inside if (tcp) check below
553#ifdef HAVE_OPENSSL
554                                        if (p -> IsSSL()) // SSL Enabled socket
555                                                p -> OnSSLConnect();
556                                        else
557#endif
558#ifdef ENABLE_SOCKS4
559                                        if (p -> Socks4())
560                                                p -> OnSocks4Connect();
561                                        else
562#endif
563                                        {
564                                                TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
565                                                if (tcp)
566                                                {
567                                                        if (tcp -> GetOutputLength())
568                                                        {
569                                                                p -> OnWrite();
570                                                        }
571                                                }
572#ifdef ENABLE_RECONNECT
573                                                if (tcp && tcp -> IsReconnect())
574                                                        p -> OnReconnect();
575                                                else
576#endif
577                                                {
578//                                                      LogError(p, "Calling OnConnect", 0, "Because CallOnConnect", LOG_LEVEL_INFO);
579                                                        p -> OnConnect();
580                                                }
581                                        }
582//                                      p -> SetCallOnConnect( false );
583                                        AddList(p -> GetSocket(), LIST_CALLONCONNECT, false);
584                                }
585                        }
586                }
587        }
588#ifdef ENABLE_DETACH
589        // check detach of socket if master handler - EVENT
590        if (!m_slave && !m_fds_detach.empty())
591        {
592                // %! why not using tmp list here??!?
593                for (socket_v::iterator it = m_fds_detach.begin(); it != m_fds_detach.end(); it++)
594                {
595                        Socket *p = NULL;
596                        {
597                                socket_m::iterator itmp = m_sockets.find(*it);
598                                if (itmp != m_sockets.end()) // found
599                                {
600                                        p = itmp -> second;
601                                }
602                                else
603                                {
604                                        LogError(NULL, "GetSocket/handler/5", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
605                                }
606                        }
607                        if (p)
608                        {
609//                              if (p -> IsDetach())
610                                {
611                                        Set(p -> GetSocket(), false, false, false);
612                                        // After DetachSocket(), all calls to Handler() will return a reference
613                                        // to the new slave SocketHandler running in the new thread.
614                                        p -> DetachSocket();
615                                        // Adding the file descriptor to m_fds_erase will now also remove the
616                                        // socket from the detach queue - tnx knightmad
617                                        m_fds_erase.push_back(p -> GetSocket());
618                                }
619                        }
620                }
621        }
622#endif
623        // check Connecting - connection timeout - conditional event
624        if (m_fds_timeout.size())
625        {
626                time_t tnow = time(NULL);
627                if (tnow != m_tlast)
628                {
629                        socket_v tmp = m_fds_timeout;
630DEB(                    fprintf(stderr, "Checking %d socket(s) for timeout\n", tmp.size());)
631                        for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++)
632                        {
633                                Socket *p = NULL;
634                                {
635                                        socket_m::iterator itmp = m_sockets.find(*it);
636                                        if (itmp != m_sockets.end()) // found
637                                        {
638                                                p = itmp -> second;
639                                        }
640                                        else
641                                        {
642                                                itmp = m_add.find(*it);
643                                                if (itmp != m_add.end())
644                                                {
645                                                        p = itmp -> second;
646                                                }
647                                                else
648                                                {
649                                                        LogError(NULL, "GetSocket/handler/6", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
650                                                }
651                                        }
652                                }
653                                if (p)
654                                {
655                                        if (p -> Timeout(tnow))
656                                        {
657                                                StreamSocket *scp = dynamic_cast<StreamSocket *>(p);
658                                                if (scp && scp -> Connecting())
659                                                        p -> OnConnectTimeout();
660                                                else
661                                                        p -> OnTimeout();
662                                                p -> SetTimeout(0);
663                                        }
664                                }
665                        }
666                        m_tlast = tnow;
667                } // tnow != tlast
668        }
669        // check retry client connect - EVENT
670        if (!m_fds_retry.empty())
671        {
672                socket_v tmp = m_fds_retry;
673                for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++)
674                {
675                        Socket *p = NULL;
676                        {
677                                socket_m::iterator itmp = m_sockets.find(*it);
678                                if (itmp != m_sockets.end()) // found
679                                {
680                                        p = itmp -> second;
681                                }
682                                else
683                                {
684                                        LogError(NULL, "GetSocket/handler/7", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
685                                }
686                        }
687                        if (p)
688                        {
689//                              if (p -> RetryClientConnect())
690                                {
691                                        TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
692                                        SOCKET nn = *it; //(*it3).first;
693                                        tcp -> SetRetryClientConnect(false);
694DEB(                                    fprintf(stderr, "Close() before retry client connect\n");)
695                                        p -> Close(); // removes from m_fds_retry
696                                        std::auto_ptr<SocketAddress> ad = p -> GetClientRemoteAddress();
697                                        if (ad.get())
698                                        {
699                                                tcp -> Open(*ad);
700                                        }
701                                        else
702                                        {
703                                                LogError(p, "RetryClientConnect", 0, "no address", LOG_LEVEL_ERROR);
704                                        }
705                                        Add(p);
706                                        m_fds_erase.push_back(nn);
707                                }
708                        }
709                }
710        }
711        // check close and delete - conditional event
712        if (!m_fds_close.empty())
713        {
714                socket_v tmp = m_fds_close;
715DEB(            fprintf(stderr, "m_fds_close.size() == %d\n", (int)m_fds_close.size());)
716                for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++)
717                {
718                        Socket *p = NULL;
719                        {
720                                socket_m::iterator itmp = m_sockets.find(*it);
721                                if (itmp != m_sockets.end()) // found
722                                {
723                                        p = itmp -> second;
724                                }
725                                else
726                                {
727                                        itmp = m_add.find(*it);
728                                        if (itmp != m_add.end())
729                                        {
730                                                p = itmp -> second;
731                                        }
732                                        else
733                                        {
734                                                LogError(NULL, "GetSocket/handler/8", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING);
735                                        }
736                                }
737                        }
738                        if (p)
739                        {
740//                              if (p -> CloseAndDelete() )
741                                {
742                                        TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
743                                        // new graceful tcp - flush and close timeout 5s
744                                        if (tcp && p -> IsConnected() && tcp -> GetFlushBeforeClose() && 
745#ifdef HAVE_OPENSSL
746                                                !tcp -> IsSSL() && 
747#endif
748                                                p -> TimeSinceClose() < 5)
749                                        {
750DEB(                                            fprintf(stderr, " close(1)\n");)
751                                                if (tcp -> GetOutputLength())
752                                                {
753                                                        LogError(p, "Closing", (int)tcp -> GetOutputLength(), "Sending all data before closing", LOG_LEVEL_INFO);
754                                                }
755                                                else // shutdown write when output buffer is empty
756                                                if (!(tcp -> GetShutdown() & SHUT_WR))
757                                                {
758                                                        SOCKET nn = *it;
759                                                        if (nn != INVALID_SOCKET && shutdown(nn, SHUT_WR) == -1)
760                                                        {
761                                                                LogError(p, "graceful shutdown", Errno, StrError(Errno), LOG_LEVEL_ERROR);
762                                                        }
763                                                        tcp -> SetShutdown(SHUT_WR);
764                                                }
765                                        }
766                                        else
767#ifdef ENABLE_RECONNECT
768                                        if (tcp && p -> IsConnected() && tcp -> Reconnect())
769                                        {
770                                                SOCKET nn = *it; //(*it3).first;
771DEB(                                            fprintf(stderr, " close(2) fd %d\n", nn);)
772                                                p -> SetCloseAndDelete(false);
773                                                tcp -> SetIsReconnect();
774                                                p -> SetConnected(false);
775DEB(                                            fprintf(stderr, "Close() before reconnect\n");)
776                                                p -> Close(); // dispose of old file descriptor (Open creates a new)
777                                                p -> OnDisconnect();
778                                                std::auto_ptr<SocketAddress> ad = p -> GetClientRemoteAddress();
779                                                if (ad.get())
780                                                {
781                                                        tcp -> Open(*ad);
782                                                }
783                                                else
784                                                {
785                                                        LogError(p, "Reconnect", 0, "no address", LOG_LEVEL_ERROR);
786                                                }
787                                                tcp -> ResetConnectionRetries();
788                                                Add(p);
789                                                m_fds_erase.push_back(nn);
790                                        }
791                                        else
792#endif
793                                        {
794                                                SOCKET nn = *it; //(*it3).first;
795DEB(                                            fprintf(stderr, " close(3) fd %d GetSocket() %d\n", nn, p -> GetSocket());)
796                                                if (tcp && p -> IsConnected() && tcp -> GetOutputLength())
797                                                {
798                                                        LogError(p, "Closing", (int)tcp -> GetOutputLength(), "Closing socket while data still left to send", LOG_LEVEL_WARNING);
799                                                }
800#ifdef ENABLE_POOL
801                                                if (p -> Retain() && !p -> Lost())
802                                                {
803                                                        PoolSocket *p2 = new PoolSocket(*this, p);
804                                                        p2 -> SetDeleteByHandler();
805                                                        Add(p2);
806                                                        //
807                                                        p -> SetCloseAndDelete(false); // added - remove from m_fds_close
808                                                }
809                                                else
810#endif // ENABLE_POOL
811                                                {
812                                                        Set(p -> GetSocket(),false,false,false);
813DEB(                                                    fprintf(stderr, "Close() before OnDelete\n");)
814                                                        p -> Close();
815                                                }
816                                                p -> OnDelete();
817                                                if (p -> DeleteByHandler())
818                                                {
819                                                        p -> SetErasedByHandler();
820                                                }
821                                                m_fds_erase.push_back(nn);
822                                        }
823                                }
824                        }
825                }
826        }
827
828        // check erased sockets
829        bool check_max_fd = false;
830        while (!m_fds_erase.empty())
831        {
832                socket_v::iterator it = m_fds_erase.begin();
833                SOCKET nn = *it;
834#ifdef ENABLE_DETACH
835                {
836                        for (socket_v::iterator it = m_fds_detach.begin(); it != m_fds_detach.end(); it++)
837                        {
838                                if (*it == nn)
839                                {
840                                        m_fds_detach.erase(it);
841                                        break;
842                                }
843                        }
844                }
845#endif
846                {
847                        for (socket_v::iterator it = m_fds.begin(); it != m_fds.end(); it++)
848                        {
849                                if (*it == nn)
850                                {
851                                        m_fds.erase(it);
852                                        break;
853                                }
854                        }
855                }
856                {
857                        socket_m::iterator it = m_sockets.find(nn);
858                        if (it != m_sockets.end())
859                        {
860                                Socket *p = it -> second;
861                                /* Sometimes a SocketThread class can finish its run before the master
862                                   sockethandler gets here. In that case, the SocketThread has set the
863                                   'ErasedByHandler' flag on the socket which will make us end up with a
864                                   double delete on the socket instance.
865                                   The fix is to make sure that the master sockethandler only can delete
866                                   non-detached sockets, and a slave sockethandler only can delete
867                                   detach sockets. */
868                                if (p -> ErasedByHandler()
869#ifdef ENABLE_DETACH
870                                        && !(m_slave ^ p -> IsDetached()) 
871#endif
872                                        )
873                                {
874#ifdef ENABLE_TRIGGERS
875                                        bool again = false;
876                                        do
877                                        {
878                                                again = false;
879                                                for (std::map<int, Socket *>::iterator it = m_trigger_src.begin(); it != m_trigger_src.end(); it++)
880                                                {
881                                                        int id = it -> first;
882                                                        Socket *src = it -> second;
883                                                        if (src == p)
884                                                        {
885                                                                for (std::map<Socket *, bool>::iterator it = m_trigger_dst[id].begin(); it != m_trigger_dst[id].end(); it++)
886                                                                {
887                                                                        Socket *dst = it -> first;
888                                                                        if (Valid(dst))
889                                                                        {
890                                                                                dst -> OnCancelled(id);
891                                                                        }
892                                                                }
893                                                                m_trigger_src.erase(m_trigger_src.find(id));
894                                                                m_trigger_dst.erase(m_trigger_dst.find(id));
895                                                                again = true;
896                                                                break;
897                                                        }
898                                                }
899                                        } while (again);
900#endif
901                                        delete p;
902                                }
903                                m_sockets.erase(it);
904                        }
905                }
906                m_fds_erase.erase(it);
907                check_max_fd = true;
908        }
909        // calculate max file descriptor for select() call
910        if (check_max_fd)
911        {
912                m_maxsock = 0;
913                for (socket_v::iterator it = m_fds.begin(); it != m_fds.end(); it++)
914                {
915                        SOCKET s = *it;
916                        m_maxsock = s > m_maxsock ? s : m_maxsock;
917                }
918        }
919        // remove Add's that fizzed
920        while (!m_delete.empty())
921        {
922                std::list<Socket *>::iterator it = m_delete.begin();
923                Socket *p = *it;
924                p -> OnDelete();
925                m_delete.erase(it);
926                if (p -> DeleteByHandler()
927#ifdef ENABLE_DETACH
928                        && !(m_slave ^ p -> IsDetached()) 
929#endif
930                        )
931                {
932                        p -> SetErasedByHandler();
933#ifdef ENABLE_TRIGGERS
934                        bool again = false;
935                        do
936                        {
937                                again = false;
938                                for (std::map<int, Socket *>::iterator it = m_trigger_src.begin(); it != m_trigger_src.end(); it++)
939                                {
940                                        int id = it -> first;
941                                        Socket *src = it -> second;
942                                        if (src == p)
943                                        {
944                                                for (std::map<Socket *, bool>::iterator it = m_trigger_dst[id].begin(); it != m_trigger_dst[id].end(); it++)
945                                                {
946                                                        Socket *dst = it -> first;
947                                                        if (Valid(dst))
948                                                        {
949                                                                dst -> OnCancelled(id);
950                                                        }
951                                                }
952                                                m_trigger_src.erase(m_trigger_src.find(id));
953                                                m_trigger_dst.erase(m_trigger_dst.find(id));
954                                                again = true;
955                                                break;
956                                        }
957                                }
958                        } while (again);
959#endif
960                        delete p;
961                }
962        }
963        return n;
964}
965
966
967#ifdef ENABLE_RESOLVER
968bool SocketHandler::Resolving(Socket *p0)
969{
970        std::map<Socket *, bool>::iterator it = m_resolve_q.find(p0);
971        return it != m_resolve_q.end();
972}
973#endif
974
975
976bool SocketHandler::Valid(Socket *p0)
977{
978        for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); it++)
979        {
980                Socket *p = it -> second;
981                if (p0 == p)
982                        return true;
983        }
984        return false;
985}
986
987
988bool SocketHandler::OkToAccept(Socket *)
989{
990        return true;
991}
992
993
994size_t SocketHandler::GetCount()
995{
996/*
997printf(" m_sockets : %d\n", m_sockets.size());
998printf(" m_add     : %d\n", m_add.size());
999printf(" m_delete  : %d\n", m_delete.size());
1000*/
1001        return m_sockets.size() + m_add.size() + m_delete.size();
1002}
1003
1004
1005#ifdef ENABLE_SOCKS4
1006void SocketHandler::SetSocks4Host(ipaddr_t a)
1007{
1008        m_socks4_host = a;
1009}
1010
1011
1012void SocketHandler::SetSocks4Host(const std::string& host)
1013{
1014        Utility::u2ip(host, m_socks4_host);
1015}
1016
1017
1018void SocketHandler::SetSocks4Port(port_t port)
1019{
1020        m_socks4_port = port;
1021}
1022
1023
1024void SocketHandler::SetSocks4Userid(const std::string& id)
1025{
1026        m_socks4_userid = id;
1027}
1028#endif
1029
1030
1031#ifdef ENABLE_RESOLVER
1032int SocketHandler::Resolve(Socket *p,const std::string& host,port_t port)
1033{
1034        // check cache
1035        ResolvSocket *resolv = new ResolvSocket(*this, p, host, port);
1036        resolv -> SetId(++m_resolv_id);
1037        resolv -> SetDeleteByHandler();
1038        ipaddr_t local;
1039        Utility::u2ip("127.0.0.1", local);
1040        if (!resolv -> Open(local, m_resolver_port))
1041        {
1042                LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
1043        }
1044        Add(resolv);
1045        m_resolve_q[p] = true;
1046DEB(    fprintf(stderr, " *** Resolve '%s:%d' id#%d  m_resolve_q size: %d  p: %p\n", host.c_str(), port, resolv -> GetId(), m_resolve_q.size(), p);)
1047        return resolv -> GetId();
1048}
1049
1050
1051#ifdef ENABLE_IPV6
1052int SocketHandler::Resolve6(Socket *p,const std::string& host,port_t port)
1053{
1054        // check cache
1055        ResolvSocket *resolv = new ResolvSocket(*this, p, host, port, true);
1056        resolv -> SetId(++m_resolv_id);
1057        resolv -> SetDeleteByHandler();
1058        ipaddr_t local;
1059        Utility::u2ip("127.0.0.1", local);
1060        if (!resolv -> Open(local, m_resolver_port))
1061        {
1062                LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
1063        }
1064        Add(resolv);
1065        m_resolve_q[p] = true;
1066        return resolv -> GetId();
1067}
1068#endif
1069
1070
1071int SocketHandler::Resolve(Socket *p,ipaddr_t a)
1072{
1073        // check cache
1074        ResolvSocket *resolv = new ResolvSocket(*this, p, a);
1075        resolv -> SetId(++m_resolv_id);
1076        resolv -> SetDeleteByHandler();
1077        ipaddr_t local;
1078        Utility::u2ip("127.0.0.1", local);
1079        if (!resolv -> Open(local, m_resolver_port))
1080        {
1081                LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
1082        }
1083        Add(resolv);
1084        m_resolve_q[p] = true;
1085        return resolv -> GetId();
1086}
1087
1088
1089#ifdef ENABLE_IPV6
1090int SocketHandler::Resolve(Socket *p,in6_addr& a)
1091{
1092        // check cache
1093        ResolvSocket *resolv = new ResolvSocket(*this, p, a);
1094        resolv -> SetId(++m_resolv_id);
1095        resolv -> SetDeleteByHandler();
1096        ipaddr_t local;
1097        Utility::u2ip("127.0.0.1", local);
1098        if (!resolv -> Open(local, m_resolver_port))
1099        {
1100                LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
1101        }
1102        Add(resolv);
1103        m_resolve_q[p] = true;
1104        return resolv -> GetId();
1105}
1106#endif
1107
1108
1109void SocketHandler::EnableResolver(port_t port)
1110{
1111        if (!m_resolver)
1112        {
1113                m_resolver_port = port;
1114                m_resolver = new ResolvServer(port);
1115        }
1116}
1117
1118
1119bool SocketHandler::ResolverReady()
1120{
1121        return m_resolver ? m_resolver -> Ready() : false;
1122}
1123#endif // ENABLE_RESOLVER
1124
1125
1126#ifdef ENABLE_SOCKS4
1127void SocketHandler::SetSocks4TryDirect(bool x)
1128{
1129        m_bTryDirect = x;
1130}
1131
1132
1133ipaddr_t SocketHandler::GetSocks4Host()
1134{
1135        return m_socks4_host;
1136}
1137
1138
1139port_t SocketHandler::GetSocks4Port()
1140{
1141        return m_socks4_port;
1142}
1143
1144
1145const std::string& SocketHandler::GetSocks4Userid()
1146{
1147        return m_socks4_userid;
1148}
1149
1150
1151bool SocketHandler::Socks4TryDirect()
1152{
1153        return m_bTryDirect;
1154}
1155#endif
1156
1157
1158#ifdef ENABLE_RESOLVER
1159bool SocketHandler::ResolverEnabled() 
1160{ 
1161        return m_resolver ? true : false; 
1162}
1163
1164
1165port_t SocketHandler::GetResolverPort() 
1166{ 
1167        return m_resolver_port; 
1168}
1169#endif // ENABLE_RESOLVER
1170
1171
1172#ifdef ENABLE_POOL
1173ISocketHandler::PoolSocket *SocketHandler::FindConnection(int type,const std::string& protocol,SocketAddress& ad)
1174{
1175        for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end() && !m_sockets.empty(); it++)
1176        {
1177                PoolSocket *pools = dynamic_cast<PoolSocket *>(it -> second);
1178                if (pools)
1179                {
1180                        if (pools -> GetSocketType() == type &&
1181                            pools -> GetSocketProtocol() == protocol &&
1182// %!                       pools -> GetClientRemoteAddress() &&
1183                            *pools -> GetClientRemoteAddress() == ad)
1184                        {
1185                                m_sockets.erase(it);
1186                                pools -> SetRetain(); // avoid Close in Socket destructor
1187                                return pools; // Caller is responsible that this socket is deleted
1188                        }
1189                }
1190        }
1191        return NULL;
1192}
1193
1194
1195void SocketHandler::EnablePool(bool x)
1196{
1197        m_b_enable_pool = x;
1198}
1199
1200
1201bool SocketHandler::PoolEnabled() 
1202{ 
1203        return m_b_enable_pool; 
1204}
1205#endif
1206
1207
1208void SocketHandler::Remove(Socket *p)
1209{
1210#ifdef ENABLE_RESOLVER
1211        std::map<Socket *, bool>::iterator it4 = m_resolve_q.find(p);
1212        if (it4 != m_resolve_q.end())
1213                m_resolve_q.erase(it4);
1214#endif
1215        if (p -> ErasedByHandler())
1216        {
1217                return;
1218        }
1219        for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); it++)
1220        {
1221                if (it -> second == p)
1222                {
1223                        LogError(p, "Remove", -1, "Socket destructor called while still in use", LOG_LEVEL_WARNING);
1224                        m_sockets.erase(it);
1225                        return;
1226                }
1227        }
1228        for (socket_m::iterator it2 = m_add.begin(); it2 != m_add.end(); it2++)
1229        {
1230                if ((*it2).second == p)
1231                {
1232                        LogError(p, "Remove", -2, "Socket destructor called while still in use", LOG_LEVEL_WARNING);
1233                        m_add.erase(it2);
1234                        return;
1235                }
1236        }
1237        for (std::list<Socket *>::iterator it3 = m_delete.begin(); it3 != m_delete.end(); it3++)
1238        {
1239                if (*it3 == p)
1240                {
1241                        LogError(p, "Remove", -3, "Socket destructor called while still in use", LOG_LEVEL_WARNING);
1242                        m_delete.erase(it3);
1243                        return;
1244                }
1245        }
1246}
1247
1248
1249void SocketHandler::CheckSanity()
1250{
1251        CheckList(m_fds, "active sockets"); // active sockets
1252        CheckList(m_fds_erase, "sockets to be erased"); // should always be empty anyway
1253        CheckList(m_fds_callonconnect, "checklist CallOnConnect");
1254#ifdef ENABLE_DETACH
1255        CheckList(m_fds_detach, "checklist Detach");
1256#endif
1257        CheckList(m_fds_timeout, "checklist Timeout");
1258        CheckList(m_fds_retry, "checklist retry client connect");
1259        CheckList(m_fds_close, "checklist close and delete");
1260}
1261
1262
1263void SocketHandler::CheckList(socket_v& ref,const std::string& listname)
1264{
1265        for (socket_v::iterator it = ref.begin(); it != ref.end(); it++)
1266        {
1267                SOCKET s = *it;
1268                if (m_sockets.find(s) != m_sockets.end())
1269                        continue;
1270                if (m_add.find(s) != m_add.end())
1271                        continue;
1272                bool found = false;
1273                for (std::list<Socket *>::iterator it = m_delete.begin(); it != m_delete.end(); it++)
1274                {
1275                        Socket *p = *it;
1276                        if (p -> GetSocket() == s)
1277                        {
1278                                found = true;
1279                                break;
1280                        }
1281                }
1282                if (!found)
1283                {
1284                        fprintf(stderr, "CheckList failed for \"%s\": fd %d\n", listname.c_str(), s);
1285                }
1286        }
1287}
1288
1289
1290void SocketHandler::AddList(SOCKET s,list_t which_one,bool add)
1291{
1292        if (s == INVALID_SOCKET)
1293        {
1294DEB(            fprintf(stderr, "AddList:  invalid_socket\n");)
1295                return;
1296        }
1297        socket_v& ref =
1298                (which_one == LIST_CALLONCONNECT) ? m_fds_callonconnect :
1299#ifdef ENABLE_DETACH
1300                (which_one == LIST_DETACH) ? m_fds_detach :
1301#endif
1302                (which_one == LIST_TIMEOUT) ? m_fds_timeout :
1303                (which_one == LIST_RETRY) ? m_fds_retry :
1304                (which_one == LIST_CLOSE) ? m_fds_close : m_fds_close;
1305        if (add)
1306        {
1307#ifdef ENABLE_DETACH
1308DEB(    fprintf(stderr, "AddList;  %5d: %s: %s\n", s, (which_one == LIST_CALLONCONNECT) ? "CallOnConnect" :
1309                (which_one == LIST_DETACH) ? "Detach" :
1310                (which_one == LIST_TIMEOUT) ? "Timeout" :
1311                (which_one == LIST_RETRY) ? "Retry" :
1312                (which_one == LIST_CLOSE) ? "Close" : "<undef>",
1313                add ? "Add" : "Remove");)
1314#else
1315DEB(    fprintf(stderr, "AddList;  %5d: %s: %s\n", s, (which_one == LIST_CALLONCONNECT) ? "CallOnConnect" :
1316                (which_one == LIST_TIMEOUT) ? "Timeout" :
1317                (which_one == LIST_RETRY) ? "Retry" :
1318                (which_one == LIST_CLOSE) ? "Close" : "<undef>",
1319                add ? "Add" : "Remove");)
1320#endif
1321        }
1322        if (add)
1323        {
1324                for (socket_v::iterator it = ref.begin(); it != ref.end(); it++)
1325                {
1326                        if (*it == s) // already there
1327                        {
1328                                return;
1329                        }
1330                }
1331                ref.push_back(s);
1332                return;
1333        }
1334        // remove
1335        for (socket_v::iterator it = ref.begin(); it != ref.end(); it++)
1336        {
1337                if (*it == s)
1338                {
1339                        ref.erase(it);
1340                        break;
1341                }
1342        }
1343//DEB(  fprintf(stderr, "/AddList\n");)
1344}
1345
1346
1347#ifdef ENABLE_TRIGGERS
1348int SocketHandler::TriggerID(Socket *src)
1349{
1350        int id = m_next_trigger_id++;
1351        m_trigger_src[id] = src;
1352        return id;
1353}
1354
1355
1356bool SocketHandler::Subscribe(int id, Socket *dst)
1357{
1358        if (m_trigger_src.find(id) != m_trigger_src.end())
1359        {
1360                std::map<Socket *, bool>::iterator it = m_trigger_dst[id].find(dst);
1361                if (it != m_trigger_dst[id].end())
1362                {
1363                        m_trigger_dst[id][dst] = true;
1364                        return true;
1365                }
1366                LogError(dst, "Subscribe", id, "Already subscribed", LOG_LEVEL_INFO);
1367                return false;
1368        }
1369        LogError(dst, "Subscribe", id, "Trigger id not found", LOG_LEVEL_INFO);
1370        return false;
1371}
1372
1373
1374bool SocketHandler::Unsubscribe(int id, Socket *dst)
1375{
1376        if (m_trigger_src.find(id) != m_trigger_src.end())
1377        {
1378                std::map<Socket *, bool>::iterator it = m_trigger_dst[id].find(dst);
1379                if (it != m_trigger_dst[id].end())
1380                {
1381                        m_trigger_dst[id].erase(it);
1382                        return true;
1383                }
1384                LogError(dst, "Unsubscribe", id, "Not subscribed", LOG_LEVEL_INFO);
1385                return false;
1386        }
1387        LogError(dst, "Unsubscribe", id, "Trigger id not found", LOG_LEVEL_INFO);
1388        return false;
1389}
1390
1391
1392void SocketHandler::Trigger(int id, Socket::TriggerData& data, bool erase)
1393{
1394        if (m_trigger_src.find(id) != m_trigger_src.end())
1395        {
1396                data.SetSource( m_trigger_src[id] );
1397                for (std::map<Socket *, bool>::iterator it = m_trigger_dst[id].begin(); it != m_trigger_dst[id].end(); it++)
1398                {
1399                        Socket *dst = it -> first;
1400                        if (Valid(dst))
1401                        {
1402                                dst -> OnTrigger(id, data);
1403                        }
1404                }
1405                if (erase)
1406                {
1407                        m_trigger_src.erase(m_trigger_src.find(id));
1408                        m_trigger_dst.erase(m_trigger_dst.find(id));
1409                }
1410        }
1411        else
1412        {
1413                LogError(NULL, "Trigger", id, "Trigger id not found", LOG_LEVEL_INFO);
1414        }
1415}
1416#endif // ENABLE_TRIGGERS
1417
1418
1419#ifdef SOCKETS_NAMESPACE
1420}
1421#endif
1422
Note: See TracBrowser for help on using the browser.