1
// Copyright 2019 Miguel Angel Rivera Notararigo. All rights reserved.
2
// This source code was released under the MIT license.
3

4
package sdb
5

6
import (
7
	"errors"
8

9
	"github.com/blevesearch/bleve"
10
	"github.com/blevesearch/bleve/search/query"
11
	"github.com/dgraph-io/badger/v2"
12
	"github.com/niubaoshu/gotiny"
13
)
14

15
// Transaction modes accepted by DB.NewTx.
const (
	// RW requests a read-write transaction.
	RW = true

	// RO requests a read-only transaction.
	RO = false
)
19

20
// Tx is a transaction object which provides data management methods. The
21
// search index doesn't support transactions yet, so indexing operations just
22
// take effect after committing the transaction.
23
type Tx struct {
24
	db   *DB
25
	dbTx *badger.Txn
26
	si   bleve.Index
27
	rw   bool
28

29
	// Search index operations to be done when the transaction is committed.
30
	operations map[string]interface{}
31
}
32

33
// NewTx creates a database transaction. If rw is false, the new transaction
34
// will be read-only.
35 1
func (db *DB) NewTx(rw bool) *Tx {
36 1
	return &Tx{
37 1
		db:   db,
38 1
		dbTx: db.db.NewTransaction(rw),
39 1
		si:   db.si,
40 1
		rw:   rw,
41
	}
42
}
43

44
// Commit writes the transaction operations to the database. If a Bleve error
45
// is returned, the search index should be reloaded (see DB.ReloadIndex), keep
46
// the amount of operations per transaction low to avoid this.
47 1
func (tx *Tx) Commit() error {
48 1
	if err := tx.dbTx.Commit(); err != nil {
49 0
		return badgerError(err)
50
	}
51

52 1
	for id, data := range tx.operations {
53 1
		if data != nil {
54 1
			if err := tx.si.Index(id, data); err != nil {
55 0
				return bleveError(err)
56
			}
57 1
		} else {
58 1
			if err := tx.si.Delete(id); err != nil {
59 0
				return bleveError(err)
60
			}
61
		}
62
	}
63

64 1
	return nil
65
}
66

67
// Delete deletes the given key. This operation happens in memory, it will be
68
// written to the database once Commit is called.
69 1
func (tx *Tx) Delete(key []byte) error {
70 1
	if err := tx.dbTx.Delete(key); err != nil {
71 0
		return badgerError(err)
72
	}
73

74 1
	if tx.operations == nil {
75 1
		tx.operations = make(map[string]interface{})
76
	}
77

78 1
	tx.operations[string(key)] = nil
79

80 1
	return nil
81
}
82

83
// Discard drops all the pending modifications and set the transactions as
84
// discarded.
85 1
func (tx *Tx) Discard() {
86 1
	if tx.rw {
87 1
		tx.operations = nil
88
	}
89

90 1
	tx.dbTx.Discard()
91
}
92

93
// Find fetches the keys from the values that satisfies the given constraints.
94
// See http://blevesearch.com/docs/Query-String-Query/ for more info about the
95
// the query language syntax. sort is a list of field names used for sorting,
96
// any field prefixed by a hyphen (-) will user reverse order.
97 1
func (tx *Tx) Find(q string, sort ...string) ([][]byte, error) {
98 1
	var bq query.Query
99

100 1
	if q == "" {
101 1
		bq = bleve.NewMatchAllQuery()
102 1
	} else {
103 1
		bq = bleve.NewQueryStringQuery(q)
104
	}
105

106 1
	req := bleve.NewSearchRequest(bq)
107

108 1
	if len(sort) > 0 {
109 0
		req.SortBy(sort)
110
	}
111

112 1
	res, err := tx.si.Search(req)
113 1
	if err != nil {
114 0
		return nil, bleveError(err)
115
	}
116

117 1
	result := [][]byte{}
118 1
	for _, hit := range res.Hits {
119 1
		result = append(result, []byte(hit.ID))
120
	}
121

122 1
	return result, nil
123
}
124

125
// Get reads the value from the given key and decodes it into v. v must be a
126
// pointer.
127 1
func (tx *Tx) Get(key []byte, v interface{}) error {
128 1
	item, err := tx.dbTx.Get(key)
129 1
	if errors.Is(err, badger.ErrKeyNotFound) {
130 1
		return ErrKeyNotFound
131 1
	} else if err != nil {
132 0
		return badgerError(err)
133
	}
134

135 1
	buf := tx.db.buffers.Get()
136 1
	defer tx.db.buffers.Add(buf)
137

138 1
	data, err := item.ValueCopy(buf.Bytes())
139 1
	if err != nil {
140 0
		return badgerError(err)
141
	}
142

143 1
	gotiny.Unmarshal(data, v)
144

145 1
	return nil
146
}
147

148
// Prefix fetches all the keys from the database with the given prefix.
149 1
func (tx *Tx) Prefix(prefix []byte) [][]byte {
150 1
	it := tx.dbTx.NewIterator(badger.DefaultIteratorOptions)
151 1
	defer it.Close()
152

153 1
	result := [][]byte{}
154 1
	for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
155 1
		buf := tx.db.buffers.Get()
156 1
		result = append(result, it.Item().KeyCopy(buf.Bytes()))
157 1
		tx.db.buffers.Add(buf)
158
	}
159

160 1
	return result
161
}
162

163
// Set set v as value of the given key. This operation happens in memory, it
164
// will be written to the database once Commit is called. v must be a pointer.
165 1
func (tx *Tx) Set(key []byte, v interface{}) (err error) {
166 1
	defer func() {
167 1
		if r := recover(); r != nil {
168 1
			err = ErrValMustBePointer
169
		}
170
	}()
171

172 1
	data := gotiny.Marshal(v)
173

174 1
	if err = tx.dbTx.Set(key, data); err != nil {
175 1
		return badgerError(err)
176
	}
177

178 1
	if tx.operations == nil {
179 1
		tx.operations = make(map[string]interface{})
180
	}
181

182 1
	tx.operations[string(key)] = v
183

184 1
	return nil
185
}

Read our documentation on viewing source code.

Loading