1
package hammock
2
package asynchttpclient
3

4
import cats._
5
import cats.implicits._
6
import cats.data.Kleisli
7
import cats.effect._
8
import org.asynchttpclient._
9
import java.util.{concurrent => jc}
10
import scala.util._
11
import scala.collection.JavaConverters._
12

13
/**
 * Interpreter for Hammock's `HttpF` algebra that executes requests through
 * an `org.asynchttpclient.AsyncHttpClient`.
 *
 * The work is split in three steps:
 *  - [[mapRequest]] turns an `HttpF` operation into a `BoundRequestBuilder`,
 *  - the builder is executed and its future is lifted into `F`,
 *  - [[mapResponse]] turns the resulting `Response` into an `HttpResponse`.
 */
object AsyncHttpClientInterpreter {

  /** Summons the `InterpTrans[F]` found in implicit scope. */
  def apply[F[_]](implicit F: InterpTrans[F]): InterpTrans[F] = F

  /**
   * Provides an `InterpTrans[F]` that runs [[transK]] against `client`.
   *
   * @param client client used for every request interpreted by the returned
   *               instance. When no implicit client is in scope a new
   *               `DefaultAsyncHttpClient` is created by the default argument.
   *               NOTE(review): nothing in this object closes that default
   *               client; callers that care about its lifecycle should pass
   *               their own.
   */
  implicit def instance[F[_]: Async](
      implicit client: AsyncHttpClient = new DefaultAsyncHttpClient()
  ): InterpTrans[F] = new InterpTrans[F] {
    override def trans: HttpF ~> F = transK andThen λ[Kleisli[F, AsyncHttpClient, *] ~> F](_.run(client))
  }

  /**
   * Natural transformation from `HttpF` to a `Kleisli` that still needs an
   * `AsyncHttpClient` in order to produce the `F[HttpResponse]`.
   */
  def transK[F[_]: Async]: HttpF ~> Kleisli[F, AsyncHttpClient, *] = {

    // Executor that runs a task on the thread that submits it; used so the
    // completion listener below never needs a thread of its own.
    val sameThread: jc.Executor = new jc.Executor {
      override def execute(command: Runnable): Unit = command.run()
    }

    // Lifts an AsyncHttpClient future into F without parking a thread while the
    // request is in flight: the callback fires from a completion listener.
    def toF[A](future: ListenableFuture[A]): F[A] =
      Async[F].async[A] { cb =>
        future.addListener(
          new Runnable {
            // The listener runs once the future is complete, so `get` returns
            // (or throws) immediately. Reading the outcome through `get` keeps
            // the error shape callers already observe.
            override def run(): Unit =
              cb(Try(future.get()) match {
                case Failure(err) => Left(err)
                case Success(a)   => Right(a)
              })
          },
          sameThread
        )
        ()
      }

    λ[HttpF ~> Kleisli[F, AsyncHttpClient, *]] {
      case reqF @ (Get(_) | Options(_) | Delete(_) | Head(_) | Trace(_) | Post(_) | Put(_) | Patch(_)) =>
        Kleisli { implicit client =>
          for {
            req             <- mapRequest[F](reqF)
            ahcResponse     <- toF(req.execute())
            hammockResponse <- mapResponse[F](ahcResponse)
          } yield hammockResponse
        }
    }
  }

  /**
   * Builds the AsyncHttpClient request corresponding to `reqF`.
   *
   * The builder is mutable, so creating it, setting its headers and setting
   * its body are all suspended in `F`; every run of the returned effect works
   * on a builder of its own.
   *
   * @param reqF   the Hammock operation to translate
   * @param client client used to create the request builder
   * @return the configured, not yet executed, request builder
   */
  def mapRequest[F[_]: Async](reqF: HttpF[HttpResponse])(implicit client: AsyncHttpClient): F[BoundRequestBuilder] = {

    // Sets each header once on the builder.
    def putHeaders(req: BoundRequestBuilder, headers: Map[String, String]): F[Unit] =
      Async[F].delay {
        req.setSingleHeaders(headers.map { case (name, value) => (name: CharSequence) -> value }.asJava)
        ()
      }

    // Picks the `prepareXxx` factory that matches the HTTP method of the operation.
    def getBuilder(reqF: HttpF[HttpResponse]): BoundRequestBuilder = {
      val uri = reqF.req.uri.show
      reqF match {
        case Get(_)     => client.prepareGet(uri)
        case Delete(_)  => client.prepareDelete(uri)
        case Head(_)    => client.prepareHead(uri)
        case Options(_) => client.prepareOptions(uri)
        case Post(_)    => client.preparePost(uri)
        case Put(_)     => client.preparePut(uri)
        case Trace(_)   => client.prepareTrace(uri)
        case Patch(_)   => client.preparePatch(uri)
      }
    }

    for {
      req <- Async[F].delay(getBuilder(reqF))
      _   <- putHeaders(req, reqF.req.headers)
      // A request without an entity leaves the builder's body untouched.
      _ <- Async[F].delay(
        reqF.req.entity
          .foreach(_.cata(str => req.setBody(str.content), bytes => req.setBody(bytes.content), Function.const(())))
      )
    } yield req
  }

  /**
   * Converts an AsyncHttpClient `Response` into a Hammock `HttpResponse`.
   *
   * NOTE(review): the status is looked up with `Status.Statuses(code)`; if that
   * is a plain map lookup, a status code it does not contain would throw rather
   * than produce a response. Confirm against the definition of `Status`.
   *
   * @param ahcResponse the completed response
   * @return the converted response, lifted into `F`
   */
  def mapResponse[F[_]: Applicative](ahcResponse: Response): F[HttpResponse] = {

    // The body is kept as bytes only when the content type matches
    // `application/octet-stream`; anything else is read as a string.
    def createEntity(r: Response): Entity = r.getContentType match {
      case AsyncHttpClientContentType.`application/octet-stream` => Entity.ByteArrayEntity(r.getResponseBodyAsBytes)
      case _                                                     => Entity.StringEntity(r.getResponseBody)
    }

    val headers = ahcResponse.getHeaders

    HttpResponse(
      Status.Statuses(ahcResponse.getStatusCode),
      headers.names.asScala.map(name => (name, headers.get(name))).toMap,
      createEntity(ahcResponse)
    ).pure[F]
  }

}

Read our documentation on viewing source code.

Loading