root/trunk/src/shared/Database/DatabasePostgre.cpp @ 8

Revision 2, 8.9 kB (checked in by yumileroy, 17 years ago)

[svn] * Proper SVN structure

Original author: Neo2003
Date: 2008-10-02 16:23:55-05:00

Line 
1/*
2 * Copyright (C) 2005-2008 MaNGOS <http://www.mangosproject.org/>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18
19#ifdef DO_POSTGRESQL
20
21#include "Util.h"
22#include "Policies/SingletonImp.h"
23#include "Platform/Define.h"
24#include "../src/zthread/ThreadImpl.h"
25#include "DatabaseEnv.h"
26#include "Database/PGSQLDelayThread.h"
27#include "Database/SqlOperations.h"
28#include "Timer.h"
29
30void DatabasePostgre::ThreadStart()
31{
32}
33
34void DatabasePostgre::ThreadEnd()
35{
36}
37
38size_t DatabasePostgre::db_count = 0;
39
40DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL)
41{
42    // before first connection
43    if( db_count++ == 0 )
44    {
45
46        if (!PQisthreadsafe())
47        {
48            sLog.outError("FATAL ERROR: PostgreSQL libpq isn't thread-safe.");
49            exit(1);
50        }
51    }
52}
53
54DatabasePostgre::~DatabasePostgre()
55{
56
57    if (m_delayThread)
58        HaltDelayThread();
59
60    if( mPGconn )
61    {
62        PQfinish(mPGconn);
63        mPGconn = NULL;
64    }
65}
66
67bool DatabasePostgre::Initialize(const char *infoString)
68{
69    if(!Database::Initialize(infoString))
70        return false;
71
72    tranThread = NULL;
73
74    InitDelayThread();
75
76    Tokens tokens = StrSplit(infoString, ";");
77
78    Tokens::iterator iter;
79
80    std::string host, port_or_socket, user, password, database;
81
82    iter = tokens.begin();
83
84    if(iter != tokens.end())
85        host = *iter++;
86    if(iter != tokens.end())
87        port_or_socket = *iter++;
88    if(iter != tokens.end())
89        user = *iter++;
90    if(iter != tokens.end())
91        password = *iter++;
92    if(iter != tokens.end())
93        database = *iter++;
94
95    mPGconn = PQsetdbLogin(host.c_str(), port_or_socket.c_str(), NULL, NULL, database.c_str(), user.c_str(), password.c_str());
96
97    /* check to see that the backend connection was successfully made */
98    if (PQstatus(mPGconn) != CONNECTION_OK)
99    {
100        sLog.outError( "Could not connect to Postgre database at %s: %s",
101            host.c_str(), PQerrorMessage(mPGconn));
102        PQfinish(mPGconn);
103        return false;
104    }
105    else
106    {
107        sLog.outDetail( "Connected to Postgre database at %s",
108            host.c_str());
109        sLog.outString( "PostgreSQL server ver: %d",PQserverVersion(mPGconn));
110        return true;
111    }
112
113}
114
115QueryResult* DatabasePostgre::Query(const char *sql)
116{
117    if (!mPGconn)
118        return 0;
119
120    uint64 rowCount = 0;
121    uint32 fieldCount = 0;
122
123    // guarded block for thread-safe request
124    ZThread::Guard<ZThread::FastMutex> query_connection_guard(mMutex);
125    #ifdef MANGOS_DEBUG
126    uint32 _s = getMSTime();
127    #endif
128    // Send the query
129    PGresult * result = PQexec(mPGconn, sql);
130    if (!result )
131    {
132        return NULL;
133    }
134
135    if (PQresultStatus(result) != PGRES_TUPLES_OK)
136    {
137        sLog.outErrorDb( "SQL : %s", sql );
138        sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn));
139        PQclear(result);
140        return NULL;
141    }
142    else
143    {
144        #ifdef MANGOS_DEBUG
145        sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql );
146        #endif
147    }
148
149    rowCount = PQntuples(result);
150    fieldCount = PQnfields(result);
151    // end guarded block
152
153    if (!rowCount)
154    {
155        PQclear(result);
156        return NULL;
157    }
158
159    QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount);
160    queryResult->NextRow();
161
162    return queryResult;
163}
164
165bool DatabasePostgre::Execute(const char *sql)
166{
167
168    if (!mPGconn)
169        return false;
170
171    // don't use queued execution if it has not been initialized
172    if (!m_threadBody) return DirectExecute(sql);
173
174    tranThread = ZThread::ThreadImpl::current();            // owner of this transaction
175    TransactionQueues::iterator i = m_tranQueues.find(tranThread);
176    if (i != m_tranQueues.end() && i->second != NULL)
177    {                                                       // Statement for transaction
178        i->second->DelayExecute(sql);
179    }
180    else
181    {
182        // Simple sql statement
183        m_threadBody->Delay(new SqlStatement(sql));
184    }
185
186    return true;
187}
188
189bool DatabasePostgre::DirectExecute(const char* sql)
190{
191    if (!mPGconn)
192        return false;
193    {
194        // guarded block for thread-safe  request
195        ZThread::Guard<ZThread::FastMutex> query_connection_guard(mMutex);
196        #ifdef MANGOS_DEBUG
197        uint32 _s = getMSTime();
198        #endif
199        PGresult *res = PQexec(mPGconn, sql);
200        if (PQresultStatus(res) != PGRES_COMMAND_OK)
201        {
202            sLog.outErrorDb( "SQL: %s", sql );
203            sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) );
204            return false;
205        }
206        else
207        {
208            #ifdef MANGOS_DEBUG
209            sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql );
210            #endif
211        }
212        PQclear(res);
213
214        // end guarded block
215    }
216    return true;
217}
218
219bool DatabasePostgre::_TransactionCmd(const char *sql)
220{
221    if (!mPGconn)
222        return false;
223
224    PGresult *res = PQexec(mPGconn, sql);
225    if (PQresultStatus(res) != PGRES_COMMAND_OK)
226    {
227        sLog.outError("SQL: %s", sql);
228        sLog.outError("SQL ERROR: %s", PQerrorMessage(mPGconn));
229        return false;
230    }
231    else
232    {
233        DEBUG_LOG("SQL: %s", sql);
234    }
235    return true;
236}
237
238bool DatabasePostgre::BeginTransaction()
239{
240    if (!mPGconn)
241        return false;
242    // don't use queued execution if it has not been initialized
243    if (!m_threadBody)
244    {
245        if (tranThread==ZThread::ThreadImpl::current())
246            return false;                                   // huh? this thread already started transaction
247        mMutex.acquire();
248        if (!_TransactionCmd("START TRANSACTION"))
249        {
250            mMutex.release();                               // can't start transaction
251            return false;
252        }
253        return true;
254    }
255    // transaction started
256    tranThread = ZThread::ThreadImpl::current();            // owner of this transaction
257    TransactionQueues::iterator i = m_tranQueues.find(tranThread);
258    if (i != m_tranQueues.end() && i->second != NULL)
259        // If for thread exists queue and also contains transaction
260        // delete that transaction (not allow trans in trans)
261        delete i->second;
262
263    m_tranQueues[tranThread] = new SqlTransaction();
264
265    return true;
266}
267
268bool DatabasePostgre::CommitTransaction()
269{
270    if (!mPGconn)
271        return false;
272
273    // don't use queued execution if it has not been initialized
274    if (!m_threadBody)
275    {
276        if (tranThread!=ZThread::ThreadImpl::current())
277            return false;
278        bool _res = _TransactionCmd("COMMIT");
279        tranThread = NULL;
280        mMutex.release();
281        return _res;
282    }
283    tranThread = ZThread::ThreadImpl::current();
284    TransactionQueues::iterator i = m_tranQueues.find(tranThread);
285    if (i != m_tranQueues.end() && i->second != NULL)
286    {
287        m_threadBody->Delay(i->second);
288        i->second = NULL;
289        return true;
290    }
291    else
292        return false;
293}
294
295bool DatabasePostgre::RollbackTransaction()
296{
297    if (!mPGconn)
298        return false;
299    // don't use queued execution if it has not been initialized
300    if (!m_threadBody)
301    {
302        if (tranThread!=ZThread::ThreadImpl::current())
303            return false;
304        bool _res = _TransactionCmd("ROLLBACK");
305        tranThread = NULL;
306        mMutex.release();
307        return _res;
308    }
309    tranThread = ZThread::ThreadImpl::current();
310    TransactionQueues::iterator i = m_tranQueues.find(tranThread);
311    if (i != m_tranQueues.end() && i->second != NULL)
312    {
313        delete i->second;
314        i->second = NULL;
315    }
316    return true;
317}
318
319unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigned long length)
320{
321    if (!mPGconn || !to || !from || !length)
322        return 0;
323
324    return PQescapeString(to, from, length);
325}
326
327void DatabasePostgre::InitDelayThread()
328{
329    assert(!m_delayThread);
330
331    //New delay thread for delay execute
332    m_delayThread = new ZThread::Thread(m_threadBody = new PGSQLDelayThread(this));
333}
334
335void DatabasePostgre::HaltDelayThread()
336{
337    if (!m_threadBody || !m_delayThread) return;
338
339    m_threadBody->Stop();                                   //Stop event
340    m_delayThread->wait();                                  //Wait for flush to DB
341    delete m_delayThread;                                   //This also deletes m_threadBody
342    m_delayThread = NULL;
343    m_threadBody = NULL;
344}
345#endif
Note: See TracBrowser for help on using the browser.