1 2
import { create as createLogger } from '../common/log'
2 2
const log = createLogger('throughput-middleware')
3
import { Middleware, MiddlewareCallback, MiddlewareServices, Pipelines } from '../types/middleware'
4
import { AccountInfo } from '../types/accounts'
5 2
import TokenBucket from '../lib/token-bucket'
6 2
import * as IlpPacket from 'ilp-packet'
7 2
const { InsufficientLiquidityError } = IlpPacket.Errors
8

9 2
const DEFAULT_REFILL_PERIOD = 1000 // 1 second
10

11 2
export default class ThroughputMiddleware implements Middleware {
12
  private getInfo: (accountId: string) => AccountInfo
13

14
  constructor (opts: {}, { getInfo }: MiddlewareServices) {
15 2
    this.getInfo = getInfo
16
  }
17

18
  async applyToPipelines (pipelines: Pipelines, accountId: string) {
19 2
    const accountInfo = this.getInfo(accountId)
20 2
    if (!accountInfo) {
21 0
      throw new Error('could not load info for account. accountId=' + accountId)
22
    }
23

24 2
    if (accountInfo.throughput) {
25
      const {
26 2
        refillPeriod = DEFAULT_REFILL_PERIOD,
27 2
        incomingAmount = false,
28 2
        outgoingAmount = false
29 2
      } = accountInfo.throughput || {}
30

31 2
      if (incomingAmount) {
32
        // TODO: When we add the ability to update middleware, our state will get
33
        //   reset every update, which may not be desired.
34 0
        const incomingBucket = new TokenBucket({ refillPeriod, refillCount: Number(incomingAmount) })
35 0
        log.trace('created incoming amount limit token bucket for account. accountId=%s refillPeriod=%s incomingAmount=%s', accountId, refillPeriod, incomingAmount)
36

37 0
        pipelines.incomingData.insertLast({
38
          name: 'throughput',
39
          method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
40 2
            if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
41 0
              const parsedPacket = IlpPacket.deserializeIlpPrepare(data)
42

43
              // TODO: Do we need a BigNumber-based token bucket?
44 2
              if (!incomingBucket.take(Number(parsedPacket.amount))) {
45 0
                throw new InsufficientLiquidityError('exceeded money bandwidth, throttling.')
46
              }
47

48 0
              return next(data)
49
            } else {
50 0
              return next(data)
51
            }
52
          }
53
        })
54
      }
55

56 2
      if (outgoingAmount) {
57
        // TODO: When we add the ability to update middleware, our state will get
58
        //   reset every update, which may not be desired.
59 2
        const incomingBucket = new TokenBucket({ refillPeriod, refillCount: Number(outgoingAmount) })
60 2
        log.trace('created outgoing amount limit token bucket for account. accountId=%s refillPeriod=%s outgoingAmount=%s', accountId, refillPeriod, outgoingAmount)
61

62 2
        pipelines.outgoingData.insertLast({
63
          name: 'throughput',
64
          method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
65 2
            if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
66 2
              const parsedPacket = IlpPacket.deserializeIlpPrepare(data)
67

68
              // TODO: Do we need a BigNumber-based token bucket?
69 2
              if (!incomingBucket.take(Number(parsedPacket.amount))) {
70 2
                throw new InsufficientLiquidityError('exceeded money bandwidth, throttling.')
71
              }
72

73 2
              return next(data)
74
            } else {
75 0
              return next(data)
76
            }
77
          }
78
        })
79
      }
80
    }
81
  }
82
}

Read our documentation on viewing source code .

Loading