1
package io.finch
2

3
import java.nio.charset.Charset
4

5
import cats.effect.{Async, Effect, IO}
6
import com.twitter.io._
7
import com.twitter.util.Future
8
import io.finch.internal._
9
import io.iteratee.{Enumerator, Iteratee}
10

11
/**
  * Implicit instances wiring Finch streaming to `io.iteratee.Enumerator`: one instance lifts a
  * `Reader[Buf]` into an enumerator, the others encode enumerators as JSON, server-sent events
  * and plain text.
  */
package object iteratee extends IterateeInstances {

  /**
    * Provides a [[LiftReader]] that turns a `Reader[Buf]` into an `Enumerator[F, A]`.
    *
    * Chunks are read one at a time and mapped through `process`; the enumerator ends as soon as
    * the reader yields `None`. `reader.discard()` is attached to the resulting enumerator through
    * `ensure`.
    */
  implicit def enumeratorLiftReader[F[_]](implicit
      F: Async[F],
      TE: ToAsync[Future, F]
  ): LiftReader[Enumerator, F] =
    new LiftReader[Enumerator, F] {
      final def apply[A](reader: Reader[Buf], process: Buf => A): Enumerator[F, A] = {
        // Wrapped in `suspend` so that a fresh read is described for every step.
        def nextChunk: F[Option[Buf]] = F.suspend(TE(reader.read()))

        // Emits the processed chunk and recurses until the reader reports the end of the stream.
        def drain(): Enumerator[F, A] =
          Enumerator.liftM[F, Option[Buf]](nextChunk).flatMap {
            case Some(chunk) => Enumerator.enumOne[F, A](process(chunk)).append(drain())
            case None        => Enumerator.empty[F, A]
          }

        val chunks = drain()
        chunks.ensure(F.delay(reader.discard()))
      }
    }

  /**
    * Encodes an enumerator of `A` as `Application.Json`, using the new-line delimited encoder
    * defined in [[IterateeInstances]].
    */
  implicit def encodeJsonEnumerator[F[_]: Effect, A](implicit
      A: Encode.Json[A],
      TE: ToAsync[Future, F]
  ): EncodeStream.Json[F, Enumerator, A] =
    new EncodeNewLineDelimitedEnumerator[F, A, Application.Json]

  /**
    * Encodes an enumerator of `A` as `Text.EventStream`, using the new-line delimited encoder
    * defined in [[IterateeInstances]].
    */
  implicit def encodeSseEnumerator[F[_]: Effect, A](implicit
      A: Encode.Aux[A, Text.EventStream],
      TE: ToAsync[Future, F]
  ): EncodeStream.Aux[F, Enumerator, A, Text.EventStream] =
    new EncodeNewLineDelimitedEnumerator[F, A, Text.EventStream]

  /**
    * Encodes an enumerator of `A` as `Text.Plain`. Each chunk is passed to the `Encode.Text`
    * instance as is; no delimiter is appended.
    */
  implicit def encodeTextEnumerator[F[_]: Effect, A](implicit
      A: Encode.Text[A],
      TE: ToAsync[Future, F]
  ): EncodeStream.Text[F, Enumerator, A] =
    new EncodeEnumerator[F, A, Text.Plain] {
      override protected def encodeChunk(chunk: A, cs: Charset): Buf =
        A.apply(chunk, cs)
    }

}
50

51
/**
  * Lower-priority instances and the shared encoder base classes used by the `iteratee` package
  * object.
  */
trait IterateeInstances {

  /**
    * An [[EncodeEnumerator]] that encodes every chunk with the implicit `Encode.Aux[A, CT]` and
    * appends `newLine(cs)` to the result.
    */
  final protected class EncodeNewLineDelimitedEnumerator[F[_]: Effect, A, CT <: String](implicit
      A: Encode.Aux[A, CT],
      TE: ToAsync[Future, F]
  ) extends EncodeEnumerator[F, A, CT] {
    protected def encodeChunk(chunk: A, cs: Charset): Buf = {
      val encoded = A(chunk, cs)
      encoded.concat(newLine(cs))
    }
  }

  /**
    * Base class for encoding an `Enumerator[F, A]` into a `Reader[Buf]`.
    *
    * The class also extends `Either[Throwable, Unit] => IO[Unit]` so that the instance itself can
    * be handed to `runAsync` as the completion callback.
    */
  abstract protected class EncodeEnumerator[F[_], A, CT <: String](implicit
      F: Effect[F],
      TE: ToAsync[Future, F]
  ) extends EncodeStream[F, Enumerator, A]
      with (Either[Throwable, Unit] => IO[Unit]) {

    type ContentType = CT

    /** Converts a single stream element into the bytes to be written out. */
    protected def encodeChunk(chunk: A, cs: Charset): Buf

    /** An iteratee that writes every incoming `Buf` into `writer`. */
    private def writeIteratee(writer: Writer[Buf]): Iteratee[F, Buf, Unit] =
      Iteratee.foreachM[F, Buf](buf => TE(writer.write(buf)))

    /** The `runAsync` callback: the outcome of the write is ignored. */
    final def apply(cb: Either[Throwable, Unit]): IO[Unit] = IO.unit

    /**
      * Starts writing the encoded stream into a new `Pipe` and returns that pipe as the reader.
      *
      * @param s  the stream of values to encode
      * @param cs the charset passed to `encodeChunk`
      */
    def apply(s: Enumerator[F, A], cs: Charset): F[Reader[Buf]] = {
      val pipe = new Pipe[Buf]

      // Closing the pipe is attached to the source stream via `ensure`.
      val closePipe = F.suspend(TE(pipe.close()))
      val encoded = s.ensure(closePipe).map(encodeChunk(_, cs))
      val written = encoded.into(writeIteratee(pipe))

      // The write is started through `runAsync` with this instance as callback; the pipe is the result.
      val started = F.runAsync(written)(this).to[F]
      F.productR(started)(F.pure(pipe))
    }
  }

  /** Encodes an enumerator of `Buf` for any content type by passing chunks through unchanged. */
  implicit def encodeBufEnumerator[F[_]: Effect, CT <: String]: EncodeStream.Aux[F, Enumerator, Buf, CT] =
    new EncodeEnumerator[F, Buf, CT] {
      protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
    }
}

Read our documentation on viewing source code.

Loading