denizzzka / dpq2
1
/**
2
 * Represents connection to the PostgreSQL server
3
 *
4
 * Most functions is correspond to those in the documentation of Postgres:
5
 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6
 */
7
module dpq2.connection;
8

9
import dpq2.query;
10
import dpq2.args: QueryParams;
11
import dpq2.result;
12
import dpq2.exception;
13

14
import derelict.pq.pq;
15
import std.conv: to;
16
import std.string: toStringz, fromStringz;
17
import std.exception: enforce;
18
import std.range;
19
import std.stdio: File;
20
import std.socket;
21
import core.exception;
22
import core.time: Duration;
23

24
/*
25
 * Bugs: On Unix connection is not thread safe.
26
 *
27
 * On Unix, forking a process with open libpq connections can lead
28
 * to unpredictable results because the parent and child processes share
29
 * the same sockets and operating system resources. For this reason,
30
 * such usage is not recommended, though doing an exec from the child
31
 * process to load a new executable is safe.
32

33

34

35
int PQisthreadsafe();
36
Returns 1 if the libpq is thread-safe and 0 if it is not.
37
*/
38

39
/// dumb flag for Connection ctor parametrization
40
struct ConnectionStart {};
41

42
/// Connection
43
class Connection
44
{
45
    package PGconn* conn;
46

47
    invariant
48
    {
49 0
        assert(conn !is null);
50
    }
51

52
    /// Makes a new connection to the database server
53 0
    this(string connString)
54
    {
55 0
        conn = PQconnectdb(toStringz(connString));
56

57 0
        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
58

59 0
        if(status != CONNECTION_OK)
60 0
            throw new ConnectionException(this, __FILE__, __LINE__);
61
    }
62

63
	/// Starts creation of a connection to the database server in a nonblocking manner
64 0
    this(ConnectionStart, string connString)
65
    {
66 0
        conn = PQconnectStart(toStringz(connString));
67

68 0
        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
69

70 0
        if( status == CONNECTION_BAD )
71 0
            throw new ConnectionException(this, __FILE__, __LINE__);
72
    }
73

74
    ~this()
75
    {
76 0
        PQfinish( conn );
77
    }
78

79
    mixin Queries;
80

81
    /// Returns the blocking status of the database connection
82
    bool isNonBlocking()
83
    {
84 0
        return PQisnonblocking(conn) == 1;
85
    }
86

87
    /// Sets the nonblocking status of the connection
88
    private void setNonBlocking(bool state)
89
    {
90 0
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
91 0
            throw new ConnectionException(this, __FILE__, __LINE__);
92
    }
93

94
    /// Begin reset the communication channel to the server, in a nonblocking manner
95
    ///
96
    /// Useful only for non-blocking operations.
97
    void resetStart()
98
    {
99 0
        if(PQresetStart(conn) == 0)
100 0
            throw new ConnectionException(this, __FILE__, __LINE__);
101
    }
102

103
    /// Useful only for non-blocking operations.
104
    PostgresPollingStatusType poll() nothrow
105
    {
106 0
        assert(conn);
107

108 0
        return PQconnectPoll(conn);
109
    }
110

111
    /// Useful only for non-blocking operations.
112
    PostgresPollingStatusType resetPoll() nothrow
113
    {
114 0
        assert(conn);
115

116 0
        return PQresetPoll(conn);
117
    }
118

119
    /// Returns the status of the connection
120
    ConnStatusType status() nothrow
121
    {
122 0
        return PQstatus(conn);
123
    }
124

125
    /**
126
        Returns the current in-transaction status of the server.
127
        The status can be:
128
            * PQTRANS_IDLE    - currently idle
129
            * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
130
            * PQTRANS_INTRANS - idle, in a valid transaction block
131
            * PQTRANS_INERROR - idle, in a failed transaction block
132
            * PQTRANS_UNKNOWN - reported if the connection is bad
133
     */
134
    PGTransactionStatusType transactionStatus() nothrow
135
    {
136 0
        return PQtransactionStatus(conn);
137
    }
138

139
    /// If input is available from the server, consume it
140
    ///
141
    /// Useful only for non-blocking operations.
142
    void consumeInput()
143
    {
144 0
        assert(conn);
145

146 0
        const size_t r = PQconsumeInput( conn );
147 0
        if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
148
    }
149

150
    package bool flush()
151
    {
152 0
        assert(conn);
153

154 0
        auto r = PQflush(conn);
155 0
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
156 0
        return r == 0;
157
    }
158

159
    /// Obtains the file descriptor number of the connection socket to the server
160
    int posixSocket()
161
    {
162 0
        int r = PQsocket(conn);
163

164 0
        if(r == -1)
165 0
            throw new ConnectionException(this, __FILE__, __LINE__);
166

167 0
        return r;
168
    }
169

170
    /// Obtains duplicate file descriptor number of the connection socket to the server
171
    socket_t posixSocketDuplicate()
172
    {
173
        version(Windows)
174
        {
175
            assert(false, "FIXME: implement socket duplication");
176
        }
177
        else // Posix OS
178
        {
179
            import core.sys.posix.unistd: dup;
180

181 0
            return cast(socket_t) dup(cast(socket_t) posixSocket);
182
        }
183
    }
184

185
    /// Obtains std.socket.Socket of the connection to the server
186
    ///
187
    /// Due to a limitation of Socket actually for the Socket creation
188
    /// duplicate of internal posix socket will be used.
189
    Socket socket()
190
    {
191 0
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
192
    }
193

194
    /// Returns the error message most recently generated by an operation on the connection
195
    string errorMessage() const nothrow
196
    {
197 0
        return PQerrorMessage(conn).to!string;
198
    }
199

200
    /**
201
     * Sets or examines the current notice processor
202
     *
203
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
204
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
205
     */
206
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
207
    {
208 0
        assert(conn);
209

210 0
        return PQsetNoticeProcessor(conn, proc, arg);
211
    }
212

213
    /// Get next result after sending a non-blocking commands. Can return null.
214
    ///
215
    /// Useful only for non-blocking operations.
216
    immutable(Result) getResult()
217
    {
218
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
219 0
        auto r = cast(immutable) PQgetResult(conn);
220

221 0
        if(r)
222
        {
223 0
            auto container = new immutable ResultContainer(r);
224 0
            return new immutable Result(container);
225
        }
226

227 0
        return null;
228
    }
229

230
    /// Get result after PQexec* functions or throw exception if pull is empty
231
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
232
    {
233 0
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
234

235 0
        return new immutable ResultContainer(r);
236
    }
237

238
    /// Select single-row mode for the currently-executing query
239
    bool setSingleRowMode()
240
    {
241 0
        return PQsetSingleRowMode(conn) == 1;
242
    }
243

244
    /**
245
     Try to cancel query
246

247
     If the cancellation is effective, the current command will
248
     terminate early and return an error result or exception. If the
249
     cancellation will fails (say, because the server was already done
250
     processing the command) there will be no visible result at all.
251
    */
252
    void cancel()
253
    {
254 0
        auto c = new Cancellation(this);
255 0
        c.doCancel;
256
    }
257

258
    ///
259
    bool isBusy() nothrow
260
    {
261 0
        assert(conn);
262

263 0
        return PQisBusy(conn) == 1;
264
    }
265

266
    ///
267
    string parameterStatus(string paramName)
268
    {
269 0
        assert(conn);
270

271 0
        auto res = PQparameterStatus(conn, toStringz(paramName));
272

273 0
        if(res is null)
274 0
            throw new ConnectionException(this, __FILE__, __LINE__);
275

276 0
        return to!string(fromStringz(res));
277
    }
278

279
    ///
280
    string escapeLiteral(string msg)
281
    {
282 0
        assert(conn);
283

284 0
        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
285

286 0
        if(buf is null)
287 0
            throw new ConnectionException(this, __FILE__, __LINE__);
288

289 0
        string res = buf.fromStringz.to!string;
290

291 0
        PQfreemem(buf);
292

293 0
        return res;
294
    }
295

296
    ///
297
    string escapeIdentifier(string msg)
298
    {
299 0
        assert(conn);
300

301 0
        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
302

303 0
        if(buf is null)
304 0
            throw new ConnectionException(this, __FILE__, __LINE__);
305

306 0
        string res = buf.fromStringz.to!string;
307

308 0
        PQfreemem(buf);
309

310 0
        return res;
311
    }
312

313
    ///
314
    string dbName() const nothrow
315
    {
316 0
        assert(conn);
317

318 0
        return PQdb(conn).fromStringz.to!string;
319
    }
320

321
    ///
322
    string host() const nothrow
323
    {
324 0
        assert(conn);
325

326 0
        return PQhost(conn).fromStringz.to!string;
327
    }
328

329
    ///
330
    int protocolVersion() const nothrow
331
    {
332 0
        assert(conn);
333

334 0
        return PQprotocolVersion(conn);
335
    }
336

337
    ///
338
    int serverVersion() const nothrow
339
    {
340 0
        assert(conn);
341

342 0
        return PQserverVersion(conn);
343
    }
344

345
    ///
346
    void trace(ref File stream)
347
    {
348 0
        PQtrace(conn, stream.getFP);
349
    }
350

351
    ///
352
    void untrace()
353
    {
354 0
        PQuntrace(conn);
355
    }
356

357
    ///
358
    void setClientEncoding(string encoding)
359
    {
360 0
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
361 0
            throw new ConnectionException(this, __FILE__, __LINE__);
362
    }
363
}
364

365
/// Check connection options in the provided connection string
366
///
367
/// Throws exception if connection string isn't passes check.
368
void connStringCheck(string connString)
369
{
370 1
    char* errmsg = null;
371 1
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);
372

373 1
    if(r is null)
374
    {
375 1
        enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
376
    }
377
    else
378
    {
379 1
        PQconninfoFree(r);
380
    }
381

382 1
    if(errmsg !is null)
383
    {
384 1
        string s = errmsg.fromStringz.to!string;
385 1
        PQfreemem(cast(void*) errmsg);
386

387 1
        throw new ConnectionException(s, __FILE__, __LINE__);
388
    }
389
}
390

391
unittest
392
{
393 1
    connStringCheck("dbname=postgres user=postgres");
394

395
    {
396 1
        bool raised = false;
397

398
        try
399 1
            connStringCheck("wrong conninfo string");
400
        catch(ConnectionException e)
401 1
            raised = true;
402

403 1
        assert(raised);
404
    }
405
}
406

407
/// Represents query cancellation process
408
class Cancellation
409
{
410
    private PGcancel* cancel;
411

412
    ///
413 0
    this(Connection c)
414
    {
415 0
        cancel = PQgetCancel(c.conn);
416

417 0
        if(cancel is null)
418 0
            throw new ConnectionException(c, __FILE__, __LINE__);
419
    }
420

421
    ///
422
    ~this()
423
    {
424 0
        PQfreeCancel(cancel);
425
    }
426

427
    /**
428
     Requests that the server abandon processing of the current command
429

430
     Throws exception if cancel request was not successfully dispatched.
431

432
     Successful dispatch is no guarantee that the request will have any
433
     effect, however. If the cancellation is effective, the current
434
     command will terminate early and return an error result
435
     (exception). If the cancellation fails (say, because the server
436
     was already done processing the command), then there will be no
437
     visible result at all.
438
    */
439
    void doCancel()
440
    {
441 0
        char[256] errbuf;
442 0
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);
443

444 0
        if(res != 1)
445 0
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
446
    }
447
}
448

449
///
450
class CancellationException : Dpq2Exception
451
{
452 0
    this(string msg, string file = __FILE__, size_t line = __LINE__)
453
    {
454 0
        super(msg, file, line);
455
    }
456
}
457

458
/// Connection exception
459
class ConnectionException : Dpq2Exception
460
{
461 0
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
462
    {
463 0
        super(c.errorMessage(), file, line);
464
    }
465

466 1
    this(string msg, string file = __FILE__, size_t line = __LINE__)
467
    {
468 1
        super(msg, file, line);
469
    }
470
}
471

472
version (integration_tests)
473
void _integration_test( string connParam )
474
{
475 0
    assert( PQlibVersion() >= 9_0100 );
476

477
    {
478
        debug import std.experimental.logger;
479

480 0
        auto c = new Connection(connParam);
481 0
        auto dbname = c.dbName();
482 0
        auto pver = c.protocolVersion();
483 0
        auto sver = c.serverVersion();
484

485
        debug
486
        {
487 0
            trace("DB name: ", dbname);
488 0
            trace("Protocol version: ", pver);
489 0
            trace("Server version: ", sver);
490
        }
491

492 0
        destroy(c);
493
    }
494

495
    {
496 0
        bool exceptionFlag = false;
497

498
        try
499 0
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
500
        catch(ConnectionException e)
501
        {
502 0
            exceptionFlag = true;
503 0
            assert(e.msg.length > 40); // error message check
504
        }
505
        finally
506 0
            assert(exceptionFlag);
507
    }
508

509
    {
510 0
        auto c = new Connection(connParam);
511

512 0
        assert(c.escapeLiteral("abc'def") == "'abc''def'");
513 0
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
514

515 0
        c.setClientEncoding("WIN866");
516 0
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
517
    }
518

519
    {
520 0
        auto c = new Connection(connParam);
521

522 0
        assert(c.transactionStatus == PQTRANS_IDLE);
523

524 0
        c.exec("BEGIN");
525 0
        assert(c.transactionStatus == PQTRANS_INTRANS);
526

527 0
        try c.exec("DISCARD ALL");
528
        catch (Exception) {}
529 0
        assert(c.transactionStatus == PQTRANS_INERROR);
530

531 0
        c.exec("ROLLBACK");
532 0
        assert(c.transactionStatus == PQTRANS_IDLE);
533
    }
534
}

Read our documentation on viewing source code .

Loading