1
package org.zalando.zhewbacca
2

3
import java.util.concurrent.atomic.AtomicInteger
4
import javax.inject.{Inject, Singleton}
5

6
import akka.actor.ActorSystem
7
import akka.pattern.CircuitBreaker
8
import org.zalando.zhewbacca.metrics.PluggableMetrics
9
import play.api.http.Status._
10
import play.api.libs.ws.WSClient
11
import play.api.{Configuration, Logger}
12

13
import scala.concurrent.duration._
14
import scala.concurrent.{ExecutionContext, Future}
15
import scala.util.control.NonFatal
16

17
import atmos.dsl._
18
import atmos.dsl.Slf4jSupport._
19

20
/**
  * Retrieves TokenInfo for a given OAuth2 token by calling the configured IAM endpoint.
  *
  * The HTTP call is retried according to an atmos retry policy, timed through `PluggableMetrics` and guarded by an
  * Akka circuit breaker. Because the circuit breaker is stateful, this class must be a singleton in the client's
  * code. The implementation depends on Play infrastructure, so it works only in the context of a running
  * application.
  *
  * Every `authorisation.iam.*` setting read by this class is mandatory: construction fails with an
  * `IllegalArgumentException` as soon as one of them is missing.
  *
  * @param config           Play config to get configuration parameters for WS client and circuit breaker
  * @param pluggableMetrics receives the circuit-state gauge and the timing of IAM calls
  * @param ws               Play WS client used to call the IAM endpoint
  * @param actorSystem      supplies the scheduler used by the circuit breaker
  * @param ec               execution context on which responses are mapped and failures are recovered
  */
@Singleton
class IAMClient @Inject() (
    config: Configuration,
    pluggableMetrics: PluggableMetrics,
    ws: WSClient,
    actorSystem: ActorSystem,
    implicit val ec: ExecutionContext) extends (OAuth2Token => Future[Option[TokenInfo]]) {

  val logger: Logger = Logger("security.IAMClient")

  // Values published through the metrics gauge to describe the circuit breaker state.
  val METRICS_BREAKER_CLOSED = 0
  val METRICS_BREAKER_OPEN = 1
  // A fresh AtomicInteger holds 0, i.e. METRICS_BREAKER_CLOSED.
  val circuitStatus = new AtomicInteger()

  pluggableMetrics.gauge {
    circuitStatus.get
  }

  val authEndpoint: String = requiredSetting[String](
    "authorisation.iam.endpoint",
    "Authorisation: IAM endpoint is not configured")

  val breakerMaxFailures: Int = requiredSetting[Int](
    "authorisation.iam.cb.maxFailures",
    "Authorisation: Circuit Breaker max failures is not configured")

  val breakerCallTimeout: FiniteDuration = requiredSetting[FiniteDuration](
    "authorisation.iam.cb.callTimeout",
    "Authorisation: Circuit Breaker call timeout is not configured")

  val breakerResetTimeout: FiniteDuration = requiredSetting[FiniteDuration](
    "authorisation.iam.cb.resetTimeout",
    "Authorisation: Circuit Breaker reset timeout is not configured")

  // `.attempts` comes from the atmos DSL and turns the configured Int into a termination policy.
  val breakerMaxRetries: TerminationPolicy = requiredSetting[Int](
    "authorisation.iam.maxRetries",
    "Authorisation: Circuit Breaker max retries is not configured").attempts

  val breakerRetryBackoff: FiniteDuration = requiredSetting[FiniteDuration](
    "authorisation.iam.retry.backoff.duration",
    "Authorisation: Circuit Breaker the duration of exponential backoff is not configured")

  // The gauge only distinguishes "closed" from "not closed": both the open and the half-open
  // transitions publish METRICS_BREAKER_OPEN.
  // NOTE(review): reporting half-open as open looks deliberate - confirm against dashboards/alerts.
  lazy val breaker: CircuitBreaker = new CircuitBreaker(
    actorSystem.scheduler,
    breakerMaxFailures,
    breakerCallTimeout,
    breakerResetTimeout).onHalfOpen {
    circuitStatus.set(METRICS_BREAKER_OPEN)
  }.onOpen {
    circuitStatus.set(METRICS_BREAKER_OPEN)
  }.onClose {
    circuitStatus.set(METRICS_BREAKER_CLOSED)
  }

  // Retry policy picked up implicitly by `retryAsync` in `apply`: bounded attempts with exponential
  // backoff; retries are not logged, interruptions are logged as warnings and aborts as errors.
  implicit val retryRecover: atmos.RetryPolicy = retryFor { breakerMaxRetries } using {
    exponentialBackoff { breakerRetryBackoff }
  } monitorWith {
    logger.logger onRetrying logNothing onInterrupted logWarning onAborted logError
  }

  /**
    * Looks up the token at the IAM endpoint.
    *
    * @param token OAuth2 token to validate; it is sent as the `access_token` query parameter
    * @return `Some(tokenInfo)` when IAM answers with HTTP 200 and the body parses as `TokenInfo`;
    *         `None` for any other status and for any non-fatal failure (including a failure raised
    *         by the circuit breaker, the retry policy or JSON parsing), which is logged as an error
    */
  override def apply(token: OAuth2Token): Future[Option[TokenInfo]] = {
    breaker.withCircuitBreaker(
      pluggableMetrics.timing(
        retryAsync(s"Calling $authEndpoint") {
          ws.url(authEndpoint).withQueryStringParameters(("access_token", token.value)).get()
        })).map { response =>
        response.status match {
          case OK => Some(response.json.as[TokenInfo])
          case _ => None
        }
      } recover {
        case NonFatal(e) =>
          // Pass the throwable itself so the stack trace and cause chain reach the log,
          // not just the exception's toString.
          logger.error(s"Exception occurred during validation of token '${token.toSafeString}': $e", e)
          None // consider any exception as invalid token
      }
  }

  /**
    * Reads a mandatory configuration value.
    *
    * @param key            configuration path to read
    * @param missingMessage message of the exception thrown when the path is not set
    * @throws IllegalArgumentException if `key` is not present in the configuration
    */
  private def requiredSetting[A](key: String, missingMessage: String)(implicit loader: play.api.ConfigLoader[A]): A =
    config.getOptional[A](key).getOrElse(throw new IllegalArgumentException(missingMessage))

}

Read our documentation on viewing source code .

Loading