1
|
|
// Copyright 2019 Miguel Angel Rivera Notararigo. All rights reserved.
|
2
|
|
// This source code was released under the MIT license.
|
3
|
|
|
4
|
|
package sdb
|
5
|
|
|
6
|
|
import (
|
7
|
|
"encoding/json"
|
8
|
|
"errors"
|
9
|
|
|
10
|
|
"github.com/blevesearch/bleve"
|
11
|
|
"github.com/dgraph-io/badger/v2"
|
12
|
|
)
|
13
|
|
|
14
|
|
// Values for the rw argument of DB.NewTx.
const (
	// RW asks for a read-write transaction.
	RW = true

	// RO asks for a read-only transaction.
	RO = false
)
|
18
|
|
|
19
|
|
// Tx is a transaction object which provides data management methods. The
|
20
|
|
// search index doesn't support transactions yet, so indexing operations just
|
21
|
|
// take effect after committing the transaction.
|
22
|
|
type Tx struct {
|
23
|
|
db *DB
|
24
|
|
dbTx *badger.Txn
|
25
|
|
si bleve.Index
|
26
|
|
rw bool
|
27
|
|
|
28
|
|
// Search index operations to be done when the transaction is committed.
|
29
|
|
operations map[string]interface{}
|
30
|
|
}
|
31
|
|
|
32
|
|
// NewTx creates a database transaction. If rw is false, the new transaction
|
33
|
|
// will be read-only.
|
34
|
1
|
func (db *DB) NewTx(rw bool) *Tx {
|
35
|
1
|
return &Tx{
|
36
|
1
|
db: db,
|
37
|
1
|
dbTx: db.db.NewTransaction(rw),
|
38
|
1
|
si: db.si,
|
39
|
1
|
rw: rw,
|
40
|
|
}
|
41
|
|
}
|
42
|
|
|
43
|
|
// Commit writes the transaction operations to the database. If a Bleve error
|
44
|
|
// is returned, the search index should be reloaded (see DB.ReloadIndex), keep
|
45
|
|
// the amount of operations per transaction low to avoid this.
|
46
|
1
|
func (tx *Tx) Commit() error {
|
47
|
1
|
if err := tx.dbTx.Commit(); err != nil {
|
48
|
0
|
return badgerError(err)
|
49
|
|
}
|
50
|
|
|
51
|
1
|
for id, data := range tx.operations {
|
52
|
1
|
if data != nil {
|
53
|
1
|
if err := tx.si.Index(id, data); err != nil {
|
54
|
0
|
return bleveError(err)
|
55
|
|
}
|
56
|
1
|
} else {
|
57
|
1
|
if err := tx.si.Delete(id); err != nil {
|
58
|
0
|
return bleveError(err)
|
59
|
|
}
|
60
|
|
}
|
61
|
|
}
|
62
|
|
|
63
|
1
|
return nil
|
64
|
|
}
|
65
|
|
|
66
|
|
// Delete deletes the given key. This operation happens in memory, it will be
|
67
|
|
// written to the database once Commit is called.
|
68
|
1
|
func (tx *Tx) Delete(key []byte) error {
|
69
|
1
|
if err := tx.dbTx.Delete(key); err != nil {
|
70
|
0
|
return badgerError(err)
|
71
|
|
}
|
72
|
|
|
73
|
1
|
if tx.operations == nil {
|
74
|
1
|
tx.operations = make(map[string]interface{})
|
75
|
|
}
|
76
|
|
|
77
|
1
|
tx.operations[string(key)] = nil
|
78
|
|
|
79
|
1
|
return nil
|
80
|
|
}
|
81
|
|
|
82
|
|
// Discard drops all the pending modifications and set the transactions as
|
83
|
|
// discarded.
|
84
|
1
|
func (tx *Tx) Discard() {
|
85
|
1
|
if tx.rw {
|
86
|
1
|
tx.operations = nil
|
87
|
|
}
|
88
|
|
|
89
|
1
|
tx.dbTx.Discard()
|
90
|
|
}
|
91
|
|
|
92
|
|
// Find fetches the keys from the values that satisfies the given constraints.
|
93
|
|
// See http://blevesearch.com/docs/Query-String-Query/ for more info about the
|
94
|
|
// the query language syntax. sort is a list of field names used for sorting,
|
95
|
|
// any field prefixed by a hyphen (-) will user reverse order.
|
96
|
1
|
func (tx *Tx) Find(q string, sort ...string) ([][]byte, error) {
|
97
|
1
|
if q == "" && len(sort) == 0 {
|
98
|
0
|
return nil, nil
|
99
|
|
}
|
100
|
|
|
101
|
1
|
result := [][]byte{}
|
102
|
1
|
bq := bleve.NewQueryStringQuery(q)
|
103
|
1
|
req := bleve.NewSearchRequest(bq)
|
104
|
1
|
req.SortBy(sort)
|
105
|
|
|
106
|
1
|
res, err := tx.si.Search(req)
|
107
|
1
|
if err != nil {
|
108
|
0
|
return nil, bleveError(err)
|
109
|
|
}
|
110
|
|
|
111
|
1
|
for _, hit := range res.Hits {
|
112
|
1
|
result = append(result, []byte(hit.ID))
|
113
|
|
}
|
114
|
|
|
115
|
1
|
return result, nil
|
116
|
|
}
|
117
|
|
|
118
|
|
// Get reads the value from the given key and decodes it into v (must be a
|
119
|
|
// pointer).
|
120
|
1
|
func (tx *Tx) Get(key []byte, v interface{}) error {
|
121
|
1
|
item, err := tx.dbTx.Get(key)
|
122
|
1
|
if errors.Is(err, badger.ErrKeyNotFound) {
|
123
|
1
|
return ErrKeyNotFound
|
124
|
1
|
} else if err != nil {
|
125
|
0
|
return badgerError(err)
|
126
|
|
}
|
127
|
|
|
128
|
1
|
buf := tx.db.buffers.Get()
|
129
|
1
|
defer tx.db.buffers.Add(buf)
|
130
|
|
|
131
|
1
|
data, err := item.ValueCopy(buf.Bytes())
|
132
|
1
|
if err != nil {
|
133
|
0
|
return err
|
134
|
|
}
|
135
|
|
|
136
|
1
|
return json.Unmarshal(data, v)
|
137
|
|
}
|
138
|
|
|
139
|
|
// Set set val as value of the given key. This operation happens in memory, it
|
140
|
|
// will be written to the database once Commit is called.
|
141
|
1
|
func (tx *Tx) Set(key []byte, val interface{}) error {
|
142
|
1
|
buf := tx.db.buffers.Get()
|
143
|
1
|
defer tx.db.buffers.Add(buf)
|
144
|
|
|
145
|
1
|
e := json.NewEncoder(buf)
|
146
|
1
|
if err := e.Encode(val); err != nil {
|
147
|
0
|
return err
|
148
|
|
}
|
149
|
|
|
150
|
1
|
if err := tx.dbTx.Set(key, buf.Bytes()); err != nil {
|
151
|
0
|
return badgerError(err)
|
152
|
|
}
|
153
|
|
|
154
|
1
|
if tx.operations == nil {
|
155
|
1
|
tx.operations = make(map[string]interface{})
|
156
|
|
}
|
157
|
|
|
158
|
1
|
tx.operations[string(key)] = val
|
159
|
|
|
160
|
1
|
return nil
|
161
|
|
}
|