ovrclk / akash

@@ -2,7 +2,6 @@
Loading
2 2
3 3
import (
4 4
	"bytes"
5 -
	"context"
6 5
	"crypto/tls"
7 6
	"encoding/json"
8 7
	"fmt"
@@ -98,7 +97,7 @@
Loading
98 97
			return err
99 98
		}
100 99
101 -
		err = gclient.SubmitManifest(context.Background(), dseq, mani)
100 +
		err = gclient.SubmitManifest(cmd.Context(), dseq, mani)
102 101
		res := result{
103 102
			Provider: prov,
104 103
			Status:   "PASS",

@@ -33,7 +33,39 @@
Loading
33 33
	return v.Interface()
34 34
}
35 35
36 +
const waitForDefault = 10 * time.Second
37 +
36 38
func ChannelWaitForValue(t *testing.T, waitOn interface{}) interface{} {
37 -
	const waitForDefault = 10 * time.Second
38 39
	return ChannelWaitForValueUpTo(t, waitOn, waitForDefault)
39 40
}
41 +
42 +
func ChannelWaitForCloseUpTo(t *testing.T, waitOn interface{}, waitFor time.Duration) {
43 +
	cases := make([]reflect.SelectCase, 2)
44 +
	cases[0] = reflect.SelectCase{
45 +
		Dir:  reflect.SelectRecv,
46 +
		Chan: reflect.ValueOf(waitOn),
47 +
		Send: reflect.Value{},
48 +
	}
49 +
50 +
	delayChan := time.After(waitFor)
51 +
52 +
	cases[1] = reflect.SelectCase{
53 +
		Dir:  reflect.SelectRecv,
54 +
		Chan: reflect.ValueOf(delayChan),
55 +
		Send: reflect.Value{},
56 +
	}
57 +
58 +
	idx, v, ok := reflect.Select(cases)
59 +
	if !ok {
60 +
		return // Channel closed, everything OK
61 +
	}
62 +
	if idx != 0 {
63 +
		t.Fatalf("channel not closed after waiting %v seconds", waitOn)
64 +
	}
65 +
66 +
	t.Fatalf("got unexpected message: %v", v.Interface())
67 +
}
68 +
69 +
func ChannelWaitForClose(t *testing.T, waitOn interface{}) {
70 +
	ChannelWaitForCloseUpTo(t, waitOn, waitForDefault)
71 +
}

@@ -27,9 +27,8 @@
Loading
27 27
28 28
// order manages bidding and general lifecycle handling of an order.
29 29
type order struct {
30 -
	orderID   mtypes.OrderID
31 -
	bidPlaced bool
32 -
	cfg       Config
30 +
	orderID mtypes.OrderID
31 +
	cfg     Config
33 32
34 33
	session                    session.Session
35 34
	cluster                    cluster.Cluster
@@ -96,7 +95,6 @@
Loading
96 95
	order := &order{
97 96
		cfg:                        cfg,
98 97
		orderID:                    oid,
99 -
		bidPlaced:                  false,
100 98
		session:                    session,
101 99
		cluster:                    svc.cluster,
102 100
		bus:                        svc.bus,
@@ -124,6 +122,33 @@
Loading
124 122
125 123
var matchBidNotFound = regexp.MustCompile("^.+bid not found.+$")
126 124
125 +
func (o *order) bidTimeoutEnabled() bool {
126 +
	return o.cfg.BidTimeout > time.Duration(0)
127 +
}
128 +
129 +
func (o *order) getBidTimeout() <-chan time.Time {
130 +
	if o.bidTimeoutEnabled() {
131 +
		return time.After(o.cfg.BidTimeout)
132 +
	}
133 +
134 +
	return nil
135 +
}
136 +
137 +
func (o *order) isStaleBid(bid mtypes.Bid) bool {
138 +
	if !o.bidTimeoutEnabled() {
139 +
		return false
140 +
	}
141 +
142 +
	// This bid could be very old, compute the minimum age of the bid
143 +
	// do not try anything clever here like asking the RPC node for the current height
144 +
	// just use the height from when the session is created
145 +
	createdAtBlock := bid.GetCreatedAt()
146 +
	blockAge := createdAtBlock - o.session.CreatedAtBlockHeight()
147 +
	const minTimePerBlock = 5 * time.Second
148 +
	atLeastThisOld := time.Duration(blockAge) * minTimePerBlock
149 +
	return atLeastThisOld > o.cfg.BidTimeout
150 +
}
151 +
127 152
func (o *order) run(checkForExistingBid bool) {
128 153
	defer o.lc.ShutdownCompleted()
129 154
	ctx, cancel := context.WithCancel(context.Background())
@@ -142,8 +167,9 @@
Loading
142 167
		group       *dtypes.Group
143 168
		reservation ctypes.Reservation
144 169
145 -
		won bool
146 -
		msg *mtypes.MsgCreateBid
170 +
		won       bool
171 +
		bidPlaced bool
172 +
		msg       *mtypes.MsgCreateBid
147 173
	)
148 174
149 175
	// Begin fetching group details immediately.
@@ -166,6 +192,7 @@
Loading
166 192
		storedGroupCh = groupch
167 193
		groupch = nil
168 194
	}
195 +
169 196
loop:
170 197
	for {
171 198
		select {
@@ -185,8 +212,22 @@
Loading
185 212
			}
186 213
187 214
			if bidFound {
188 -
				o.bidPlaced = true
189 -
				o.session.Log().Info("Found existing bid ")
215 +
				o.session.Log().Info("found existing bid")
216 +
				bidResponse := queryBid.Value().(*mtypes.QueryBidResponse)
217 +
				bid := bidResponse.GetBid()
218 +
				bidState := bid.GetState()
219 +
				if bidState != mtypes.BidOpen {
220 +
					o.session.Log().Error("bid in unexpected state", "bid-state", bidState)
221 +
					break loop
222 +
				}
223 +
				bidPlaced = true
224 +
225 +
				if o.isStaleBid(bid) {
226 +
					o.session.Log().Info("found expired bid", "block-height", bid.GetCreatedAt())
227 +
					break loop
228 +
				}
229 +
230 +
				bidTimeout = o.getBidTimeout()
190 231
			}
191 232
			groupch = storedGroupCh // Allow getting the group details result now
192 233
			storedGroupCh = nil
@@ -205,6 +246,7 @@
Loading
205 246
				if ev.ID.Provider != o.session.Provider().Address().String() {
206 247
					orderCompleteCounter.WithLabelValues("lease-lost").Inc()
207 248
					o.log.Info("lease lost", "lease", ev.ID)
249 +
					bidPlaced = false // Lease lost, network closes bid
208 250
					break loop
209 251
				}
210 252
				orderCompleteCounter.WithLabelValues("lease-won").Inc()
@@ -224,7 +266,6 @@
Loading
224 266
				break loop
225 267
226 268
			case mtypes.EventOrderClosed:
227 -
228 269
				// different deployment
229 270
				if !ev.ID.Equals(o.orderID) {
230 271
					break
@@ -233,6 +274,27 @@
Loading
233 274
				o.log.Info("order closed")
234 275
				orderCompleteCounter.WithLabelValues("order-closed").Inc()
235 276
				break loop
277 +
278 +
			case mtypes.EventBidClosed:
279 +
				if won {
280 +
					// Ignore any event after LeaseCreated
281 +
					continue
282 +
				}
283 +
284 +
				// Ignore bid closed not for this group
285 +
				if !o.orderID.GroupID().Equals(ev.ID.GroupID()) {
286 +
					break
287 +
				}
288 +
289 +
				// Ignore bid closed not for this provider
290 +
				if ev.ID.GetProvider() != o.session.Provider().String() {
291 +
					break
292 +
				}
293 +
294 +
				// Bid has been closed (possibly by someone manually closing it on the CLI)
295 +
				bidPlaced = false // bid already not on the blockchain
296 +
				orderCompleteCounter.WithLabelValues("bid-closed-external").Inc()
297 +
				break loop
236 298
			}
237 299
238 300
		case result := <-groupch:
@@ -300,7 +362,7 @@
Loading
300 362
301 363
			// Resources reserved.
302 364
			reservation = result.Value().(ctypes.Reservation)
303 -
			if o.bidPlaced {
365 +
			if bidPlaced {
304 366
				o.log.Info("Fulfillment already exists")
305 367
				// fulfillment already created (state recovered via queryExistingOrders)
306 368
				break
@@ -334,23 +396,21 @@
Loading
334 396
335 397
		case result := <-bidch:
336 398
			bidch = nil
337 -
			o.log.Info("bid complete")
338 -
339 399
			if result.Error() != nil {
340 400
				bidCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel).Inc()
341 -
				o.log.Error("submitting fulfillment", "err", result.Error())
401 +
				o.log.Error("bid failed", "err", result.Error())
342 402
				break loop
343 403
			}
404 +
405 +
			o.log.Info("bid complete")
344 406
			bidCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.SuccessLabel).Inc()
345 407
346 408
			// Fulfillment placed.
347 -
			o.bidPlaced = true
348 -
349 -
			if o.cfg.BidTimeout > time.Duration(0) {
350 -
				bidTimeout = time.After(o.cfg.BidTimeout)
351 -
			}
409 +
			bidPlaced = true
352 410
411 +
			bidTimeout = o.getBidTimeout()
353 412
		case <-bidTimeout:
413 +
			// The bid was not acted upon (e.g. lease created or deployment closed) so close it now
354 414
			o.log.Info("bid timeout, closing bid")
355 415
			orderCompleteCounter.WithLabelValues("bid-timeout").Inc()
356 416
			break loop
@@ -380,7 +440,7 @@
Loading
380 440
			}
381 441
		}
382 442
383 -
		if o.bidPlaced {
443 +
		if bidPlaced {
384 444
			o.log.Debug("closing bid")
385 445
			err := o.session.Client().Tx().Broadcast(ctx, &mtypes.MsgCloseBid{
386 446
				BidID: mtypes.MakeBidID(o.orderID, o.session.Provider().Address()),

@@ -502,7 +502,7 @@
Loading
502 502
	)
503 503
504 504
	res, err := aclient.Query().Provider(
505 -
		context.Background(),
505 +
		cmd.Context(),
506 506
		&ptypes.QueryProviderRequest{Owner: info.GetAddress().String()},
507 507
	)
508 508
	if err != nil {
@@ -536,7 +536,12 @@
Loading
536 536
		return err
537 537
	}
538 538
539 -
	session := session.New(log, aclient, pinfo)
539 +
	statusResult, err := cctx.Client.Status(cmd.Context())
540 +
	if err != nil {
541 +
		return err
542 +
	}
543 +
	currentBlockHeight := statusResult.SyncInfo.LatestBlockHeight
544 +
	session := session.New(log, aclient, pinfo, currentBlockHeight)
540 545
541 546
	if err := cctx.Client.Start(); err != nil {
542 547
		return err

@@ -1,8 +1,6 @@
Loading
1 1
package cmd
2 2
3 3
import (
4 -
	"context"
5 -
6 4
	sdkclient "github.com/cosmos/cosmos-sdk/client"
7 5
	sdk "github.com/cosmos/cosmos-sdk/types"
8 6
	"github.com/spf13/cobra"
@@ -42,7 +40,7 @@
Loading
42 40
		return err
43 41
	}
44 42
45 -
	result, err := gclient.Status(context.Background())
43 +
	result, err := gclient.Status(cmd.Context())
46 44
	if err != nil {
47 45
		return showErrorToUser(err)
48 46
	}

@@ -13,21 +13,24 @@
Loading
13 13
	Client() client.Client
14 14
	Provider() *ptypes.Provider
15 15
	ForModule(string) Session
16 +
	CreatedAtBlockHeight() int64
16 17
}
17 18
18 19
// New returns new session instance with provided details
19 -
func New(log log.Logger, client client.Client, provider *ptypes.Provider) Session {
20 +
func New(log log.Logger, client client.Client, provider *ptypes.Provider, createdAtBlockHeight int64) Session {
20 21
	return session{
21 -
		client:   client,
22 -
		provider: provider,
23 -
		log:      log,
22 +
		client:               client,
23 +
		provider:             provider,
24 +
		log:                  log,
25 +
		createdAtBlockHeight: createdAtBlockHeight,
24 26
	}
25 27
}
26 28
27 29
type session struct {
28 -
	client   client.Client
29 -
	provider *ptypes.Provider
30 -
	log      log.Logger
30 +
	client               client.Client
31 +
	provider             *ptypes.Provider
32 +
	log                  log.Logger
33 +
	createdAtBlockHeight int64
31 34
}
32 35
33 36
func (s session) Log() log.Logger {
@@ -46,3 +49,7 @@
Loading
46 49
	s.log = s.log.With("module", name)
47 50
	return s
48 51
}
52 +
53 +
func (s session) CreatedAtBlockHeight() int64 {
54 +
	return s.createdAtBlockHeight
55 +
}

@@ -99,5 +99,10 @@
Loading
99 99
	}
100 100
	cancel()
101 101
102 +
	if result != nil {
103 +
		// The context has been cancelled, so wait for the result now and discard it
104 +
		<-result
105 +
	}
106 +
102 107
	dw.log.Debug("shutdown complete")
103 108
}

@@ -98,6 +98,7 @@
Loading
98 98
99 99
	go func() {
100 100
		<-dm.lc.Done()
101 +
		dm.log.Debug("sending manager into channel")
101 102
		s.managerch <- dm
102 103
	}()
103 104
@@ -147,6 +148,7 @@
Loading
147 148
		if err != nil {
148 149
			dm.log.Error("failed releasing hostnames", "err", err)
149 150
		}
151 +
		dm.log.Debug("hostnames released")
150 152
	}()
151 153
152 154
loop:
@@ -212,6 +214,7 @@
Loading
212 214
		}
213 215
	}
214 216
217 +
	dm.log.Debug("shutting down")
215 218
	dm.lc.ShutdownInitiated(shutdownErr)
216 219
217 220
	if runch != nil {
Files Coverage
app 77.28%
client 2.84%
cmd 8.55%
deploy/cmd 0.00%
events 8.43%
manifest 80.95%
provider 33.65%
pubsub 89.18%
sdkutil 38.01%
sdl 70.68%
testutil 90.36%
types 28.34%
util 43.11%
x 37.73%
docgen/main.go 0.00%
integration/test_helpers.go 5.40%
validation/manifest.go 94.78%
Project Totals (357 files) 38.31%

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading