denizzzka / dpq2
1
/// Query methods
2
module dpq2.query;
3

4
public import dpq2.args;
5

6
import dpq2.connection: Connection, ConnectionException;
7
import dpq2.result: Result;
8
import dpq2.value;
9
import dpq2.oids: OidType;
10
import derelict.pq.pq;
11
import core.time: Duration, dur;
12
import std.exception: enforce;
13

14
/// Extends Connection by adding query methods
15
///
16
/// Just use it as Connection.* methods.
17
mixin template Queries()
18
{
19
    /// Perform SQL query to DB
20
    /// It uses the old wire protocol and all values are returned in textual
21
    /// form. This means that the dpq2.conv.to_d_types.as template will likely
22
    /// not work for anything but strings.
23
    /// Try to used execParams instead, even if now parameters are present.
24
    immutable (Answer) exec( string SQLcmd )
25
    {
26
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));
27

28
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
29
        auto container = createResultContainer(cast(immutable) pgResult);
30

31
        return new immutable Answer(container);
32
    }
33

34
    /// Perform SQL query to DB
35
    immutable (Answer) execParams(in ref QueryParams qp)
36
    {
37
        auto p = InternalQueryParams(&qp);
38
        auto pgResult = PQexecParams (
39
                conn,
40
                p.command,
41
                p.nParams,
42
                p.paramTypes,
43
                p.paramValues,
44
                p.paramLengths,
45
                p.paramFormats,
46
                p.resultFormat
47
        );
48

49
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
50
        auto container = createResultContainer(cast(immutable) pgResult);
51

52
        return new immutable Answer(container);
53
    }
54

55
    /// Submits a command to the server without waiting for the result(s)
56
    void sendQuery( string SQLcmd )
57
    {
58
        const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
59
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
60
    }
61

62
    /// Submits a command and separate parameters to the server without waiting for the result(s)
63
    void sendQueryParams(in ref QueryParams qp)
64
    {
65
        auto p = InternalQueryParams(&qp);
66
        size_t r = PQsendQueryParams (
67
                conn,
68
                p.command,
69
                p.nParams,
70
                p.paramTypes,
71
                p.paramValues,
72
                p.paramLengths,
73
                p.paramFormats,
74
                p.resultFormat
75
            );
76

77
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
78
    }
79

80
    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
81
    void sendQueryPrepared(in ref QueryParams qp)
82
    {
83
        auto p = InternalQueryParams(&qp);
84
        size_t r = PQsendQueryPrepared(
85
                conn,
86
                p.stmtName,
87
                p.nParams,
88
                p.paramValues,
89
                p.paramLengths,
90
                p.paramFormats,
91
                p.resultFormat
92
            );
93

94
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
95
    }
96

97
    /// Returns null if no notifies was received
98
    Notify getNextNotify()
99
    {
100
        consumeInput();
101
        auto n = PQnotifies(conn);
102
        return n is null ? null : new Notify( n );
103
    }
104

105
    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
106
    /// Returns: Result of query preparing
107
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
108
    {
109
        PGresult* pgResult = PQprepare(
110
                conn,
111
                toStringz(statementName),
112
                toStringz(sqlStatement),
113
                oids.length.to!int,
114
                oids.ptr
115
            );
116

117
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
118
        auto container = createResultContainer(cast(immutable) pgResult);
119

120
        return new immutable Result(container);
121
    }
122

123
    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
124
    ///
125
    /// Throws an exception if preparing failed.
126
    void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
127
    {
128
        auto r = prepare(statementName, sqlStatement, oids);
129

130
        if(r.status != PGRES_COMMAND_OK)
131
            throw new ResponseException(r, __FILE__, __LINE__);
132
    }
133

134
    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
135
    immutable(Answer) execPrepared(in ref QueryParams qp)
136
    {
137
        auto p = InternalQueryParams(&qp);
138
        auto pgResult = PQexecPrepared(
139
                conn,
140
                p.stmtName,
141
                p.nParams,
142
                cast(const(char*)*)p.paramValues,
143
                p.paramLengths,
144
                p.paramFormats,
145
                p.resultFormat
146
            );
147

148
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
149
        auto container = createResultContainer(cast(immutable) pgResult);
150

151
        return new immutable Answer(container);
152
    }
153

154
    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
155
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
156
    {
157
        size_t r = PQsendPrepare(
158
                conn,
159
                toStringz(statementName),
160
                toStringz(sqlStatement),
161
                oids.length.to!int,
162
                oids.ptr
163
            );
164

165
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
166
    }
167

168
    /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
169
    immutable(Answer) describePrepared(string statementName)
170
    {
171
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
172

173
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
174
        auto container = createResultContainer(cast(immutable) pgResult);
175

176
        return new immutable Answer(container);
177
    }
178

179
    /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
180
    void sendDescribePrepared(string statementName)
181
    {
182
        size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
183

184
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
185
    }
186

187
    /// Sends a buffer of CSV data to the COPY command
188
    ///
189
    /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
190
    bool putCopyData( string data )
191
    {
192
        const int r = PQputCopyData(conn, data.toStringz, data.length.to!int);
193

194
        if(r == -1) throw new ConnectionException(this);
195

196
        return r != 0;
197
    }
198

199
    /// Signals that COPY data send is finished. Finalize and flush the COPY command.
200
    immutable(Answer) putCopyEnd()
201
    {
202
        assert(!isNonBlocking, "Only for blocking connections");
203

204
        const bool r = sendPutCopyEnd;
205

206
        assert(r, "Impossible status for blocking connections");
207

208
        // after the copying is finished, and there is no connection error, we must still get the command result
209
        // this will get if there is any errors in the process (invalid data format or constraint violation, etc.)
210
        auto pgResult = PQgetResult(conn);
211

212
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
213
        auto container = createResultContainer(cast(immutable) pgResult);
214

215
        return new immutable Answer(container);
216
    }
217

218
    /// Signals that COPY data send is finished.
219
    ///
220
    /// Returns: true if the termination data was sent, zero if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
221
    bool sendPutCopyEnd()
222
    {
223
        const char* error;
224
        const int r = PQputCopyEnd(conn, error);
225

226
        if(error !is null) throw new ConnectionException(error.to!string);
227

228
        if(r == -1) throw new ConnectionException(this);
229

230
        return r != 0;
231
    }
232

233
    // Waiting for completion of reading or writing
234
    // Returns: timeout is not occured
235
    version(integration_tests)
236
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
237
    {
238
        import std.socket;
239

240
        auto socket = this.socket();
241
        auto set = new SocketSet;
242
        set.add(socket);
243

244
        while(true)
245
        {
246
            if(status() == CONNECTION_BAD)
247
                throw new ConnectionException(this, __FILE__, __LINE__);
248

249
            if(poll() == PGRES_POLLING_OK)
250
            {
251
                return true;
252
            }
253
            else
254
            {
255
                size_t sockNum;
256

257
                with(WaitType)
258
                final switch(type)
259
                {
260
                    case READ:
261
                        sockNum = Socket.select(set, null, set, timeout);
262
                        break;
263

264
                    case WRITE:
265
                        sockNum = Socket.select(null, set, set, timeout);
266
                        break;
267

268
                    case READ_WRITE:
269
                        sockNum = Socket.select(set, set, set, timeout);
270
                        break;
271
                }
272

273
                enforce(sockNum >= 0);
274
                if(sockNum == 0) return false; // timeout is occurred
275

276
                continue;
277
            }
278
        }
279
    }
280
}
281

282
version(integration_tests)
283
enum WaitType
284
{
285
    READ,
286
    WRITE,
287
    READ_WRITE
288
}
289

290
version (integration_tests)
291
void _integration_test( string connParam ) @trusted
292
{
293
    import dpq2.conv.to_d_types;
294
    import dpq2.conv.to_bson;
295

296 0
    auto conn = new Connection(connParam);
297

298
    // Text type arguments testing
299
    {
300 0
        string sql_query =
301
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
302
        "union all\n"~
303
        "select now(), 'абвгд'::text, 777, 910.11\n"~
304
        "union all\n"~
305
        "select NULL, 'ijk'::text, 789, 12345.115345";
306

307 0
        auto a = conn.exec( sql_query );
308

309 0
        assert( a.cmdStatus.length > 2 );
310 0
        assert( a.columnCount == 4 );
311 0
        assert( a.length == 3 );
312 0
        assert( a.columnFormat(1) == ValueFormat.TEXT );
313 0
        assert( a.columnFormat(2) == ValueFormat.TEXT );
314
    }
315

316
    // Binary type arguments testing
317
    {
318
        import vibe.data.bson: Bson;
319

320 0
        const string sql_query =
321
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
322

323 0
        Value[5] args;
324 0
        args[0] = toValue("абвгд");
325 0
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
326 0
        args[2] = toValue("123");
327 0
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
328

329 0
        Bson binArray = Bson([
330
            Bson([Bson(null), Bson(123), Bson(456)]),
331
            Bson([Bson(0), Bson(789), Bson(null)])
332
        ]);
333

334 0
        args[4] = bsonToValue(binArray);
335

336 0
        QueryParams p;
337 0
        p.sqlCommand = sql_query;
338 0
        p.args = args[];
339

340 0
        auto a = conn.execParams( p );
341

342 0
        foreach(i; 0 .. args.length)
343 0
            assert(a.columnFormat(i) == ValueFormat.BINARY);
344

345 0
        assert( a.OID(0) == OidType.Text );
346 0
        assert( a.OID(1) == OidType.Int4 );
347 0
        assert( a.OID(2) == OidType.Text );
348 0
        assert( a.OID(3) == OidType.Int8 );
349 0
        assert( a.OID(4) == OidType.Int4Array );
350

351
        // binary args array test
352 0
        assert( a[0][4].as!Bson == binArray );
353
    }
354

355
    {
356
        // Bug #52: empty text argument
357 0
        QueryParams p;
358 0
        Value v = toValue("");
359

360 0
        p.sqlCommand = "SELECT $1";
361 0
        p.args = [v];
362

363 0
        auto a = conn.execParams(p);
364

365 0
        assert( !a[0][0].isNull );
366 0
        assert( a[0][0].as!string == "" );
367
    }
368

369
    // checking prepared statements
370
    {
371
        // uses PQprepare:
372 0
        conn.prepareEx("prepared statement 1", "SELECT $1::integer");
373

374 0
        QueryParams p;
375 0
        p.preparedStatementName = "prepared statement 1";
376 0
        p.args = [42.toValue];
377 0
        auto r = conn.execPrepared(p);
378 0
        assert (r[0][0].as!int == 42);
379
    }
380
    {
381
        // uses PQsendPrepare:
382 0
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
383

384 0
        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
385 0
        conn.consumeInput();
386

387 0
        immutable(Result)[] res;
388

389 0
        while(true)
390
        {
391 0
            auto r = conn.getResult();
392 0
            if(r is null) break;
393 0
            res ~= r;
394
        }
395

396 0
        assert(res.length == 1);
397 0
        assert(res[0].status == PGRES_COMMAND_OK);
398
    }
399
    {
400
        // check prepared arg types and result types
401 0
        auto a = conn.describePrepared("prepared statement 2");
402

403 0
        assert(a.nParams == 2);
404 0
        assert(a.paramType(0) == OidType.Text);
405 0
        assert(a.paramType(1) == OidType.Int4);
406
    }
407
    {
408
        // async check prepared arg types and result types
409 0
        conn.sendDescribePrepared("prepared statement 2");
410

411 0
        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
412 0
        conn.consumeInput();
413

414 0
        immutable(Result)[] res;
415

416 0
        while(true)
417
        {
418 0
            auto r = conn.getResult();
419 0
            if(r is null) break;
420 0
            res ~= r;
421
        }
422

423 0
        assert(res.length == 1);
424 0
        assert(res[0].status == PGRES_COMMAND_OK);
425

426 0
        auto a = res[0].getAnswer;
427

428 0
        assert(a.nParams == 2);
429 0
        assert(a.paramType(0) == OidType.Text);
430 0
        assert(a.paramType(1) == OidType.Int4);
431
    }
432
    {
433 0
        QueryParams p;
434 0
        p.preparedStatementName = "prepared statement 2";
435 0
        p.argsFromArray = ["abc", "123456"];
436

437 0
        conn.sendQueryPrepared(p);
438

439 0
        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
440 0
        conn.consumeInput();
441

442 0
        immutable(Result)[] res;
443

444 0
        while(true)
445
        {
446 0
            auto r = conn.getResult();
447 0
            if(r is null) break;
448 0
            res ~= r;
449
        }
450

451 0
        assert(res.length == 1);
452 0
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
453 0
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
454
    }
455
    {
456
        // test COPY
457 0
        conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");
458

459 0
        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
460 0
        conn.putCopyData("Val1,1\nval2,2\n");
461 0
        conn.putCopyData("Val3,3\nval4,4\n");
462 0
        conn.putCopyEnd();
463

464 0
        auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
465 0
        assert(res.length == 1);
466 0
        assert(res[0][0].as!string == "4");
467 0
        assert(res[0][1].as!string == "10");
468

469
        // This time with error
470
        import std.exception: assertThrown;
471
        import dpq2.result: ResponseException;
472

473 0
        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
474 0
        conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");
475

476 0
        assertThrown!ResponseException(conn.putCopyEnd());
477
    }
478

479
    import std.socket;
480 0
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
481

482
    {
483 0
        bool exceptionFlag = false;
484

485 0
        try conn.exec("SELECT 'abc'::text").getAnswer;
486
        catch(ConnectionException e)
487
        {
488 0
            exceptionFlag = true;
489 0
            assert(e.msg.length > 15); // error message check
490
        }
491
        finally
492 0
            assert(exceptionFlag);
493
    }
494
}

Read our documentation on viewing source code .

Loading