Showing 73 of 246 files from the diff.
Other files ignored by Codecov
.travis.yml has changed.
CONTRIBUTING.md has changed.
CHANGELOG.rst has changed.
README.md has changed.
OWNERS was deleted.
GROUPS was deleted.
build.sbt has changed.

@@ -212,24 +212,37 @@
Loading
212 212
        }
213 213
214 214
        val realLength = r.content.length
215 +
        // see https://tools.ietf.org/html/rfc7230#section-3.3.2
215 216
        contentLengthHeader match {
216 217
          case Some(l) if realLength != l =>
217 218
            // need to clean up the content length header
218 219
            result.headers.set(NettyHttp.HttpHeaderNames.CONTENT_LENGTH, realLength.toString)
219 220
220 221
          case None if realLength > 0 =>
221 -
            // Only set the content length if we are sure there is content. This
222 -
            // behavior complies with the specification that user agents should not
223 -
            // set the content length header for messages without a payload body.
222 +
            // Set the content length if we are sure there is content.
223 +
            result.headers.set(NettyHttp.HttpHeaderNames.CONTENT_LENGTH, realLength.toString)
224 +
225 +
          case None if shouldHaveLengthHeader(r.method) =>
226 +
            // RFC 7230: "A user agent SHOULD send a Content-Length in a request message
227 +
            // when no Transfer-Encoding is sent and the request method defines a meaning
228 +
            // for an enclosed payload body."
224 229
            result.headers.set(NettyHttp.HttpHeaderNames.CONTENT_LENGTH, realLength.toString)
225 230
226 231
          case _ =>
227 232
          // NOP. Either the content length header already matches or
228 -
          // it doesn't exist and there was no content, so there is nothing to do.
233 +
          // it doesn't exist for messages that should not have 0 value (see allowEmpty),
234 +
          // so there is nothing to do.
229 235
        }
230 236
231 237
        result
232 238
      }
233 239
    }
240 +
241 +
    private[this] def shouldHaveLengthHeader(method: Method): Boolean = {
242 +
      method match {
243 +
        case Method.Post | Method.Put | Method.Patch => true
244 +
        case _ => false
245 +
      }
246 +
    }
234 247
  }
235 248
}

@@ -5,6 +5,7 @@
Loading
5 5
import com.twitter.finagle.naming.BindingFactory
6 6
import com.twitter.finagle.param._
7 7
import com.twitter.finagle.stack.nilStack
8 +
import com.twitter.finagle.stats.{Client, RoleConfiguredStatsReceiver}
8 9
import com.twitter.finagle.util.Showable
9 10
10 11
/**
@@ -127,16 +128,18 @@
Loading
127 128
      val baseStack = stack ++ (endpointer +: nilStack)
128 129
      params[RequestLogger.Param] match {
129 130
        case RequestLogger.Param.Enabled =>
130 -
          val tranformer = RequestLogger.newStackTransformer(clientLabel)
131 -
          tranformer(baseStack)
131 +
          val transformer = RequestLogger.newStackTransformer(clientLabel)
132 +
          transformer(baseStack)
132 133
        case RequestLogger.Param.Disabled =>
133 134
          baseStack
134 135
      }
135 136
    }
136 137
138 +
    val clientSr = new RoleConfiguredStatsReceiver(stats.scope(clientLabel), Client)
139 +
137 140
    val clientParams = params +
138 141
      Label(clientLabel) +
139 -
      Stats(stats.scope(clientLabel)) +
142 +
      Stats(clientSr) +
140 143
      BindingFactory.Dest(dest)
141 144
142 145
    clientStack.make(clientParams)

@@ -14,5 +14,30 @@
Loading
14 14
 *
15 15
 * @see `com.twitter.finagle.http.service.HttpResponseClassifier`
16 16
 * @see `com.twitter.finagle.thriftmux.service.ThriftMuxResponseClassifier`
17 +
 * @see [[ReqRepT]] for a type-safe version
17 18
 */
18 -
case class ReqRep(request: Any, response: Try[Any])
19 +
object ReqRep {
20 +
  def apply(req: Any, rep: Try[Any]): ReqRep = ReqRepT[Any, Any](req, rep)
21 +
  def unapply(reqRep: ReqRep): Option[(Any, Try[Any])] = Some((reqRep.request, reqRep.response))
22 +
}
23 +
24 +
/**
25 +
 * A type-safe request/response pair.
26 +
 *
27 +
 * @see [[ReqRep]] for untyped
28 +
 */
29 +
case class ReqRepT[Req, Rep](request: Req, response: Try[Rep]) extends ReqRep {
30 +
  type Request = Req
31 +
  type Response = Rep
32 +
}
33 +
34 +
// keeps backward compatibility behavior for when ReqRep was a case class,
35 +
// we can't leak generics here without risking major breaking changes.
36 +
// this can go away when we move everything to ReqRepT version.
37 +
// we use `sealed abstract class` for better Scala 2.11 compatibility.
38 +
sealed abstract class ReqRep {
39 +
  type Request
40 +
  type Response
41 +
  def request: Request
42 +
  def response: Try[Response]
43 +
}

@@ -225,7 +225,7 @@
Loading
225 225
    if (curNameUsage == null) {
226 226
      val next = new Metrics.StoreGaugeImpl(formatted, f)
227 227
      gaugesMap.putIfAbsent(schema.metricBuilder.name, next)
228 -
      metricSchemas.put(formatted, schema)
228 +
      metricSchemas.putIfAbsent(formatted, schema)
229 229
230 230
      if (schema.metricBuilder.verbosity != Verbosity.Default) {
231 231
        verbosityMap.put(formatted, schema.metricBuilder.verbosity)
@@ -236,6 +236,7 @@
Loading
236 236
      // we replace existing gauges to support commons metrics behavior.
237 237
      val next = new Metrics.StoreGaugeImpl(formatted, f)
238 238
      gaugesMap.put(schema.metricBuilder.name, next)
239 +
      metricSchemas.put(formatted, schema)
239 240
    } else {
240 241
      throw new MetricCollisionException(
241 242
        s"A Counter with the name $formatted had already" +

@@ -1,35 +1,18 @@
Loading
1 1
package com.twitter.finagle.zipkin.thrift
2 2
3 3
import com.twitter.conversions.StorageUnitOps._
4 -
import com.twitter.conversions.DurationOps._
5 -
import com.twitter.finagle.Thrift
6 -
import com.twitter.finagle.service.{
7 -
  ResponseClassifier,
8 -
  ResponseClass,
9 -
  RetryBudget,
10 -
  RetryFilter,
11 -
  RetryPolicy,
12 -
  ReqRep,
13 -
  RequeueFilter,
14 -
  StatsFilter
15 -
}
16 -
import com.twitter.finagle.stats.{
17 -
  DenylistStatsReceiver,
18 -
  ClientStatsReceiver,
19 -
  NullStatsReceiver,
20 -
  StatsReceiver
21 -
}
4 +
import com.twitter.finagle.scribe.{Publisher, ScribeStats}
5 +
import com.twitter.finagle.stats.{DefaultStatsReceiver, StatsReceiver}
22 6
import com.twitter.finagle.thrift.Protocols
23 -
import com.twitter.finagle.thrift.scribe.thriftscala.{LogEntry, ResultCode, Scribe}
24 -
import com.twitter.finagle.tracing.{NullTracer, TracelessFilter}
7 +
import com.twitter.finagle.thrift.scribe.thriftscala.{LogEntry, Scribe}
25 8
import com.twitter.finagle.util.DefaultTimer
26 -
import com.twitter.finagle.zipkin.{host => Host}
27 9
import com.twitter.finagle.zipkin.core.{RawZipkinTracer, Span, TracerCache}
10 +
import com.twitter.finagle.zipkin.{host => Host}
28 11
import com.twitter.scrooge.TReusableMemoryTransport
29 12
import com.twitter.util._
30 13
import java.nio.charset.StandardCharsets
14 +
import java.util.concurrent.ArrayBlockingQueue
31 15
import java.util.{Arrays, Base64}
32 -
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
33 16
import org.apache.thrift.TByteArrayOutputStream
34 17
import org.apache.thrift.protocol.TProtocol
35 18
import scala.collection.mutable.ArrayBuffer
@@ -37,93 +20,7 @@
Loading
37 20
38 21
object ScribeRawZipkinTracer {
39 22
  private[this] val tracerCache = new TracerCache[ScribeRawZipkinTracer]
40 -
  private[this] val clientLabel = "zipkin-scribe"
41 -
42 -
  // only report these finagle metrics (including counters for individual exceptions)
43 -
  private[this] val clientStatsReceiver: StatsReceiver = new DenylistStatsReceiver(
44 -
    ClientStatsReceiver,
45 -
    {
46 -
      // StatsFilter
47 -
      case Seq(_, "requests") => false
48 -
      case Seq(_, "success") => false
49 -
      case Seq(_, "pending") => false
50 -
      case Seq(_, "failures", _*) => false
51 -
      case Seq(_, "logical", _*) => false // MethodBuilder StatsFilter
52 -
53 -
      // RetryFilter
54 -
      case Seq(_, "retries", _*) => false
55 -
56 -
      case _ => true
57 -
    }
58 -
  )
59 -
60 -
  // we must use a response classifer as the finagle Thrift client deals with raw
61 -
  // bytes and not the scrooge generated types.
62 -
  private[finagle] val responseClassifier: ResponseClassifier = {
63 -
    case ReqRep(_, Return(ResultCode.TryLater)) => ResponseClass.RetryableFailure
64 -
  }
65 -
66 -
  private[this] val shouldRetry: PartialFunction[
67 -
    (Scribe.Log.Args, Try[Scribe.Log.SuccessType]),
68 -
    Boolean
69 -
  ] = {
70 -
    // We don't retry failures that the RequeueFilter will handle
71 -
    case (_, Throw(RequeueFilter.Requeueable(_))) => false
72 -
    case tup if responseClassifier.isDefinedAt(ReqRep(tup._1, tup._2)) =>
73 -
      responseClassifier(ReqRep(tup._1, tup._2)) match {
74 -
        case ResponseClass.Failed(retryable) => retryable
75 -
        case _ => false
76 -
      }
77 -
  }
78 -
79 -
  // exposed for testing
80 -
  private[thrift] val retryPolicy = RetryPolicy.tries(
81 -
    numTries = 3, // MethodBuilder default
82 -
    shouldRetry = shouldRetry
83 -
  )
84 -
85 -
  private[this] def newClient(
86 -
    scribeHost: String,
87 -
    scribePort: Int,
88 -
    clientName: String,
89 -
    timer: Timer
90 -
  ): Scribe.MethodPerEndpoint = {
91 -
    // share the retryBudget between the retry and requeue filter
92 -
    val retryBudget = RetryBudget()
93 -
    val retryFilter = new RetryFilter(
94 -
      retryPolicy = retryPolicy,
95 -
      retryBudget = retryBudget,
96 -
      timer = timer,
97 -
      statsReceiver = clientStatsReceiver
98 -
    )
99 -
100 -
    val statsFilter = StatsFilter.typeAgnostic(
101 -
      clientStatsReceiver.scope(clientName).scope("logical"),
102 -
      responseClassifier,
103 -
      StatsFilter.DefaultExceptions,
104 -
      TimeUnit.MILLISECONDS
105 -
    )
106 -
107 -
    val transport = Thrift.client
108 -
      .withRetryBudget(retryBudget)
109 -
      .withSessionPool.maxSize(5)
110 -
      .withSessionPool.maxWaiters(250)
111 -
      .withSessionQualifier.noFailFast
112 -
      .withSessionQualifier.noFailureAccrual
113 -
      .withStatsReceiver(clientStatsReceiver)
114 -
      .withTracer(NullTracer)
115 -
      .withRequestTimeout(1.second) // each "logical" retry will have this timeout
116 -
      .servicePerEndpoint[Scribe.ServicePerEndpoint](s"inet!$scribeHost:$scribePort", clientName)
117 -
118 -
    val filteredTransport = transport
119 -
      .withLog(
120 -
        log = (new TracelessFilter)
121 -
          .andThen(statsFilter)
122 -
          .andThen(retryFilter)
123 -
          .andThen(transport.log))
124 -
125 -
    Thrift.Client.methodPerEndpoint(filteredTransport)
126 -
  }
23 +
  private[this] val label = "zipkin-scribe"
127 24
128 25
  /**
129 26
   * Creates a [[com.twitter.finagle.tracing.Tracer]] that sends traces to Zipkin via scribe.
@@ -132,76 +29,86 @@
Loading
132 29
   * @param scribePort Port to send trace data to
133 30
   * @param scribeCategory scribe category under which traces will be logged
134 31
   * @param statsReceiver Where to log information about tracing success/failures
135 -
   * @param clientName Name of the scribe finagle client
32 +
   * @param label label to use for Scribe stats and the associated Finagle client
136 33
   * @param timer A Timer used for timing out spans in the [[DeadlineSpanMap]]
137 34
   */
138 35
  def apply(
139 36
    scribeHost: String = Host().getHostName,
140 37
    scribePort: Int = Host().getPort,
141 38
    scribeCategory: String = "zipkin",
142 -
    statsReceiver: StatsReceiver = NullStatsReceiver,
143 -
    clientName: String = clientLabel,
39 +
    statsReceiver: StatsReceiver = DefaultStatsReceiver,
40 +
    label: String = label,
144 41
    timer: Timer = DefaultTimer
145 42
  ): ScribeRawZipkinTracer =
146 43
    tracerCache.getOrElseUpdate(
147 44
      scribeHost + scribePort + scribeCategory,
148 45
      apply(
149 -
        newClient(scribeHost, scribePort, clientName, timer),
150 46
        scribeCategory,
151 -
        statsReceiver,
152 -
        timer)
47 +
        Publisher.builder
48 +
          .withDest(s"inet!$scribeHost:$scribePort")
49 +
          .withStatsReceiver(statsReceiver)
50 +
          .build(scribeCategory, label),
51 +
        timer
52 +
      )
153 53
    )
154 54
155 55
  /**
156 56
   * Creates a [[com.twitter.finagle.tracing.Tracer]] that sends traces to scribe with the specified
157 57
   * scribeCategory.
158 58
   *
159 -
   * @param client The scribe client used to send traces to scribe
160 59
   * @param scribeCategory Category under which the trace data should be scribed
161 -
   * @param statsReceiver Where to log information about tracing success/failures
60 +
   * @param scribePublisher the [[com.twitter.finagle.scribe.Publisher]] to use for sending messages
162 61
   * @param timer A Timer used for timing out spans in the [[DeadlineSpanMap]]
163 62
   */
164 63
  def apply(
64 +
    scribeCategory: String,
65 +
    scribePublisher: Publisher,
66 +
    timer: Timer
67 +
  ): ScribeRawZipkinTracer =
68 +
    new ScribeRawZipkinTracer(scribeCategory, scribePublisher, timer)
69 +
70 +
  /**
71 +
   * Creates a [[com.twitter.finagle.tracing.Tracer]] that sends traces to scribe with the specified
72 +
   * scribeCategory.
73 +
   *
74 +
   * @param client the configured [[Scribe.MethodPerEndpoint]]
75 +
   * @param scribeCategory Category under which the trace data should be scribed
76 +
   * @param statsReceiver Where to log information about tracing success/failures
77 +
   * @param timer A Timer used for timing out spans in the [[DeadlineSpanMap]]
78 +
   */
79 +
  private[finagle] def apply(
165 80
    client: Scribe.MethodPerEndpoint,
166 81
    scribeCategory: String,
167 82
    statsReceiver: StatsReceiver,
168 83
    timer: Timer
169 -
  ): ScribeRawZipkinTracer = {
170 -
    new ScribeRawZipkinTracer(
171 -
      client,
172 -
      statsReceiver,
84 +
  ): ScribeRawZipkinTracer =
85 +
    apply(
173 86
      scribeCategory,
87 +
      new Publisher(scribeCategory, new ScribeStats(statsReceiver), client),
174 88
      timer
175 89
    )
176 -
  }
177 -
178 90
}
179 91
180 92
/**
181 93
 * Receives traces and sends them off to scribe with the specified scribeCategory.
182 94
 *
183 -
 * @param client The scribe client used to send traces to scribe
184 -
 * @param statsReceiver We generate stats to keep track of traces sent, failures and so on
185 95
 * @param scribeCategory scribe category under which the trace will be logged
96 +
 * @param scribePublisher the [[com.twitter.finagle.scribe.Publisher]] to use for sending messages.
186 97
 * @param timer A Timer used for timing out spans in the [[DeadlineSpanMap]]
187 98
 * @param poolSize The number of Memory transports to make available for serializing Spans
188 99
 * @param initialBufferSize Initial size of each transport
189 100
 * @param maxBufferSize Max size to keep around. Transports will grow as needed, but will revert back to `initialBufferSize` when reset if
190 101
 * they grow beyond `maxBufferSize`
191 102
 */
192 -
private[thrift] class ScribeRawZipkinTracer(
193 -
  client: Scribe.MethodPerEndpoint,
194 -
  statsReceiver: StatsReceiver,
103 +
private[finagle] class ScribeRawZipkinTracer(
195 104
  scribeCategory: String = "zipkin",
105 +
  scribePublisher: Publisher,
196 106
  timer: Timer = DefaultTimer,
197 107
  poolSize: Int = 10,
198 108
  initialBufferSize: StorageUnit = 512.bytes,
199 109
  maxBufferSize: StorageUnit = 1.megabyte)
200 -
    extends RawZipkinTracer(statsReceiver, timer) {
201 -
  private[this] val scopedReceiver = statsReceiver.scope("log_span")
202 -
  private[this] val okCounter = scopedReceiver.counter("ok")
203 -
  private[this] val tryLaterCounter = scopedReceiver.counter("try_later")
204 -
  private[this] val errorReceiver = scopedReceiver.scope("error")
110 +
    extends RawZipkinTracer(timer)
111 +
    with Closable {
205 112
206 113
  private[this] val initialSizeInBytes = initialBufferSize.inBytes.toInt
207 114
  private[this] val maxSizeInBytes = maxBufferSize.inBytes.toInt
@@ -217,6 +124,23 @@
Loading
217 124
    }
218 125
  }
219 126
127 +
  /**
128 +
   * Log the span data via Scribe.
129 +
   */
130 +
  def sendSpans(spans: Seq[Span]): Future[Unit] = {
131 +
    val logEntries = createLogEntries(spans)
132 +
    if (logEntries.isEmpty) Future.Done
133 +
    else scribePublisher.write(logEntries).unit
134 +
  }
135 +
136 +
  /**
137 +
   * Close the resource with the given deadline. This deadline is advisory,
138 +
   * giving the callee some leeway, for example to drain clients or finish
139 +
   * up other tasks.
140 +
   */
141 +
  override def close(deadline: Time): Future[Unit] =
142 +
    scribePublisher.close(deadline)
143 +
220 144
  /**
221 145
   * A wrapper around the TReusableMemoryTransport from Scrooge that
222 146
   * also resets the size of the underlying buffer if it grows larger
@@ -272,28 +196,13 @@
Loading
272 196
        span.toThrift.write(transport.protocol)
273 197
        entries.append(LogEntry(category = scribeCategory, message = transport.toBase64Line))
274 198
      } catch {
275 -
        case NonFatal(e) => errorReceiver.counter(e.getClass.getName).incr()
199 +
        case NonFatal(e) => scribePublisher.handleError(e)
276 200
      } finally {
277 201
        transport.reset()
278 202
        bufferPool.add(transport)
279 203
      }
280 204
    }
281 205
282 -
    entries.toSeq
283 -
  }
284 -
285 -
  /**
286 -
   * Log the span data via Scribe.
287 -
   */
288 -
  def sendSpans(spans: Seq[Span]): Future[Unit] = {
289 -
    client
290 -
      .log(createLogEntries(spans))
291 -
      .respond {
292 -
        case Return(ResultCode.Ok) => okCounter.incr()
293 -
        case Return(ResultCode.TryLater) => tryLaterCounter.incr()
294 -
        case Return(_) => ()
295 -
        case Throw(e) => errorReceiver.counter(e.getClass.getName).incr()
296 -
      }
297 -
      .unit
206 +
    entries.toSeq // Scala 2.13 needs the `.toSeq` here.
298 207
  }
299 208
}

@@ -5,8 +5,8 @@
Loading
5 5
import com.twitter.finagle.mysql.param._
6 6
import com.twitter.finagle.mysql.transport.Packet
7 7
import com.twitter.finagle.param.{
8 -
  ExceptionStatsHandler => _,
9 8
  Monitor => ParamMonitor,
9 +
  ExceptionStatsHandler => _,
10 10
  ResponseClassifier => _,
11 11
  Tracer => _,
12 12
  _
@@ -15,8 +15,9 @@
Loading
15 15
import com.twitter.finagle.stats.{ExceptionStatsHandler, NullStatsReceiver, StatsReceiver}
16 16
import com.twitter.finagle.tracing.Tracer
17 17
import com.twitter.finagle.transport.{Transport, TransportContext}
18 -
import com.twitter.util.{Duration, Monitor}
18 +
import com.twitter.util.{Duration, FuturePool, Monitor}
19 19
import java.net.SocketAddress
20 +
import java.util.concurrent.ExecutorService
20 21
21 22
/**
22 23
 * Supplements a [[com.twitter.finagle.Client]] with convenient
@@ -253,6 +254,10 @@
Loading
253 254
      fn: Stack[ServiceFactory[Request, Result]] => Stack[ServiceFactory[Request, Result]]
254 255
    ): Client =
255 256
      super.withStack(fn)
257 +
    override def withExecutionOffloaded(executor: ExecutorService): Client =
258 +
      super.withExecutionOffloaded(executor)
259 +
    override def withExecutionOffloaded(pool: FuturePool): Client =
260 +
      super.withExecutionOffloaded(pool)
256 261
    override def configured[P](psp: (P, Stack.Param[P])): Client = super.configured(psp)
257 262
    override def filtered(filter: Filter[Request, Result, Request, Result]): Client =
258 263
      super.filtered(filter)

@@ -0,0 +1,30 @@
Loading
1 +
package com.twitter.finagle.http.exp.routing
2 +
3 +
/** A value corresponding to a [[Parameter]]. */
4 +
private[routing] sealed abstract class ParameterValue {
5 +
6 +
  /** The raw string value. */
7 +
  def value: String
8 +
  override def toString: String = value
9 +
}
10 +
11 +
private[routing] final case class StringValue(value: String) extends ParameterValue
12 +
13 +
private[routing] final case class BooleanValue(
14 +
  value: String,
15 +
  booleanValue: Boolean)
16 +
    extends ParameterValue
17 +
18 +
private[routing] final case class IntValue(value: String, intValue: Int) extends ParameterValue
19 +
20 +
private[routing] final case class LongValue(value: String, longValue: Long) extends ParameterValue
21 +
22 +
private[routing] final case class FloatValue(
23 +
  value: String,
24 +
  floatValue: Float)
25 +
    extends ParameterValue
26 +
27 +
private[routing] final case class DoubleValue(
28 +
  value: String,
29 +
  doubleValue: Double)
30 +
    extends ParameterValue

@@ -0,0 +1,94 @@
Loading
1 +
package com.twitter.finagle.http.exp.routing
2 +
3 +
/** A lookup from [[String name]] to [[ParameterValue parameter value]]. */
4 +
private[http] sealed abstract class ParameterMap {
5 +
6 +
  /** Return true if a named parameter is present, false otherwise. */
7 +
  def isDefinedAt(name: String): Boolean
8 +
9 +
  /** Return Some(ParameterValue) for a named parameter if it is present, None otherwise. */
10 +
  private[routing] def getParam(name: String): Option[ParameterValue]
11 +
12 +
  /** Return Some(classOf[*type*]) for a named parameter if it is present, None otherwise. */
13 +
  def getParamClass(name: String): Option[Class[_]]
14 +
15 +
  /** Return Some(String) value for a named parameter if it is present, None otherwise. */
16 +
  def get(name: String): Option[String]
17 +
18 +
  /** Return Some(Int) value for a named parameter if it is present and a [[IntParam]], None otherwise. */
19 +
  def getInt(name: String): Option[Int]
20 +
21 +
  /** Return Some(Long) value for a named [[Parameter]] if it is present and a [[LongParam]], None otherwise. */
22 +
  def getLong(name: String): Option[Long]
23 +
24 +
  /** Return Some(Boolean) value for a named [[Parameter]] if it is present and a [[BooleanParam]], None otherwise. */
25 +
  def getBoolean(name: String): Option[Boolean]
26 +
27 +
  /** Return Some(Float) value for a named [[Parameter]] if it is present and a [[FloatParam]], None otherwise. */
28 +
  def getFloat(name: String): Option[Float]
29 +
30 +
  /** Return Some(Double) value for a named [[Parameter]] if it is present and a [[DoubleParam]], None otherwise. */
31 +
  def getDouble(name: String): Option[Double]
32 +
}
33 +
34 +
/** A [[ParameterMap]] that contains no results. */
35 +
private[http] object EmptyParameterMap extends ParameterMap {
36 +
  def isDefinedAt(name: String): Boolean = false
37 +
  private[routing] def getParam(name: String): Option[ParameterValue] = None
38 +
  def getParamClass(name: String): Option[Class[_]] = None
39 +
  def get(name: String): Option[String] = None
40 +
  def getInt(name: String): Option[Int] = None
41 +
  def getLong(name: String): Option[Long] = None
42 +
  def getBoolean(name: String): Option[Boolean] = None
43 +
  def getFloat(name: String): Option[Float] = None
44 +
  def getDouble(name: String): Option[Double] = None
45 +
}
46 +
47 +
/** A [[ParameterMap]] backed by a [[Map]]. */
48 +
private[http] class MapParameterMap private[routing] (underlying: Map[String, ParameterValue])
49 +
    extends ParameterMap {
50 +
51 +
  def isDefinedAt(name: String): Boolean = underlying.isDefinedAt(name)
52 +
53 +
  def getParamClass(name: String): Option[Class[_]] = underlying.get(name) match {
54 +
    case Some(_: StringValue) => Some(classOf[String])
55 +
    case Some(_: IntValue) => Some(classOf[Int])
56 +
    case Some(_: LongValue) => Some(classOf[Long])
57 +
    case Some(_: BooleanValue) => Some(classOf[Boolean])
58 +
    case Some(_: FloatValue) => Some(classOf[Float])
59 +
    case Some(_: DoubleValue) => Some(classOf[Double])
60 +
    case _ => None
61 +
  }
62 +
63 +
  private[routing] def getParam(name: String): Option[ParameterValue] = underlying.get(name)
64 +
65 +
  def get(name: String): Option[String] = underlying.get(name) match {
66 +
    case Some(pv) => Some(pv.value)
67 +
    case _ => None
68 +
  }
69 +
70 +
  def getInt(name: String): Option[Int] = underlying.get(name) match {
71 +
    case Some(i: IntValue) => Some(i.intValue)
72 +
    case _ => None
73 +
  }
74 +
75 +
  def getLong(name: String): Option[Long] = underlying.get(name) match {
76 +
    case Some(l: LongValue) => Some(l.longValue)
77 +
    case _ => None
78 +
  }
79 +
80 +
  def getBoolean(name: String): Option[Boolean] = underlying.get(name) match {
81 +
    case Some(b: BooleanValue) => Some(b.booleanValue)
82 +
    case _ => None
83 +
  }
84 +
85 +
  def getFloat(name: String): Option[Float] = underlying.get(name) match {
86 +
    case Some(f: FloatValue) => Some(f.floatValue)
87 +
    case _ => None
88 +
  }
89 +
90 +
  def getDouble(name: String): Option[Double] = underlying.get(name) match {
91 +
    case Some(d: DoubleValue) => Some(d.doubleValue)
92 +
    case _ => None
93 +
  }
94 +
}

@@ -150,7 +150,8 @@
Loading
150 150
  }
151 151
152 152
  /**
153 -
   * Run computation `f` with the given tracer and trace id.
153 +
   * Run computation `f` with the given tracer and trace id. If a sampling decision was not made
154 +
   * on `traceId`, one will be made using `tracer`.
154 155
   *
155 156
   * @param terminal true if the next traceId is a terminal id. Future
156 157
   *                 attempts to set nextId will be ignored.
@@ -160,8 +161,15 @@
Loading
160 161
    else {
161 162
      val oldId = if (terminal) traceId.copy(terminal = terminal) else traceId
162 163
      val newId = oldId.sampled match {
163 -
        case None => oldId.copy(_sampled = tracer.sampleTrace(oldId))
164 164
        case Some(_) => oldId
165 +
        case None =>
166 +
          val sampledOption = tracer.sampleTrace(oldId)
167 +
          sampledOption match {
168 +
            case Some(true) => Tracing.sampled.incr()
169 +
            case _ => //no-op
170 +
          }
171 +
172 +
          oldId.copy(_sampled = sampledOption)
165 173
      }
166 174
167 175
      val ts = tracers

@@ -0,0 +1,11 @@
Loading
1 +
package com.twitter.finagle.http.exp.routing
2 +
3 +
/**
4 +
 * Representation for an HTTP routing path
5 +
 *
6 +
 * @param segments The [[Segment segments]] that define this [[Path path]].
7 +
 */
8 +
private[http] final case class Path private[routing] (
9 +
  segments: Iterable[Segment]) {
10 +
  override def toString: String = segments.mkString
11 +
}

@@ -82,7 +82,16 @@
Loading
82 82
 *
83 83
 * @param underlying          Finagle client stack
84 84
 *
85 -
 * @param getLogicalPartition Getting the logical partition identifier from a host identifier.
85 +
 * @param observable          The state that determines the sharding scheme we use.  The updated values
86 +
 *                            are used by `getPartitionFunctionPerState` and `getLogicalPartitionPerState`
87 +
 *                            as soon as they're updated.
88 +
 *
89 +
 * @param getPartitionFunctionPerState When given a state, gets the partitioning function, which can be used
90 +
 *                            to describe which partitions a request should be subdivided into, and how a request
91 +
 *                            should be sliced and diced for each partition.
92 +
 *                            Note that this function must be referentially transparent.
93 +
 *
94 +
 * @param getLogicalPartitionPerState When given a state, gets the logical partition identifier from a host identifier.
86 95
 *                            Reverse lookup. Indicates which logical partition a physical host
87 96
 *                            belongs to, this is provided by client configuration when needed,
88 97
 *                            multiple hosts can belong to the same partition, for example:
@@ -97,12 +106,30 @@
Loading
97 106
 *                            if not provided, each host is a partition.
98 107
 *                            Host identifiers are derived from [[ZkMetadata]] shardId, logical
99 108
 *                            partition identifiers are defined by users in [[PartitioningStrategy]]
109 +
 *                            Note that this function must be referentially transparent.
100 110
 *
101 111
 * @param params              Configured Finagle client params
112 +
 *
113 +
 * @tparam Req the request type
114 +
 *
115 +
 * @tparam Rep the response type
116 +
 *
117 +
 * @tparam A   parameterizes the observable.  this is the type of the state that determines how
118 +
 *             requests get partitioned.
119 +
 *
120 +
 * @tparam B   the type of a partitioning function that will be snapshotted given a new state of
121 +
 *             type B.
102 122
 */
103 -
private[finagle] class PartitionNodeManager[Req, Rep](
123 +
private[finagle] class PartitionNodeManager[
124 +
  Req,
125 +
  Rep,
126 +
  A,
127 +
  B >: PartialFunction[Any, Future[Nothing]]
128 +
](
104 129
  underlying: Stack[ServiceFactory[Req, Rep]],
105 -
  getLogicalPartition: Int => Int,
130 +
  observable: Activity[A],
131 +
  getPartitionFunctionPerState: A => B,
132 +
  getLogicalPartitionPerState: A => Int => Int,
106 133
  params: Stack.Params)
107 134
    extends Closable { self =>
108 135
@@ -115,21 +142,24 @@
Loading
115 142
    stats.scope("partitioner")
116 143
  }
117 144
118 -
  private[this] val partitionServiceNodes =
119 -
    new AtomicReference[Map[Int, ServiceFactory[Req, Rep]]]()
145 +
  // we initialize this immediately because we filter the event that keeps
146 +
  // track of partitions later.  if some of them are empty, this will be null.
147 +
  // it should at least be shaped right, so that the exceptions we get are more
148 +
  // useful than NPEs
149 +
  // TODO we should return a DelayingServiceFactory until `observable` is no
150 +
  // longer pending
151 +
  private[this] val partitionServiceNodes = new AtomicReference(
152 +
    SnapPartitioner.uninitialized[Req, Rep, B]
153 +
  )
120 154
121 155
  private[this] val partitionerMetrics =
122 -
    statsReceiver.addGauge("nodes") { partitionServiceNodes.get.size }
156 +
    statsReceiver.addGauge("nodes") { partitionServiceNodes.get.partitionMapping.size }
123 157
124 158
  // Keep track of addresses in the current set that already have associate instances
125 159
  private[this] val destActivity = varAddrToActivity(params[LoadBalancerFactory.Dest].va, label)
126 160
127 -
  // listen to the WeightedAddress changes, transform the changes to a stream of
128 -
  // partition id (includes errors) to [[CachedServiceFactory]].
129 -
  private[this] val partitionAddressChanges: Event[
130 -
    Activity.State[Map[Try[Int], CachedServiceFactory[Req, Rep]]]
131 -
  ] = {
132 -
    val cachedServiceFactoryDiffOps = new DiffOps[Address, CachedServiceFactory[Req, Rep]] {
161 +
  private[this] val cachedServiceFactoryDiffOps =
162 +
    new DiffOps[Address, CachedServiceFactory[Req, Rep]] {
133 163
      def remove(factory: CachedServiceFactory[Req, Rep]): Unit = factory.close()
134 164
135 165
      def add(addresses: Set[Address]): CachedServiceFactory[Req, Rep] =
@@ -144,12 +174,17 @@
Loading
144 174
      }
145 175
    }
146 176
147 -
    val getShardIdFromAddress: Address => Try[Int] = {
148 -
      case WeightedAddress(Address.Inet(_, metadata), _) =>
177 +
  private[this] val getShardIdFromAddress: A => Address => Try[Int] = {
178 +
    state =>
179 +
      { addr =>
180 +
        val metadata = addr match {
181 +
          case WeightedAddress(Address.Inet(_, metadata), _) => metadata
182 +
          case Address.ServiceFactory(_, metadata) => metadata
183 +
        }
149 184
        ZkMetadata.fromAddrMetadata(metadata).flatMap(_.shardId) match {
150 185
          case Some(id) =>
151 186
            try {
152 -
              val partitionId = getLogicalPartition(id)
187 +
              val partitionId = getLogicalPartitionPerState(state)(id)
153 188
              Return(partitionId)
154 189
            } catch {
155 190
              case NonFatal(e) =>
@@ -161,59 +196,66 @@
Loading
161 196
            logger.log(Level.ERROR, "getLogicalPartition failed with: ", ex)
162 197
            Throw(ex)
163 198
        }
164 -
    }
199 +
      }
200 +
  }
165 201
166 -
    val init = Map.empty[Try[Int], CachedServiceFactory[Req, Rep]]
167 -
    safelyScanLeft(init, destActivity.states) { (partitionNodes, activeSet) =>
168 -
      updatePartitionMap[Try[Int], CachedServiceFactory[Req, Rep], Address](
169 -
        partitionNodes,
170 -
        activeSet,
171 -
        getShardIdFromAddress,
172 -
        cachedServiceFactoryDiffOps
173 -
      )
202 +
  // listen to the WeightedAddress changes, transform the changes to a stream of
203 +
  // partition id (includes errors) to [[CachedServiceFactory]].
204 +
  private[this] val partitionAddressChanges: Activity[
205 +
    (B, Map[Try[Int], CachedServiceFactory[Req, Rep]])
206 +
  ] = {
207 +
    val init: (B, Map[Try[Int], CachedServiceFactory[Req, Rep]]) =
208 +
      (PartialFunction.empty, Map.empty)
209 +
    safelyScanLeft(init, destActivity.join(observable)) {
210 +
      case ((_, partitionNodes), (activeSet, state)) =>
211 +
        (
212 +
          getPartitionFunctionPerState(state),
213 +
          updatePartitionMap[Try[Int], CachedServiceFactory[Req, Rep], Address](
214 +
            partitionNodes,
215 +
            activeSet,
216 +
            getShardIdFromAddress(state),
217 +
            cachedServiceFactoryDiffOps
218 +
          ))
174 219
    }
175 220
  }
176 221
177 222
  // Transform the stream of [[CachedServiceFactory]] to ServiceFactory and filter out
178 223
  // the failed partition id
179 -
  private[this] val partitionNodesChange: Event[Map[Int, ServiceFactory[Req, Rep]]] = {
180 -
    val init = Map.empty[Int, ServiceFactory[Req, Rep]]
224 +
  private[this] val partitionNodesChange: Event[SnapPartitioner[Req, Rep, B]] = {
225 +
    val init = SnapPartitioner.uninitialized[Req, Rep, B]
181 226
    partitionAddressChanges
227 +
      .states
182 228
      .foldLeft(init) {
183 -
        case (_, Activity.Ok(partitions)) =>
229 +
        case (_, Activity.Ok((partitionFn, partitions))) =>
184 230
          // this could possibly be an empty update if getLogicalPartition returns all Throws
185 -
          partitions.filter(_._1.isReturn).map {
186 -
            case (key, sf) => (key.get() -> sf.factory)
187 -
          }
231 +
          SnapPartitioner(
232 +
            partitionFn,
233 +
            partitions.collect {
234 +
              case (Return(key), sf) => (key -> sf.factory)
235 +
            })
188 236
        case (staleState, _) => staleState
189 -
      }.filter(_.nonEmpty)
237 +
      }.filter(_.partitionMapping.nonEmpty)
190 238
  }
191 239
192 240
  private[this] val nodeWatcher: Closable =
193 241
    partitionNodesChange.register(Witness(partitionServiceNodes))
194 242
195 243
  /**
196 -
   * Returns a Future of [[Service]] which maps to the given partitionId.
197 -
   *
198 -
   * Note: The caller is responsible for relinquishing the use of the returned [[Service]
199 -
   * by calling [[Service#close()]]. Close this node manager will close all underlying services.
200 -
   *
201 -
   * @param partitionId logical partition id
244 +
   * Returns a [[SnapPartitioner]] which describes how to partition requests.
202 245
   */
203 -
  def getServiceByPartitionId(partitionId: Int): Future[Service[Req, Rep]] = {
204 -
    partitionServiceNodes.get.get(partitionId) match {
205 -
      case Some(factory) => factory()
206 -
      case None =>
207 -
        Future.exception(
208 -
          new NoPartitionException(s"No partition: $partitionId found in the node manager"))
209 -
    }
210 -
  }
246 +
  def snapshotSharder(): SnapPartitioner[Req, Rep, B] = partitionServiceNodes.get
211 247
212 248
  /**
213 -
   * When close the node manager, all underlying services are closed.
249 +
   * When we close the node manager, all underlying services are closed.
214 250
   */
215 -
  def close(deadline: Time): Future[Unit] = self.synchronized {
216 -
    nodeWatcher.close(deadline)
217 -
    Closable.all(partitionServiceNodes.get.values.toSeq: _*).close(deadline)
251 +
  def close(deadline: Time): Future[Unit] = {
252 +
    partitionerMetrics.remove()
253 +
    // we want to ensure that nodeWatcher stops updating the partitionServiceNodes
254 +
    // before we start closing them
255 +
    Closable
256 +
      .sequence(
257 +
        nodeWatcher,
258 +
        Closable.all(partitionServiceNodes.get.partitionMapping.values.toSeq: _*)
259 +
      ).close(deadline)
218 260
  }
219 261
}

@@ -3,8 +3,9 @@
Loading
3 3
import com.twitter.finagle.Stack
4 4
import com.twitter.finagle.Thrift.param
5 5
import com.twitter.finagle.thrift.{RichClientParam, ThriftClientRequest}
6 -
import com.twitter.scrooge.{TReusableBuffer, ThriftStruct}
6 +
import com.twitter.scrooge.{TReusableBuffer, ThriftStruct, ThriftStructIface}
7 7
import java.util
8 +
import org.apache.thrift.TBase
8 9
import org.apache.thrift.protocol.{TMessage, TMessageType, TProtocolFactory}
9 10
10 11
/**
@@ -28,12 +29,23 @@
Loading
28 29
   * @param args       Thrift object request
29 30
   * @param oneWay     Expect response or not, this should inherit from the original request
30 31
   */
31 -
  def serialize(methodName: String, args: ThriftStruct, oneWay: Boolean): ThriftClientRequest = {
32 +
  def serialize(
33 +
    methodName: String,
34 +
    args: ThriftStructIface,
35 +
    oneWay: Boolean
36 +
  ): ThriftClientRequest = {
32 37
    val memoryBuffer = thriftReusableBuffer.get()
33 38
    try {
34 39
      val oprot = protocolFactory.getProtocol(memoryBuffer)
35 40
      oprot.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, 0))
36 -
      args.write(oprot)
41 +
      args match {
42 +
        case thriftStruct: ThriftStruct => thriftStruct.write(oprot)
43 +
        case tBase if tBase.isInstanceOf[TBase[_, _]] =>
44 +
          tBase.asInstanceOf[TBase[_, _]].write(oprot)
45 +
        case _ =>
46 +
          throw new IllegalArgumentException(
47 +
            "unsupported request types: supporting scrooge generated java/scala requests")
48 +
      }
37 49
      oprot.writeMessageEnd()
38 50
      oprot.getTransport().flush()
39 51
      val bytes = util.Arrays.copyOfRange(memoryBuffer.getArray(), 0, memoryBuffer.length())

@@ -14,17 +14,17 @@
Loading
14 14
}
15 15
import com.twitter.finagle.redis.RedisPartitioningService
16 16
import com.twitter.finagle.redis.exp.{ConnectionInitCommand, RedisPool}
17 +
import com.twitter.finagle.redis.filter.{RedisLoggingFilter, RedisTracingFilter}
18 +
import com.twitter.finagle.redis.param.{Database, Password}
17 19
import com.twitter.finagle.redis.protocol.{Command, Reply, StageTransport}
18 20
import com.twitter.finagle.service.{ResponseClassifier, RetryBudget}
19 21
import com.twitter.finagle.stats.{ExceptionStatsHandler, StatsReceiver}
20 22
import com.twitter.finagle.tracing.Tracer
21 23
import com.twitter.finagle.transport.{Transport, TransportContext}
22 24
import com.twitter.io.Buf
23 -
import com.twitter.util.{Duration, Monitor}
25 +
import com.twitter.util.{Duration, FuturePool, Monitor}
24 26
import java.net.SocketAddress
25 -
26 -
import com.twitter.finagle.redis.filter.{RedisLoggingFilter, RedisTracingFilter}
27 -
import com.twitter.finagle.redis.param.{Database, Password}
27 +
import java.util.concurrent.ExecutorService
28 28
29 29
trait RedisRichClient { self: Client[Command, Reply] =>
30 30
@@ -155,6 +155,10 @@
Loading
155 155
      fn: Stack[ServiceFactory[Command, Reply]] => Stack[ServiceFactory[Command, Reply]]
156 156
    ): Client =
157 157
      super.withStack(fn)
158 +
    override def withExecutionOffloaded(executor: ExecutorService): Client =
159 +
      super.withExecutionOffloaded(executor)
160 +
    override def withExecutionOffloaded(pool: FuturePool): Client =
161 +
      super.withExecutionOffloaded(pool)
158 162
    override def configured[P](psp: (P, Stack.Param[P])): Client = super.configured(psp)
159 163
    override def filtered(filter: Filter[Command, Reply, Command, Reply]): Client =
160 164
      super.filtered(filter)

@@ -77,12 +77,12 @@
Loading
77 77
   */
78 78
  private[finagle] def safelyScanLeft[T, U](
79 79
    init: U,
80 -
    stream: Event[Activity.State[T]]
80 +
    stream: Activity[T]
81 81
  )(
82 82
    f: (U, T) => U
83 -
  ): Event[Activity.State[U]] = {
83 +
  ): Activity[U] = {
84 84
    val initState: Activity.State[U] = Activity.Ok(init)
85 -
    stream.foldLeft(initState) {
85 +
    Activity(stream.states.foldLeft(initState) {
86 86
      case (Activity.Pending, Activity.Ok(update)) => Activity.Ok(f(init, update))
87 87
      case (Activity.Failed(_), Activity.Ok(update)) => Activity.Ok(f(init, update))
88 88
      case (Activity.Ok(state), Activity.Ok(update)) => Activity.Ok(f(state, update))
@@ -90,7 +90,7 @@
Loading
90 90
      case (stale @ Activity.Ok(state), Activity.Pending) if init != state => stale
91 91
      case (_, failed @ Activity.Failed(_)) => failed
92 92
      case (_, Activity.Pending) => Activity.Pending
93 -
    }
93 +
    })
94 94
  }
95 95
96 96
  /**
@@ -252,8 +252,8 @@
Loading
252 252
   * the [[Address]] does not have a weight, a default weight of 1.0 is used.
253 253
   */
254 254
  private[this] def weightEndpoints(
255 -
    addrs: Event[Activity.State[Set[Address]]]
256 -
  ): Event[Activity.State[Set[WeightedFactory[Req, Rep]]]] = {
255 +
    addrs: Activity[Set[Address]]
256 +
  ): Activity[Set[WeightedFactory[Req, Rep]]] = {
257 257
    val init = Map.empty[Address, WeightedFactory[Req, Rep]]
258 258
    safelyScanLeft(init, addrs) {
259 259
      case (active, addrs) =>
@@ -291,11 +291,7 @@
Loading
291 291
              case _ => cache
292 292
            }
293 293
        }
294 -
    }.map {
295 -
      case Activity.Ok(cache) => Activity.Ok(cache.values.toSet)
296 -
      case Activity.Pending => Activity.Pending
297 -
      case failed @ Activity.Failed(_) => failed
298 -
    }
294 +
    }.map(_.values.toSet)
299 295
  }
300 296
301 297
  /**
@@ -303,8 +299,8 @@
Loading
303 299
   * Because balancer instances are stateful, they need to be cached across updates.
304 300
   */
305 301
  private[this] def partition(
306 -
    endpoints: Event[Activity.State[Set[WeightedFactory[Req, Rep]]]]
307 -
  ): Event[Activity.State[Iterable[WeightClass[Req, Rep]]]] = {
302 +
    endpoints: Activity[Set[WeightedFactory[Req, Rep]]]
303 +
  ): Activity[Iterable[WeightClass[Req, Rep]]] = {
308 304
    // Cache entries are balancer instances together with their backing collection
309 305
    // which is updatable. The entries are keyed by weight class.
310 306
    val init = Map.empty[Double, CachedBalancer[Req, Rep]]
@@ -355,18 +351,15 @@
Loading
355 351
        val bal = newBalancer(Activity(endpoints))
356 352
        Map(1.0 -> CachedBalancer(bal, endpoints, 0))
357 353
      }
358 -
    }.map {
359 -
      case Activity.Ok(cache) =>
360 -
        Activity.Ok(cache.map {
361 -
          case (weight, CachedBalancer(bal, endpoints, size)) =>
362 -
            WeightClass(bal, endpoints, weight, size)
363 -
        })
364 -
      case Activity.Pending => Activity.Pending
365 -
      case failed @ Activity.Failed(_) => failed
354 +
    }.map { cache =>
355 +
      cache.map {
356 +
        case (weight, CachedBalancer(bal, endpoints, size)) =>
357 +
          WeightClass(bal, endpoints, weight, size)
358 +
      }
366 359
    }
367 360
  }
368 361
369 -
  private[this] val weightClasses = partition(weightEndpoints(dest.states))
362 +
  private[this] val weightClasses = partition(weightEndpoints(dest))
370 363
  private[this] val pending = new Promise[ServiceFactory[Req, Rep]]
371 364
  private[this] val init: ServiceFactory[Req, Rep] = new DelayedFactory(pending)
372 365
@@ -394,7 +387,7 @@
Loading
394 387
  // Translate the stream of weightClasses into a stream of underlying
395 388
  // ServiceFactories that can service requests.
396 389
  private[this] val underlying: Event[ServiceFactory[Req, Rep]] =
397 -
    weightClasses.foldLeft(init) {
390 +
    weightClasses.states.foldLeft(init) {
398 391
      case (_, Activity.Ok(wcs)) =>
399 392
        val dist = new Distributor(wcs, busyWeightClasses, rng)
400 393
        updateGauges(wcs)

@@ -0,0 +1,329 @@
Loading
1 +
package com.twitter.finagle.scribe
2 +
3 +
import com.twitter.conversions.DurationOps._
4 +
import com.twitter.finagle.service._
5 +
import com.twitter.finagle.stats.{DefaultStatsReceiver, DenylistStatsReceiver, StatsReceiver}
6 +
import com.twitter.finagle.thrift.scribe.thriftscala.Scribe.Log
7 +
import com.twitter.finagle.thrift.scribe.thriftscala.{LogEntry, ResultCode, Scribe}
8 +
import com.twitter.finagle.tracing.{NullTracer, TracelessFilter}
9 +
import com.twitter.finagle.util.DefaultTimer
10 +
import com.twitter.finagle.{Filter, Service, Thrift}
11 +
import com.twitter.io.Buf
12 +
import com.twitter.util.{Closable, Future, Return, Throw, Time, Try}
13 +
import java.nio.charset.{StandardCharsets => JChar}
14 +
import java.util.concurrent.TimeUnit
15 +
import scala.util.control.NonFatal
16 +
17 +
object Publisher {
18 +
  private val DefaultDest = s"inet!localhost:1463"
19 +
  private val NewLineUtf8Buf: Buf = Buf.Utf8(System.getProperty("line.separator"))
20 +
21 +
  private def recordWithNewline(record: Array[Byte]): String = {
22 +
    val recordBuf = Buf.ByteArray.Shared(record)
23 +
    // add the trailing '\n'
24 +
    val concatenated = recordBuf.concat(NewLineUtf8Buf)
25 +
    Buf.decodeString(concatenated, JChar.UTF_8)
26 +
  }
27 +
28 +
  // only report these Finagle metrics (including counters for individual exceptions)
29 +
  private[this] def filteredStatsReceiver(statsReceiver: StatsReceiver): StatsReceiver = {
30 +
    new DenylistStatsReceiver(
31 +
      statsReceiver,
32 +
      {
33 +
        // MethodBuilder StatsFilter & RetryFilter
34 +
        case Seq(_, "logical", _*) => false
35 +
        case Seq(_, "retries", _*) => false
36 +
37 +
        // Basic Finagle stats
38 +
        case Seq(_, "requests") => false
39 +
        case Seq(_, "success") => false
40 +
        case Seq(_, "pending") => false
41 +
        case Seq(_, "failures", _*) => false
42 +
43 +
        case _ => true
44 +
      })
45 +
  }
46 +
47 +
  // We must use a ResponseClassifier as the Finagle Thrift client deals with raw
48 +
  // bytes and not the Scrooge generated types.
49 +
  private[finagle] val DefaultResponseClassifier: ResponseClassifier = {
50 +
    case ReqRep(_, Return(ResultCode.TryLater)) => ResponseClass.RetryableFailure
51 +
    case ReqRep(_, Return(_: ResultCode.EnumUnknownResultCode)) => ResponseClass.NonRetryableFailure
52 +
  }
53 +
54 +
  private[this] val ShouldRetry: PartialFunction[
55 +
    (Log.Args, Try[Log.SuccessType]),
56 +
    Boolean
57 +
  ] = {
58 +
    // We don't retry failures that the RequeueFilter will handle
59 +
    case (_, Throw(RequeueFilter.Requeueable(_))) => false
60 +
    case tup if DefaultResponseClassifier.isDefinedAt(ReqRep(tup._1, tup._2)) =>
61 +
      DefaultResponseClassifier(ReqRep(tup._1, tup._2)) match {
62 +
        case ResponseClass.Failed(retryable) => retryable
63 +
        case _ => false
64 +
      }
65 +
  }
66 +
67 +
  private[this] val DefaultRetryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])] =
68 +
    RetryPolicy.tries(
69 +
      numTries = 3, // MethodBuilder default
70 +
      shouldRetry = ShouldRetry
71 +
    )
72 +
73 +
  /** A [[Builder]] with all defaults */
74 +
  def builder: Builder = new Builder()
75 +
76 +
  /** Build a new [[Publisher]] with a default [[Builder]] */
77 +
  def build(category: String, label: String): Publisher =
78 +
    builder.build(category, label)
79 +
80 +
  /**
81 +
   * Builder for a Scribe [[Publisher]]
82 +
   * ==Usage==
83 +
   * {{{
84 +
   *   val publisher: Publisher = Publisher.build("category", "label")
85 +
   * }}}
86 +
   *
87 +
   * Or to apply configuration:
88 +
   *
89 +
   * {{{
90 +
   *   val publisher: Publisher =
91 +
   *     Publisher.builder
92 +
   *       .withRetryPolicy(customPolicy)
93 +
   *       .withResponseClassifier(customClassifier)
94 +
   *       .build("category", "label")
95 +
   * }}}
96 +
   *
97 +
   * @param dest Resolvable destination of the Scribe host to which to write entries.
98 +
   * @param statsReceiver [[StatsReceiver]] for collections of success/failure/error metrics.
99 +
   * @param retryPolicy the Finagle client [[RetryPolicy]] for retrying failures.
100 +
   * @param responseClassifier how Finagle should classify responses.
101 +
   * @param filter a user provided Filter chain which is applied directly before writing the entries to Scribe.
102 +
   */
103 +
  class Builder private[scribe] (
104 +
    dest: String = DefaultDest,
105 +
    statsReceiver: StatsReceiver = DefaultStatsReceiver,
106 +
    retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])] = DefaultRetryPolicy,
107 +
    responseClassifier: ResponseClassifier = DefaultResponseClassifier,
108 +
    filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType] = Filter.identity,
109 +
    logServiceOverride: Option[Service[Log.Args, Log.SuccessType]] = None) {
110 +
111 +
    def withDest(dest: String): Builder = {
112 +
      new Builder(
113 +
        dest = dest,
114 +
        this.statsReceiver,
115 +
        this.retryPolicy,
116 +
        this.responseClassifier,
117 +
        this.filter,
118 +
        this.logServiceOverride)
119 +
    }
120 +
121 +
    def withStatsReceiver(statsReceiver: StatsReceiver): Builder = {
122 +
      new Builder(
123 +
        this.dest,
124 +
        statsReceiver = statsReceiver,
125 +
        this.retryPolicy,
126 +
        this.responseClassifier,
127 +
        this.filter,
128 +
        this.logServiceOverride)
129 +
    }
130 +
131 +
    def withRetryPolicy(retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])]): Builder = {
132 +
      new Builder(
133 +
        this.dest,
134 +
        this.statsReceiver,
135 +
        retryPolicy = retryPolicy,
136 +
        this.responseClassifier,
137 +
        this.filter,
138 +
        this.logServiceOverride)
139 +
    }
140 +
141 +
    def withResponseClassifier(responseClassifier: ResponseClassifier): Builder = {
142 +
      new Builder(
143 +
        this.dest,
144 +
        this.statsReceiver,
145 +
        this.retryPolicy,
146 +
        responseClassifier = responseClassifier,
147 +
        this.filter,
148 +
        this.logServiceOverride)
149 +
    }
150 +
151 +
    /** APPEND (not replace) the given Filter to the current Filter */
152 +
    def withFilter(
153 +
      filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType]
154 +
    ): Builder = {
155 +
      new Builder(
156 +
        this.dest,
157 +
        this.statsReceiver,
158 +
        this.retryPolicy,
159 +
        this.responseClassifier,
160 +
        this.filter.andThen(filter),
161 +
        this.logServiceOverride)
162 +
    }
163 +
164 +
    /* exposed for testing */
165 +
    private[scribe] def withLogServiceOverride(
166 +
      logServiceOverride: Option[Service[Log.Args, Log.SuccessType]]
167 +
    ): Builder = {
168 +
      new Builder(
169 +
        this.dest,
170 +
        this.statsReceiver,
171 +
        this.retryPolicy,
172 +
        this.responseClassifier,
173 +
        this.filter,
174 +
        logServiceOverride = logServiceOverride
175 +
      )
176 +
    }
177 +
178 +
    /**
179 +
     * Build a new Scribe [[Publisher]] from the given category and label.
180 +
     * @param category the Scribe category to which to publish entries.
181 +
     * @param label the client label used in metrics
182 +
     * @return a new [[Publisher]] configured from the current state this [[Builder]].
183 +
     */
184 +
    def build(category: String, label: String): Publisher = {
185 +
      val stats = new ScribeStats(statsReceiver.scope(label))
186 +
      val client = newClient(label, stats, this.statsReceiver.scope("clnt"))
187 +
      new Publisher(
188 +
        category = category,
189 +
        stats = stats,
190 +
        client = client
191 +
      )
192 +
    }
193 +
194 +
    private def newClient(
195 +
      label: String,
196 +
      stats: ScribeStats, // captures per-request Scribe stats (not logical)
197 +
      clientStatsReceiver: StatsReceiver // Finagle client stats -- filtered & will be use for logical and per-request
198 +
    ): Scribe.MethodPerEndpoint = {
199 +
      val statsReceiver = filteredStatsReceiver(clientStatsReceiver)
200 +
      val scopedStatsReceiver = statsReceiver.scope(label)
201 +
      val logicalStatsReceiver = scopedStatsReceiver.scope("logical")
202 +
203 +
      // share the RetryBudget between the retry and requeue filters
204 +
      val retryBudget = RetryBudget()
205 +
      val retryFilter = new RetryFilter[Log.Args, ResultCode](
206 +
        retryPolicy = this.retryPolicy,
207 +
        retryBudget = retryBudget,
208 +
        timer = DefaultTimer,
209 +
        statsReceiver = scopedStatsReceiver
210 +
      )
211 +
212 +
      val statsFilter = StatsFilter
213 +
        .typeAgnostic(
214 +
          logicalStatsReceiver, // client stats receiver, filtered and scoped to the label + logical, e.g., clnt/label/logical
215 +
          this.responseClassifier,
216 +
          StatsFilter.DefaultExceptions,
217 +
          TimeUnit.MILLISECONDS
218 +
        ).toFilter[Log.Args, Log.SuccessType]
219 +
220 +
      val servicePerEndpoint: Scribe.ServicePerEndpoint = this.logServiceOverride match {
221 +
        case Some(svc) =>
222 +
          Scribe.ServicePerEndpoint(log = svc)
223 +
        case _ =>
224 +
          Thrift.client
225 +
            .withRetryBudget(retryBudget)
226 +
            .withSessionPool.maxSize(5)
227 +
            .withSessionPool.maxWaiters(10000)
228 +
            .withSessionQualifier.noFailFast
229 +
            .withSessionQualifier.noFailureAccrual
230 +
            // Client stats receiver, filtered, will be scoped to label by Finagle, e.g., clnt/label/
231 +
            .withStatsReceiver(statsReceiver)
232 +
            // We disable Tracing for this client to prevent creating a recursive tracing storm.
233 +
            .withTracer(NullTracer)
234 +
            .withRequestTimeout(1.second) // each retry will have this timeout
235 +
            .servicePerEndpoint[Scribe.ServicePerEndpoint](this.dest, label)
236 +
      }
237 +
238 +
      val tracelessFilter = new TracelessFilter
239 +
      val scribeMetricsFilter = new ScribeMetricsFilter(stats)
240 +
      val filteredServicePerEndpoint = servicePerEndpoint.withLog(
241 +
        log = tracelessFilter
242 +
          .andThen(statsFilter)
243 +
          .andThen(retryFilter)
244 +
          // this is placed after the retry filter so
245 +
          // that we update stats on retried requests
246 +
          .andThen(scribeMetricsFilter)
247 +
          // user provided filter
248 +
          .andThen(this.filter)
249 +
          .andThen(servicePerEndpoint.log)
250 +
      )
251 +
      Thrift.Client.methodPerEndpoint(filteredServicePerEndpoint)
252 +
    }
253 +
  }
254 +
}
255 +
256 +
/**
257 +
 * Publishes entries to Scribe. Metrics are collected per-request and logically.
258 +
 *
259 +
 * Logical metrics:
260 +
 * {{{
261 +
 *   clnt/label/logical/requests
262 +
 *   clnt/label/logical/success
263 +
 *   clnt/label/logical/pending
264 +
 *   clnt/label/logical/request_latency_ms
265 +
 *   clnt/label/logical/failures
266 +
 *   clnt/label/logical/failures/com.twitter.finagle.ChannelWriteException
267 +
 *
268 +
 * }}}
269 +
 *
270 +
 * Per-request metrics:
271 +
 * {{{
272 +
 *   clnt/label/retries
273 +
 *   clnt/label/retries/budget_exhausted
274 +
 *   label/scribe/try_later
275 +
 *   label/scribe/ok
276 +
 *   label/scribe/error/com.twitter.finagle.ChannelWriteException
277 +
 * }}}
278 +
 *
279 +
 * @param category the Scribe category to which to publish entries.
280 +
 * @param stats the [[ScribeStats]] to use for recording errors.
281 +
 * @param client the [[Scribe.MethodPerEndpoint]] Finagle client for sending messages to Scribe.
282 +
 */
283 +
final class Publisher private[finagle] (
284 +
  category: String,
285 +
  stats: ScribeStats,
286 +
  client: Scribe.MethodPerEndpoint)
287 +
    extends Closable {
288 +
  import Publisher._
289 +
290 +
  /**
291 +
   * Write the given array of bytes to scribe. Bytes are UTF-8 encoded and appended with the
292 +
   * system line separator.
293 +
   * @param record byte array to write to scribe
294 +
   */
295 +
  def write(record: Array[Byte]): Future[Unit] =
296 +
    write(toLogEntry(record))
297 +
298 +
  /**
299 +
   * Write the given list of [[LogEntry]] items to scribe.
300 +
   * @param entries list of entries to write to scribe
301 +
   */
302 +
  def write(entries: Seq[LogEntry]): Future[Unit] =
303 +
    if (entries.nonEmpty) {
304 +
      client
305 +
        .log(entries)
306 +
        .unit
307 +
    } else Future.Done
308 +
309 +
  /** Proxy to handle errors */
310 +
  def handleError(e: Throwable): Unit = stats.handleError(e)
311 +
312 +
  /**
313 +
   * Close the resource with the given deadline. This deadline is advisory,
314 +
   * giving the callee some leeway, for example to drain clients or finish
315 +
   * up other tasks.
316 +
   */
317 +
  override def close(deadline: Time): Future[Unit] =
318 +
    client.asClosable.close(deadline)
319 +
320 +
  private[this] def toLogEntry(record: Array[Byte]): Seq[LogEntry] = {
321 +
    try {
322 +
      Seq(LogEntry(category = category, message = recordWithNewline(record)))
323 +
    } catch {
324 +
      case NonFatal(e) =>
325 +
        stats.handleError(e)
326 +
        Seq.empty[LogEntry]
327 +
    }
328 +
  }
329 +
}

@@ -53,6 +53,7 @@
Loading
53 53
import com.twitter.util._
54 54
import com.twitter.util.registry.GlobalRegistry
55 55
import java.net.SocketAddress
56 +
import java.util.concurrent.ExecutorService
56 57
57 58
/**
58 59
 * Factory methods to build a finagle-memcached client.
@@ -352,6 +353,10 @@
Loading
352 353
      fn: Stack[ServiceFactory[Command, Response]] => Stack[ServiceFactory[Command, Response]]
353 354
    ): Client =
354 355
      super.withStack(fn)
356 +
    override def withExecutionOffloaded(executor: ExecutorService): Client =
357 +
      super.withExecutionOffloaded(executor)
358 +
    override def withExecutionOffloaded(pool: FuturePool): Client =
359 +
      super.withExecutionOffloaded(pool)
355 360
    override def configured[P](psp: (P, Stack.Param[P])): Client = super.configured(psp)
356 361
    override def filtered(filter: Filter[Command, Response, Command, Response]): Client =
357 362
      super.filtered(filter)
@@ -424,7 +429,10 @@
Loading
424 429
      fn: Stack[ServiceFactory[Command, Response]] => Stack[ServiceFactory[Command, Response]]
425 430
    ): Server =
426 431
      super.withStack(fn)
427 -
432 +
    override def withExecutionOffloaded(executor: ExecutorService): Server =
433 +
      super.withExecutionOffloaded(executor)
434 +
    override def withExecutionOffloaded(pool: FuturePool): Server =
435 +
      super.withExecutionOffloaded(pool)
428 436
    override def configured[P](psp: (P, Stack.Param[P])): Server = super.configured(psp)
429 437
  }
430 438

@@ -44,9 +44,9 @@
Loading
44 44
      new Gauge { def remove(): Unit = () }
45 45
    }
46 46
47 -
  protected[this] def registerGauge(verbosity: Verbosity, name: Seq[String], f: => Float): Unit =
47 +
  protected[this] def registerGauge(schema: GaugeSchema, f: => Float): Unit =
48 48
    synchronized {
49 -
      _gauges += (name -> (() => f))
49 +
      _gauges += (schema.metricBuilder.name -> (() => f))
50 50
    }
51 51
52 52
  protected[this] def deregisterGauge(name: Seq[String]): Unit = synchronized {

@@ -25,6 +25,7 @@
Loading
25 25
  name: String)
26 26
    extends Command(name) {
27 27
  KeyValidation.checkKey(key)
28 +
  ExpiryValidation.checkExpiry(name, expiry)
28 29
}
29 30
30 31
sealed abstract class NonStorageCommand(name: String) extends Command(name)

@@ -6,8 +6,9 @@
Loading
6 6
import com.twitter.finagle.{Service, ServiceFactory, SimpleFilter, Stack, param}
7 7
import com.twitter.finagle.stats.{Stat, StatsReceiver, Verbosity}
8 8
import com.twitter.finagle.tracing.{Trace, Tracing}
9 -
import com.twitter.io.Reader
10 -
import com.twitter.util.Future
9 +
import com.twitter.io.{Reader, StreamTermination}
10 +
import com.twitter.util.{Future, Return, Throw, Try}
11 +
import java.util.concurrent.atomic.AtomicLong
11 12
12 13
private[finagle] object PayloadSizeFilter {
13 14
  val Role: Stack.Role = Stack.Role("HttpPayloadSize")
@@ -28,6 +29,38 @@
Loading
28 29
29 30
  val clientTraceKeyPrefix: String = "clnt/"
30 31
  val serverTraceKeyPrefix: String = "srv/"
32 +
33 +
  private val reqKey: String = "request_payload_bytes"
34 +
  private val repKey: String = "response_payload_bytes"
35 +
  private val chunkKey: String = "chunk_payload_bytes"
36 +
37 +
  private final class RecordingChunkReader(
38 +
    parent: Reader[Chunk],
39 +
    trace: Tracing,
40 +
    stat: Stat,
41 +
    traceKey: String)
42 +
      extends Reader[Chunk] { self =>
43 +
44 +
    private[this] val bytesObserved = new AtomicLong()
45 +
46 +
    private[this] val observeRead: Try[Option[Chunk]] => Unit = {
47 +
      case Return(Some(chunk)) =>
48 +
        val len = chunk.content.length
49 +
        bytesObserved.addAndGet(len)
50 +
        stat.add(len)
51 +
52 +
      case Return(None) =>
53 +
        if (trace.isActivelyTracing) trace.recordBinary(traceKey, bytesObserved.get)
54 +
55 +
      case Throw(_) => // nop
56 +
    }
57 +
58 +
    def discard(): Unit = parent.discard()
59 +
60 +
    def onClose: Future[StreamTermination] = parent.onClose
61 +
62 +
    def read(): Future[Option[Chunk]] = parent.read().respond(observeRead)
63 +
  }
31 64
}
32 65
33 66
/**
@@ -46,59 +79,35 @@
Loading
46 79
 */
47 80
private[finagle] class PayloadSizeFilter(statsReceiver: StatsReceiver, prefix: String)
48 81
    extends SimpleFilter[Request, Response] {
82 +
  import PayloadSizeFilter._
49 83
50 -
  private[this] val reqKey = "request_payload_bytes"
51 -
  private[this] val repKey = "response_payload_bytes"
52 -
  private[this] val chunkKey = "chunk_payload_bytes"
84 +
  private[this] val streamReqTraceKey: String = s"${prefix}stream/request/${chunkKey}"
85 +
  private[this] val streamRepTraceKey: String = s"${prefix}stream/response/${chunkKey}"
86 +
  private[this] val reqTraceKey: String = prefix + reqKey
87 +
  private[this] val repTraceKey: String = prefix + repKey
53 88
54 -
  private[this] val streamReqTraceKey = s"${prefix}stream/request/${chunkKey}"
55 -
  private[this] val streamRepTraceKey = s"${prefix}stream/response/${chunkKey}"
56 -
  private[this] val reqTraceKey = prefix + reqKey
57 -
  private[this] val repTraceKey = prefix + repKey
89 +
  private[this] val requestBytes: Stat = statsReceiver.stat(Verbosity.Debug, reqKey)
90 +
  private[this] val responseBytes: Stat = statsReceiver.stat(Verbosity.Debug, repKey)
58 91
59 -
  private[this] val requestBytes = statsReceiver.stat(Verbosity.Debug, reqKey)
60 -
  private[this] val responseBytes = statsReceiver.stat(Verbosity.Debug, repKey)
61 -
62 -
  private[this] val streamRequestBytes =
92 +
  private[this] val streamRequestBytes: Stat =
63 93
    statsReceiver.scope("stream").scope("request").stat(Verbosity.Debug, chunkKey)
64 -
  private[this] val streamResponseBytes =
94 +
  private[this] val streamResponseBytes: Stat =
65 95
    statsReceiver.scope("stream").scope("response").stat(Verbosity.Debug, chunkKey)
66 96
67 97
  private[this] def handleResponse(trace: Tracing): Response => Response = { rep =>
68 98
    if (rep.isChunked) {
69 99
      new ResponseProxy {
70 100
        override def response: Response = rep
71 -
        override def chunkReader: Reader[Chunk] =
72 -
          super.chunkReader
73 -
            .map(onRead(recordRepSize, trace, streamResponseBytes, streamRepTraceKey))
101 +
        override val chunkReader: Reader[Chunk] =
102 +
          new RecordingChunkReader(super.chunkReader, trace, streamResponseBytes, streamRepTraceKey)
74 103
      }
75 104
    } else {
76 -
      recordRepSize(rep.content.length, trace, responseBytes, repTraceKey)
105 +
      recordBufferedSize(rep.content.length, trace, responseBytes, repTraceKey)
77 106
      rep
78 107
    }
79 108
  }
80 109
81 -
  private[this] def onRead(
82 -
    record: (Int, Tracing, Stat, String) => Unit,
83 -
    trace: Tracing,
84 -
    stat: Stat,
85 -
    traceKey: String
86 -
  ): Chunk => Chunk = { chunk =>
87 -
    record(chunk.content.length, trace, stat, traceKey)
88 -
    chunk
89 -
  }
90 -
91 -
  private[this] def recordReqSize(
92 -
    size: Int,
93 -
    trace: Tracing,
94 -
    reqStat: Stat,
95 -
    traceKey: String
96 -
  ): Unit = {
97 -
    reqStat.add(size.toFloat)
98 -
    if (trace.isActivelyTracing) trace.recordBinary(traceKey, size)
99 -
  }
100 -
101 -
  private[this] def recordRepSize(
110 +
  private[this] def recordBufferedSize(
102 111
    size: Int,
103 112
    trace: Tracing,
104 113
    repStat: Stat,
@@ -113,11 +122,11 @@
Loading
113 122
    val request = if (req.isChunked) {
114 123
      new RequestProxy {
115 124
        override def request: Request = req
116 -
        override def chunkReader: Reader[Chunk] =
117 -
          super.chunkReader.map(onRead(recordReqSize, trace, streamRequestBytes, streamReqTraceKey))
125 +
        override val chunkReader: Reader[Chunk] =
126 +
          new RecordingChunkReader(super.chunkReader, trace, streamRequestBytes, streamReqTraceKey)
118 127
      }
119 128
    } else {
120 -
      recordReqSize(req.content.length, trace, requestBytes, reqTraceKey)
129 +
      recordBufferedSize(req.content.length, trace, requestBytes, reqTraceKey)
121 130
      req
122 131
    }
123 132
    service(request).map(handleResponse(trace))

@@ -4,7 +4,8 @@
Loading
4 4
import com.twitter.finagle.loadbalancer.LoadBalancerFactory
5 5
import com.twitter.finagle.partitioning.param
6 6
import com.twitter.finagle.thrift.ThriftClientRequest
7 -
import com.twitter.finagle.{ServiceFactory, Stack}
7 +
import com.twitter.finagle.{FailureFlags, ServiceFactory, Stack}
8 +
import com.twitter.logging.{HasLogLevel, Level}
8 9
9 10
/**
10 11
 * ThriftPartitioningService applies [[PartitioningStrategy]] on the Thrift client.
@@ -20,6 +21,23 @@
Loading
20 21
      Stack.Param(Strategy(Disabled))
21 22
  }
22 23
24 +
  /**
25 +
   * Failed to get PartitionIds/HashKeys and Requests from [[PartitioningStrategy]].
26 +
   */
27 +
  final class PartitioningStrategyException(
28 +
    message: String,
29 +
    cause: Throwable = null,
30 +
    val flags: Long = FailureFlags.Empty)
31 +
      extends Exception(message, cause)
32 +
      with FailureFlags[PartitioningStrategyException]
33 +
      with HasLogLevel {
34 +
    def this(cause: Throwable) = this(null, cause)
35 +
    def logLevel: Level = Level.ERROR
36 +
37 +
    protected def copyWithFlags(flags: Long): PartitioningStrategyException =
38 +
      new PartitioningStrategyException(message, cause, flags)
39 +
  }
40 +
23 41
  /**
24 42
   * A helper class to provide helper methods for different Protocols when marshalling
25 43
   * requests and responses. Now distinctions are between Thrift messages and

@@ -91,6 +91,8 @@
Loading
91 91
      def apply(conn: ClientConnection): Future[Service[Req, Rep]] = noRelease
92 92
      def close(deadline: Time): Future[Unit] = service.close(deadline)
93 93
94 +
      override def status: Status = service.status
95 +
94 96
      override def toString: String = service.toString
95 97
    }
96 98

@@ -1,5 +1,7 @@
Loading
1 1
package com.twitter.finagle.netty4.param
2 2
3 +
import com.twitter.finagle.netty4.param.WorkerPool.toEventLoopGroup
4 +
import com.twitter.finagle.stats.FinagleStatsReceiver
3 5
import com.twitter.finagle.Stack
4 6
import com.twitter.finagle.netty4.{numWorkers, useNativeEpoll}
5 7
import com.twitter.finagle.util.BlockingTimeTrackingThreadFactory
@@ -8,6 +10,7 @@
Loading
8 10
import io.netty.channel.nio.NioEventLoopGroup
9 11
import io.netty.util.concurrent.DefaultThreadFactory
10 12
import java.util.concurrent.{Executor, Executors, ThreadFactory}
13 +
import java.util.concurrent.atomic.AtomicInteger
11 14
12 15
/**
13 16
 * A class eligible for configuring the [[io.netty.channel.EventLoopGroup]] used
@@ -20,11 +23,7 @@
Loading
20 23
 */
21 24
case class WorkerPool(eventLoopGroup: EventLoopGroup) {
22 25
  def this(executor: Executor, numWorkers: Int) =
23 -
    this(
24 -
      if (useNativeEpoll() && Epoll.isAvailable)
25 -
        WorkerPool.mkEpollEventLoopGroup(numWorkers, executor)
26 -
      else WorkerPool.mkNioEventLoopGroup(numWorkers, executor)
27 -
    )
26 +
    this(toEventLoopGroup(executor, numWorkers))
28 27
29 28
  def mk(): (WorkerPool, Stack.Param[WorkerPool]) =
30 29
    (this, WorkerPool.workerPoolParam)
@@ -32,6 +31,19 @@
Loading
32 31
33 32
object WorkerPool {
34 33
34 +
  private[this] val workerPoolSize = new AtomicInteger(0)
35 +
36 +
  // We hold onto the reference so the gauge doesn't get GC'd
37 +
  private[this] val workerGauge = FinagleStatsReceiver.addGauge("netty4", "worker_threads") {
38 +
    workerPoolSize.get
39 +
  }
40 +
41 +
  private def toEventLoopGroup(executor: Executor, numWorkers: Int): EventLoopGroup = {
42 +
    workerPoolSize.addAndGet(numWorkers)
43 +
    if (useNativeEpoll() && Epoll.isAvailable) mkEpollEventLoopGroup(numWorkers, executor)
44 +
    else mkNioEventLoopGroup(numWorkers, executor)
45 +
  }
46 +
35 47
  // This uses the netty DefaultThreadFactory to create thread pool threads. This factory creates
36 48
  // special FastThreadLocalThreads that netty has specific optimizations for.
37 49
  // Microbenchmarks put allocations via the netty allocator pools at ~8% faster on

@@ -83,7 +83,18 @@
Loading
83 83
        (this, OppTls.param)
84 84
    }
85 85
    object OppTls {
86 -
      implicit val param = Stack.Param(OppTls(None))
86 +
      implicit val param = new Stack.Param[OppTls] {
87 +
        val default: OppTls = OppTls(None)
88 +
89 +
        // override this to have a "cleaner" output in the registry
90 +
        override def show(value: OppTls): Seq[(String, () => String)] = {
91 +
          val levelStr = value match {
92 +
            case OppTls(Some(oppTls)) => oppTls.value.toString
93 +
            case OppTls(None) => "none"
94 +
          }
95 +
          Seq(("opportunisticTlsLevel", () => levelStr))
96 +
        }
97 +
      }
87 98
88 99
      /** Determine whether opportunistic TLS is configured to `Desired` or `Required`. */
89 100
      def enabled(params: Stack.Params): Boolean = params[OppTls].level match {

@@ -45,6 +45,7 @@
Loading
45 45
      ): Future[method.SuccessType] = {
46 46
        val request = encodeRequest(method.name, args, pf, method.oneway)
47 47
        val serdeCtx = new ClientDeserializeCtx[method.SuccessType](args, decodeRepFn)
48 +
        serdeCtx.rpcName(method.name)
48 49
        Contexts.local.let(ClientDeserializeCtx.Key, serdeCtx) {
49 50
          service(request).flatMap { response => Future.const(serdeCtx.deserialize(response)) }
50 51
        }

@@ -1,33 +1,17 @@
Loading
1 1
package com.twitter.finagle.thrift.exp.partitioning
2 2
3 -
import com.twitter.finagle.partitioning.{PartitionNodeManager, PartitioningService}
3 +
import com.twitter.finagle.loadbalancer.{LoadBalancerFactory, TrafficDistributor}
4 +
import com.twitter.finagle.param.Label
5 +
import com.twitter.finagle.partitioning.PartitioningService
4 6
import com.twitter.finagle.thrift.ClientDeserializeCtx
5 -
import com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.ReqRepMarshallable
6 -
import com.twitter.finagle.{FailureFlags, Service, ServiceFactory, Stack}
7 -
import com.twitter.logging.{HasLogLevel, Level}
8 -
import com.twitter.scrooge.{ThriftStruct, ThriftStructIface}
9 -
import com.twitter.util.Future
10 -
import scala.util.control.NonFatal
11 -
12 -
private object ThriftCustomPartitioningService {
13 -
14 -
  /**
15 -
   * Failed to get Partition Ids and Requests from [[CustomPartitioningStrategy]].
16 -
   */
17 -
  final class PartitioningStrategyException(
18 -
    message: String,
19 -
    cause: Throwable = null,
20 -
    val flags: Long = FailureFlags.Empty)
21 -
      extends Exception(message, cause)
22 -
      with FailureFlags[PartitioningStrategyException]
23 -
      with HasLogLevel {
24 -
    def this(cause: Throwable) = this(null, cause)
25 -
    def logLevel: Level = Level.ERROR
26 -
27 -
    protected def copyWithFlags(flags: Long): PartitioningStrategyException =
28 -
      new PartitioningStrategyException(message, cause, flags)
29 -
  }
7 +
import com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.{
8 +
  PartitioningStrategyException,
9 +
  ReqRepMarshallable
30 10
}
11 +
import com.twitter.finagle.{Address, Service, ServiceFactory, Stack}
12 +
import com.twitter.scrooge.ThriftStructIface
13 +
import com.twitter.util.{Future, Time}
14 +
import scala.util.control.NonFatal
31 15
32 16
/**
33 17
 * This custom partitioning service integrates with the user supplied
@@ -35,63 +19,62 @@
Loading
35 19
 * partitioning topologies.
36 20
 * @see [[PartitioningService]].
37 21
 */
38 -
class ThriftCustomPartitioningService[Req, Rep](
22 +
private[finagle] class ThriftCustomPartitioningService[Req, Rep](
39 23
  underlying: Stack[ServiceFactory[Req, Rep]],
40 24
  thriftMarshallable: ReqRepMarshallable[Req, Rep],
41 25
  params: Stack.Params,
42 -
  customStrategy: CustomPartitioningStrategy)
26 +
  configuredStrategy: CustomPartitioningStrategy)
43 27
    extends PartitioningService[Req, Rep] {
44 28
45 -
  import ThriftCustomPartitioningService._
29 +
  private[this] val customStrategy = configuredStrategy match {
30 +
    case strat: ClientClusterStrategy =>
31 +
      new ClientCustomStrategy[Set[Address]](
32 +
        strat.getPartitionIdAndRequestFn,
33 +
        strat.getLogicalPartitionIdFn,
34 +
        TrafficDistributor
35 +
          .varAddrToActivity(params[LoadBalancerFactory.Dest].va, params[Label].label))
36 +
    case _ => configuredStrategy
37 +
  }
46 38
47 -
  private[this] val nodeManager =
48 -
    new PartitionNodeManager(underlying, customStrategy.getLogicalPartition, params)
39 +
  private[this] val nodeManager = customStrategy.newNodeManager(underlying, params)
49 40
50 -
  // the precondition is that this request is all for a single partition
51 -
  final protected def getPartitionFor(partitionedRequest: Req): Future[Service[Req, Rep]] = {
52 -
    getPartitionIdAndRequestMap(partitionedRequest).flatMap { partitionMap =>
53 -
      if (partitionMap.isEmpty) {
54 -
        // this should not happen, isSingletonPartition guards against this
55 -
        noPartitionInformationHandler(partitionedRequest)
56 -
      } else {
57 -
        partitionServiceForPartitionId(partitionMap.head._1)
58 -
      }
59 -
    }
60 -
  }
41 +
  private[this] val serializer = new ThriftRequestSerializer(params)
42 +
43 +
  private[this] def rpcName: String = ClientDeserializeCtx.get.rpcName.getOrElse("N/A")
61 44
62 45
  final protected def noPartitionInformationHandler(req: Req): Future[Nothing] = {
63 46
    val ex = new PartitioningStrategyException(
64 -
      s"No Partitioning Ids for the thrift method: ${ClientDeserializeCtx.get.rpcName
65 -
        .getOrElse("N/A")}")
47 +
      s"No Partitioning Ids for the thrift method: $rpcName")
66 48
    Future.exception(ex)
67 49
  }
68 50
69 51
  // for fan-out requests
70 -
  final protected def partitionRequest(
52 +
  final protected[finagle] def partitionRequest(
71 53
    original: Req
72 54
  ): Future[Map[Req, Future[Service[Req, Rep]]]] = {
73 -
    val serializer = new ThriftRequestSerializer(params)
74 -
    ClientDeserializeCtx.get.rpcName match {
75 -
      case Some(rpcName) =>
76 -
        getPartitionIdAndRequestMap(original).map { idsAndRequests =>
77 -
          idsAndRequests.map {
78 -
            case (id, request) =>
79 -
              val thriftClientRequest = serializer.serialize(
80 -
                rpcName,
81 -
                request.asInstanceOf[ThriftStruct],
82 -
                thriftMarshallable.isOneway(original))
83 -
84 -
              val partitionedReq =
85 -
                thriftMarshallable.framePartitionedRequest(thriftClientRequest, original)
86 -
87 -
              // we assume NodeManager updates always happen before getPartitionIdAndRequestMap
88 -
              // updates. When updating the partitioning topology, it should do proper locking
89 -
              // before returning a lookup map.
90 -
              (partitionedReq, partitionServiceForPartitionId(id))
91 -
          }
92 -
        }
93 -
      case None =>
94 -
        Future.exception(new IllegalArgumentException("cannot find the thrift method rpcName"))
55 +
    val snapPartitioner = nodeManager.snapshotSharder()
56 +
57 +
    val partitionIdAndRequest = getPartitionIdAndRequestMap(snapPartitioner.partitionFunction)
58 +
59 +
    partitionIdAndRequest.flatMap { idsAndRequests =>
60 +
      if (idsAndRequests.isEmpty) {
61 +
        noPartitionInformationHandler(original)
62 +
      } else if (idsAndRequests.size == 1) {
63 +
        // optimization: won't serialize request if it is a singleton partition
64 +
        Future.value(
65 +
          Map(original -> snapPartitioner.getServiceByPartitionId(idsAndRequests.head._1)))
66 +
      } else {
67 +
        Future.value(idsAndRequests.map {
68 +
          case (id, request) =>
69 +
            val thriftClientRequest =
70 +
              serializer.serialize(rpcName, request, thriftMarshallable.isOneway(original))
71 +
72 +
            val partitionedReq =
73 +
              thriftMarshallable.framePartitionedRequest(thriftClientRequest, original)
74 +
75 +
            (partitionedReq, snapPartitioner.getServiceByPartitionId(id))
76 +
        })
77 +
      }
95 78
    }
96 79
  }
97 80
@@ -99,22 +82,30 @@
Loading
99 82
    originalReq: Req,
100 83
    results: PartitioningService.PartitionedResults[Req, Rep]
101 84
  ): Rep = {
102 -
    val responseMerger = customStrategy match {
103 -
      case clientCustomStrategy: ClientCustomStrategy =>
104 -
        ClientDeserializeCtx.get.rpcName.flatMap { rpcName =>
105 -
          clientCustomStrategy.responseMergerRegistry.get(rpcName)
106 -
        } match {
107 -
          case Some(merger) => merger
108 -
          case None =>
109 -
            throw new IllegalArgumentException(
110 -
              s"cannot find the response merger for thrift method: " +
111 -
                s"${ClientDeserializeCtx.get.rpcName.getOrElse("N/A")}"
112 -
            )
113 -
        }
85 +
    val mergerOption = customStrategy match {
86 +
      case clientCustomStrategy: ClientCustomStrategy[_] =>
87 +
        clientCustomStrategy.responseMergerRegistry.get(rpcName)
88 +
      case mbCustomStrategy: MethodBuilderCustomStrategy[_, _] =>
89 +
        mbCustomStrategy
90 +
        //upcasting, MethodBuilderCustomStrategy[Req <: ThriftStructIface, _]
91 +
          .asInstanceOf[MethodBuilderCustomStrategy[_, Any]]
92 +
          .responseMerger
93 +
      case clusterStrategy: ClientClusterStrategy =>
94 +
        throw new IllegalStateException(
95 +
          s"found a ClientClusterStrategy $clusterStrategy after it" +
96 +
            " should have been converted to a ClientCustomStrategy.  This state should be " +
97 +
            "impossible to reach. It indicates a serious bug.")
98 +
    }
99 +
100 +
    val responseMerger = mergerOption match {
101 +
      case Some(merger) => merger
102 +
      case None =>
103 +
        throw new IllegalArgumentException(
104 +
          s"cannot find the response merger for thrift method: $rpcName"
105 +
        )
114 106
    }
115 107
116 108
    val mergedResponse = ThriftPartitioningUtil.mergeResponses(
117 -
      originalReq,
118 109
      results,
119 110
      responseMerger,
120 111
      thriftMarshallable.fromResponseToBytes)
@@ -126,37 +117,44 @@
Loading
126 117
    thriftMarshallable.emptyResponse
127 118
  }
128 119
129 -
  final protected def isSinglePartition(request: Req): Future[Boolean] = {
130 -
    getPartitionIdAndRequestMap(request).flatMap { idsAndRequests =>
131 -
      val partitionIds = idsAndRequests.map(_._1).toSeq
132 -
      if (partitionIds.isEmpty) {
133 -
        noPartitionInformationHandler(request)
134 -
      } else {
135 -
        Future.value(partitionIds.size == 1)
136 -
      }
137 -
    }
138 -
  }
139 -
140 -
  private[this] def getPartitionIdAndRequestMap(req: Req): Future[Map[Int, ThriftStructIface]] = {
120 +
  // note: this function should be only evaluate once per-request
121 +
  private[partitioning] def getPartitionIdAndRequestMap(
122 +
    pf: ClientCustomStrategy.ToPartitionedMap
123 +
  ): Future[Map[Int, ThriftStructIface]] = {
141 124
    val inputArg = ClientDeserializeCtx.get.request.asInstanceOf[ThriftStructIface]
142 125
    try {
143 126
      val getPartitionIdAndRequest = { ts: ThriftStructIface =>
144 127
        customStrategy match {
145 -
          case clientCustomStrategy: ClientCustomStrategy =>
146 -
            clientCustomStrategy.getPartitionIdAndRequest
147 -
              .applyOrElse(ts, ClientCustomStrategy.defaultPartitionIdAndRequest)
128 +
          case _: ClientCustomStrategy[_] =>
129 +
            pf.applyOrElse(ts, ClientCustomStrategy.defaultPartitionIdAndRequest)
130 +
          case mbCustomStrategy: MethodBuilderCustomStrategy[_, _] =>
131 +
            mbCustomStrategy
132 +
            //upcasting, MethodBuilderCustomStrategy[Req <: ThriftStructIface, _]
133 +
              .asInstanceOf[MethodBuilderCustomStrategy[ThriftStructIface, _]]
134 +
              .getPartitionIdAndRequest(ts)
135 +
          case clusterStrategy: ClientClusterStrategy =>
136 +
            throw new IllegalStateException(
137 +
              s"found a ClientClusterStrategy $clusterStrategy after it should have been " +
138 +
                "converted to a ClientCustomStrategy.  This state should be impossible" +
139 +
                " to reach. It indicates a serious bug.")
148 140
        }
149 141
      }
150 -
      // CustomPartitioningStrategy.defaultPartitionIdAndRequest set a Future.never
142 +
      // ClientCustomStrategy.defaultPartitionIdAndRequest throws a Future.exception
151 143
      // for undefined endpoints(methods) in PartitioningStrategy. It indicates
152 144
      // those requests for certain endpoint won't be served in PartitioningService.
153 145
      getPartitionIdAndRequest(inputArg)
154 146
    } catch {
147 +
      case castEx: ClassCastException =>
148 +
        // applied the wrong request type to getPartitionIdAndRequest
149 +
        Future.exception(
150 +
          new PartitioningStrategyException(
151 +
            "MethodBuilder Strategy request type doesn't match with the actual request type, " +
152 +
              "please check the MethodBuilderCustomStrategy type.",
153 +
            castEx))
155 154
      case NonFatal(e) => Future.exception(new PartitioningStrategyException(e))
156 155
    }
157 156
  }
158 157
159 -
  private[this] def partitionServiceForPartitionId(partitionId: Int): Future[Service[Req, Rep]] = {
160 -
    nodeManager.getServiceByPartitionId(partitionId)
161 -
  }
158 +
  override def close(deadline: Time): Future[Unit] =
159 +
    Future.join(Seq(nodeManager.close(deadline), super.close(deadline)))
162 160
}

@@ -19,7 +19,12 @@
Loading
19 19
  SslContextClientEngineFactory
20 20
}
21 21
import com.twitter.finagle.ssl.TrustCredentials
22 -
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
22 +
import com.twitter.finagle.stats.{
23 +
  NullStatsReceiver,
24 +
  RoleConfiguredStatsReceiver,
25 +
  StatsReceiver,
26 +
  Client => ClientRole
27 +
}
23 28
import com.twitter.finagle.transport.Transport
24 29
import com.twitter.finagle.util._
25 30
import com.twitter.util
@@ -1121,7 +1126,7 @@
Loading
1121 1126
  private[finagle] lazy val statsReceiver = {
1122 1127
    val Stats(sr) = params[Stats]
1123 1128
    val Label(label) = params[Label]
1124 -
    sr.scope(label)
1129 +
    new RoleConfiguredStatsReceiver(sr.scope(label), ClientRole)
1125 1130
  }
1126 1131
1127 1132
  /**
@@ -1233,7 +1238,8 @@
Loading
1233 1238
      val Stats(statsReceiver) = statsP
1234 1239
      val ExceptionStatsHandler(categorizer) = exceptionStatsHandlerP
1235 1240
1236 -
      val stats = new StatsFilter[Req, Rep](statsReceiver.scope("tries"), categorizer)
1241 +
      val stats =
1242 +
        new StatsFilter[Req, Rep](statsReceiver.scope("tries"), categorizer)
1237 1243
      stats.andThen(next)
1238 1244
    }
1239 1245
  }

@@ -34,20 +34,21 @@
Loading
34 34
  }
35 35
36 36
  override def addGauge(schema: GaugeSchema)(f: => Float): Gauge = {
37 -
    registerGauge(schema.metricBuilder.verbosity, schema.metricBuilder.name, f)
37 +
    registerGauge(schema, f)
38 38
39 39
    // dummy gauge to make type signature happy.
40 40
    new Gauge { def remove(): Unit = () }
41 41
  }
42 42
43 -
  protected[this] def registerGauge(verbosity: Verbosity, name: Seq[String], f: => Float): Unit =
43 +
  protected[this] def registerGauge(schema: GaugeSchema, f: => Float): Unit =
44 44
    synchronized {
45 -
      deregisterGauge(name)
45 +
      deregisterGauge(schema.metricBuilder.name)
46 46
47 -
      val level = if (verbosity == Verbosity.Debug) Level.FINEST else Level.INFO
47 +
      val level =
48 +
        if (schema.metricBuilder.verbosity == Verbosity.Debug) Level.FINEST else Level.INFO
48 49
49 -
      timerTasks(name) = timer.schedule(10.seconds) {
50 -
        logger.log(level, "%s %2f".format(formatName(name), f))
50 +
      timerTasks(schema.metricBuilder.name) = timer.schedule(10.seconds) {
51 +
        logger.log(level, "%s %2f".format(formatName(schema.metricBuilder.name), f))
51 52
      }
52 53
    }
53 54

@@ -25,7 +25,7 @@
Loading
25 25
   * The failing signal currently is gathered from
26 26
   * [[com.twitter.finagle.partitioning.ConsistentHashingFailureAccrualFactory]]
27 27
   *
28 -
   * @note: When turning on, ejection is based on the failure accrual mentioned above, so your
28 +
   * @note When turning on, ejection is based on the failure accrual mentioned above, so your
29 29
   *       cluster may get different views of the same host. With partitioning strategy updates,
30 30
   *       this can introduce inter-process inconsistencies between hash rings.
31 31
   *       In many cases, it's better to eject failing host via a separate mechanism that's

@@ -0,0 +1,42 @@
Loading
1 +
package com.twitter.finagle.partitioning
2 +
3 +
import com.twitter.util.Future
4 +
import com.twitter.finagle.{Service, ServiceFactory}
5 +
6 +
/**
7 +
 * Represents the snapshot of the information needed to determine how to
8 +
 * carve up the request, and where to send those carved-up slices to.
9 +
 */
10 +
case class SnapPartitioner[Req, Rep, B >: PartialFunction[Any, Future[Nothing]]](
11 +
  partitionFunction: B,
12 +
  private[partitioning] val partitionMapping: Map[Int, ServiceFactory[Req, Rep]]) {
13 +
14 +
  /**
15 +
   * When given a partitionId, asynchronously returns a [[Service]] that sends
16 +
   * requests to a shard within that partition.
17 +
   *
18 +
   * @note we assume that the factory has implemented FactoryToService under the covers,
19 +
   * so the returned service does not need to be closed by the caller.
20 +
   */
21 +
  def getServiceByPartitionId(partitionId: Int): Future[Service[Req, Rep]] =
22 +
    partitionMapping.get(partitionId) match {
23 +
      case Some(factory) => factory()
24 +
      case None =>
25 +
        Future.exception(
26 +
          new PartitionNodeManager.NoPartitionException(
27 +
            s"No partition: $partitionId found in the node manager"))
28 +
    }
29 +
}
30 +
31 +
object SnapPartitioner {
32 +
33 +
  /**
34 +
   * Provides a sensible default if you don't have enough information to decide
35 +
   * how to partition.
36 +
   */
37 +
  private[partitioning] def uninitialized[
38 +
    Req,
39 +
    Rep,
40 +
    B >: PartialFunction[Any, Future[Nothing]]
41 +
  ]: SnapPartitioner[Req, Rep, B] = SnapPartitioner(PartialFunction.empty, Map.empty)
42 +
}

@@ -45,7 +45,7 @@
Loading
45 45
  // Therefore, we've assigned the min to 12 to further decrease the probability of having a
46 46
  // aperture without any healthy nodes.
47 47
  // Note: the flag will be removed and replaced with a constant after tuning.
48 -
  private val MinDeterministicAperture: Int = {
48 +
  private[aperture] val MinDeterministicAperture: Int = {
49 49
    val min = minDeterminsticAperture()
50 50
    if (1 < min) min
51 51
    else {
@@ -85,6 +85,24 @@
Loading
85 85
    // point stability issues.
86 86
    math.min(1.0, width)
87 87
  }
88 +
89 +
  // Only an Inet address of the factory is considered and all
90 +
  // other address types are ignored.
91 +
  private[aperture] def computeVectorHash(it: Iterator[Address]): Int = {
92 +
    // A specialized reimplementation of MurmurHash3.listHash
93 +
    var n = 0
94 +
    var h = MurmurHash3.arraySeed
95 +
    while (it.hasNext) it.next() match {
96 +
      case Inet(addr, _) if !addr.isUnresolved =>
97 +
        val d = MurmurHash3.bytesHash(addr.getAddress.getAddress)
98 +
        h = MurmurHash3.mix(h, d)
99 +
        n += 1
100 +
101 +
      case _ => // no-op
102 +
    }
103 +
104 +
    MurmurHash3.finalizeHash(h, n)
105 +
  }
88 106
}
89 107
90 108
/**
@@ -197,24 +215,11 @@
Loading
197 215
  }
198 216
199 217
  @volatile private[this] var _vectorHash: Int = -1
200 -
  // Make a hash of the passed in `vec` and set `vectorHash`.
201 -
  // Only an Inet address of the factory is considered and all
202 -
  // other address types are ignored.
203 -
  private[this] def updateVectorHash(vec: Seq[Node]): Unit = {
204 -
    // A specialized reimplementation of MurmurHash3.listHash
205 -
    val it = vec.iterator
206 -
    var n = 0
207 -
    var h = MurmurHash3.arraySeed
208 -
    while (it.hasNext) it.next().factory.address match {
209 -
      case Inet(addr, _) if !addr.isUnresolved =>
210 -
        val d = MurmurHash3.bytesHash(addr.getAddress.getAddress)
211 -
        h = MurmurHash3.mix(h, d)
212 -
        n += 1
213 218
214 -
      case _ => // no-op
215 -
    }
216 -
217 -
    _vectorHash = MurmurHash3.finalizeHash(h, n)
219 +
  // Make a hash of the passed in `vec` and set `vectorHash`.
220 +
  private[this] def updateVectorHash(vec: Vector[Node]): Unit = {
221 +
    val addrs = vec.iterator.map(_.factory.address)
222 +
    _vectorHash = Aperture.computeVectorHash(addrs)
218 223
  }
219 224
220 225
  protected[this] def vectorHash: Int = _vectorHash

@@ -127,12 +127,6 @@
Loading
127 127
    )
128 128
  }
129 129
130 -
  protected def isSinglePartition(request: Command): Future[Boolean] = request match {
131 -
    case _: StorageCommand | _: ArithmeticCommand | _: Delete => Future.True
132 -
    case _ if allKeysForSinglePartition(request) => Future.True
133 -
    case _ => Future.False
134 -
  }
135 -
136 130
  final protected def noPartitionInformationHandler(req: Command): Future[Nothing] = {
137 131
    val ex = new NoPartitioningKeys(s"NoPartitioningKeys in for the thrift method: ${req.name}")
138 132
    if (logger.isLoggable(Level.DEBUG))

@@ -1,9 +1,16 @@
Loading
1 1
package com.twitter.finagle.thrift.exp.partitioning
2 2
3 +
import com.twitter.finagle.partitioning.PartitionNodeManager
4 +
import com.twitter.finagle.thrift.exp.partitioning.ClientCustomStrategy.ToPartitionedMap
3 5
import com.twitter.finagle.thrift.exp.partitioning.PartitioningStrategy._
4 -
import com.twitter.finagle.thrift.exp.partitioning.ThriftCustomPartitioningService.PartitioningStrategyException
6 +
import com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.PartitioningStrategyException
7 +
import com.twitter.finagle.{Address, ServiceFactory, Stack}
5 8
import com.twitter.scrooge.{ThriftMethodIface, ThriftStructIface}
6 -
import com.twitter.util.{Future, Try}
9 +
import com.twitter.util.{Activity, Future, Try}
10 +
import java.lang.{Integer => JInteger}
11 +
import java.util.function.{BiFunction, IntUnaryOperator, Function => JFunction}
12 +
import java.util.{List => JList, Map => JMap, Set => JSet}
13 +
import scala.collection.JavaConverters._
7 14
import scala.collection.mutable
8 15
9 16
object PartitioningStrategy {
@@ -53,20 +60,10 @@
Loading
53 60
   */
54 61
  type ResponseMerger[Rep] = (Seq[Rep], Seq[Throwable]) => Try[Rep]
55 62
56 -
  object RequestMergerRegistry {
57 -
58 -
    /**
59 -
     * Create an empty RequestMergerRegistry.
60 -
     * @note The created RequestMergerRegistry is NOT thread safe, it carries an assumption
61 -
     *       that registries are written during client initialization.
62 -
     */
63 -
    def create(): RequestMergerRegistry = new RequestMergerRegistry()
64 -
  }
65 -
66 63
  /**
67 64
   * Maintain a map of method name to method's [[RequestMerger]].
68 65
   */
69 -
  class RequestMergerRegistry {
66 +
  final class RequestMergerRegistry private[partitioning] {
70 67
71 68
    // reqMerger Map is not thread safe, assuming `add` only be called during client
72 69
    // initialization and the reqMerger remains the same as request threads `get` from it.
@@ -75,6 +72,8 @@
Loading
75 72
76 73
    /**
77 74
     * Register a RequestMerger for a ThriftMethod.
75 +
     *
76 +
     * For a Java-friendly way to do the same thing, see `addResponseMerger`
78 77
     * @param method  ThriftMethod is a method endpoint defined in a thrift service
79 78
     * @param merger  see [[RequestMerger]]
80 79
     */
@@ -87,27 +86,30 @@
Loading
87 86
    }
88 87
89 88
    /**
90 -
     * Get a RequestMerger for a ThriftMethod.
91 -
     * @param methodName  The Thrift method name
89 +
     * Register a RequestMerger for a ThriftMethod.
90 +
     *
91 +
     * The same as add, but easier to use from Java.
92 +
     * @param method  ThriftMethod is a method endpoint defined in a thrift service
93 +
     * @param merger  see [[RequestMerger]]
92 94
     */
93 -
    def get(methodName: String): Option[RequestMerger[ThriftStructIface]] =
94 -
      reqMergers.get(methodName)
95 -
  }
96 -
97 -
  object ResponseMergerRegistry {
95 +
    def addRequestMerger[Req <: ThriftStructIface](
96 +
      method: ThriftMethodIface,
97 +
      // Needed for 2.11 compat. can be a scala fn A => B once we drop support
98 +
      merger: JFunction[JList[Req], Req]
99 +
    ): RequestMergerRegistry = add(method, { seq: Seq[Req] => seq.asJava }.andThen(merger.apply _))
98 100
99 101
    /**
100 -
     * Create an empty ResponseMergerRegistry.
101 -
     * @note The created ResponseMergerRegistry is NOT thread safe, it carries an assumption
102 -
     *       that registries are written during client initialization.
102 +
     * Get a RequestMerger for a ThriftMethod.
103 +
     * @param methodName  The Thrift method name
103 104
     */
104 -
    def create(): ResponseMergerRegistry = new ResponseMergerRegistry()
105 +
    private[finagle] def get(methodName: String): Option[RequestMerger[ThriftStructIface]] =
106 +
      reqMergers.get(methodName)
105 107
  }
106 108
107 109
  /**
108 110
   * Maintain a map of method name to method's [[ResponseMerger]].
109 111
   */
110 -
  class ResponseMergerRegistry {
112 +
  final class ResponseMergerRegistry private[partitioning] {
111 113
112 114
    // repMergers Map is not thread safe, assuming `add` is only called during client
113 115
    // initialization and the repMergers remains the same as request threads `get` from it.
@@ -115,6 +117,8 @@
Loading
115 117
116 118
    /**
117 119
     * Register a ResponseMerger for a ThriftMethod.
120 +
     *
121 +
     * For a Java-friendly way to do the same thing, see `addResponseMerger`
118 122
     * @param method  ThriftMethod is a method endpoint defined in a thrift service
119 123
     * @param merger  see [[ResponseMerger]]
120 124
     */
@@ -123,11 +127,28 @@
Loading
123 127
      this
124 128
    }
125 129
130 +
    /**
131 +
     * Register a ResponseMerger for a ThriftMethod.
132 +
     *
133 +
     * The same as add, but easier to use from Java.
134 +
     * @param method  ThriftMethod is a method endpoint defined in a thrift service
135 +
     * @param merger  see [[ResponseMerger]]
136 +
     */
137 +
    def addResponseMerger[Rep](
138 +
      method: ThriftMethodIface,
139 +
      merger: BiFunction[JList[Rep], JList[Throwable], Try[Rep]]
140 +
    ): ResponseMergerRegistry =
141 +
      add(
142 +
        method,
143 +
        { (reps: Seq[Rep], errs: Seq[Throwable]) =>
144 +
          merger.apply(reps.asJava, errs.asJava)
145 +
        })
146 +
126 147
    /**
127 148
     * Get a ResponseMerger for a ThriftMethod.
128 149
     * @param methodName  The Thrift method name
129 150
     */
130 -
    def get(methodName: String): Option[ResponseMerger[Any]] =
151 +
    private[finagle] def get(methodName: String): Option[ResponseMerger[Any]] =
131 152
      repMergers.get(methodName)
132 153
  }
133 154
}
@@ -135,52 +156,50 @@
Loading
135 156
/**
136 157
 * Service partitioning strategy to apply on the clients in order to let clients route
137 158
 * requests accordingly. Two particular partitioning strategies are going to be supported,
138 -
 * [[HashingPartitioningStrategy]] and [[CustomPartitioningStrategy]].
159 +
 * [[HashingPartitioningStrategy]] and [[CustomPartitioningStrategy]], each one supports
160 +
 * both configuring Finagle Client Stack and ThriftMux MethodBuilder.
139 161
 * Either one will need developers to provide a concrete function to give each request an
140 162
 * indicator of destination, for example a hashing key or a partition address.
141 163
 * Messaging fan-out is supported by leveraging RequestMerger and ResponseMerger.
142 164
 */
143 -
sealed trait PartitioningStrategy {
165 +
sealed trait PartitioningStrategy
144 166
145 -
  /**
146 -
   * A ResponseMergerRegistry implemented by client to supply [[ResponseMerger]]s.
147 -
   * For message fan-out cases.
148 -
   * @see [[ResponseMerger]]
149 -
   */
150 -
  val responseMergerRegistry: ResponseMergerRegistry = ResponseMergerRegistry.create()
151 -
}
167 +
sealed trait HashingPartitioningStrategy extends PartitioningStrategy
152 168
153 -
sealed trait HashingPartitioningStrategy extends PartitioningStrategy {
169 +
sealed trait CustomPartitioningStrategy extends PartitioningStrategy {
170 +
  private[finagle] def newNodeManager[Req, Rep](
171 +
    underlying: Stack[ServiceFactory[Req, Rep]],
172 +
    params: Stack.Params
173 +
  ): PartitionNodeManager[Req, Rep, _, ClientCustomStrategy.ToPartitionedMap]
154 174
155 175
  /**
156 -
   * A RequestMergerRegistry implemented by client to supply [[RequestMerger]]s.
157 -
   * For message fan-out cases.
158 -
   * @see [[RequestMerger]]
176 +
   * A ResponseMergerRegistry implemented by client to supply [[ResponseMerger]]s
177 +
   * for message fan-out cases.
178 +
   *
179 +
   * @see [[ResponseMerger]]
159 180
   */
160 -
  val requestMergerRegistry: RequestMergerRegistry = RequestMergerRegistry.create()
181 +
  val responseMergerRegistry: ResponseMergerRegistry = new ResponseMergerRegistry()
161 182
}
162 183
163 -
sealed trait CustomPartitioningStrategy extends PartitioningStrategy {
184 +
private[finagle] object Disabled extends PartitioningStrategy
185 +
186 +
object ClientHashingStrategy {
187 +
  // input: original thrift request
188 +
  // output: a Map of hashing keys and split requests
189 +
  type ToPartitionedMap = PartialFunction[ThriftStructIface, Map[Any, ThriftStructIface]]
164 190
165 191
  /**
166 -
   * Gets the logical partition identifier from a host identifier, host identifiers are derived
167 -
   * from [[ZkMetadata]] shardId. Indicates which logical partition a physical host belongs to,
168 -
   * multiple hosts can belong to the same partition, for example:
169 -
   * {{{
170 -
   *  val getLogicalPartition: Int => Int = {
171 -
   *    case a if Range(0, 10).contains(a) => 0
172 -
   *    case b if Range(10, 20).contains(b) => 1
173 -
   *    case c if Range(20, 30).contains(c) => 2
174 -
   *    case _ => throw ...
175 -
   *  }
176 -
   * }}}
177 -
   * The default is each host is a partition.
192 +
   * The Java-friendly way to create a [[ClientHashingStrategy]].
193 +
   * Scala users should construct a [[ClientHashingStrategy]] directly.
194 +
   *
195 +
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
178 196
   */
179 -
  def getLogicalPartition(instance: Int): Int = instance
180 -
}
181 -
private[partitioning] object Disabled extends PartitioningStrategy
182 -
183 -
object ClientHashingStrategy {
197 +
  def create(
198 +
    toPartitionedMap: PartialFunction[
199 +
      ThriftStructIface,
200 +
      JMap[Any, ThriftStructIface]
201 +
    ]
202 +
  ): ClientHashingStrategy = new ClientHashingStrategy(toPartitionedMap.andThen(_.asScala.toMap))
184 203
185 204
  /**
186 205
   * Thrift requests not specifying hashing keys will fall in here. This allows a
@@ -188,63 +207,547 @@
Loading
188 207
   * Un-specified endpoints should not be called from this client, otherwise, throw
189 208
   * [[com.twitter.finagle.partitioning.ConsistentHashPartitioningService.NoPartitioningKeys]].
190 209
   */
191 -
  val defaultHashingKeyAndRequest: ThriftStructIface => Map[Any, ThriftStructIface] = args =>
192 -
    Map(None -> args)
210 +
  private[finagle] val defaultHashingKeyAndRequest: ThriftStructIface => Map[
211 +
    Any,
212 +
    ThriftStructIface
213 +
  ] = args => Map(None -> args)
193 214
}
194 215
195 216
/**
196 217
 * An API to set a consistent hashing partitioning strategy for a Thrift/ThriftMux Client.
218 +
 * For a Java-friendly way to do the same thing, see `ClientHashingStrategy.create`
219 +
 *
220 +
 * @param getHashingKeyAndRequest A PartialFunction implemented by client that
221 +
 *        provides the partitioning logic on a request. It takes a Thrift object
222 +
 *        request, and returns a Map of hashing keys to sub-requests. If we
223 +
 *        don't need to fan-out, it should return one element: hashing key to
224 +
 *        the original request.  This PartialFunction can take multiple Thrift
225 +
 *        request types of one Thrift service (different method endpoints of one
226 +
 *        service).
197 227
 */
198 -
abstract class ClientHashingStrategy extends HashingPartitioningStrategy {
199 -
  // input: original thrift request
200 -
  // output: a Map of hashing keys and split requests
201 -
  type ToPartitionedMap = PartialFunction[ThriftStructIface, Map[Any, ThriftStructIface]]
228 +
final class ClientHashingStrategy(
229 +
  val getHashingKeyAndRequest: ClientHashingStrategy.ToPartitionedMap)
230 +
    extends HashingPartitioningStrategy {
202 231
203 232
  /**
204 -
   * A PartialFunction implemented by client that provides the partitioning logic on
205 -
   * a request. It takes a Thrift object request, and returns a Map of hashing keys to
206 -
   * sub-requests. If no fan-out needs, it should return one element: hashing key to the
207 -
   * original request.
208 -
   * This PartialFunction can take multiple Thrift request types of one Thrift service
209 -
   * (different method endpoints of one service).
233 +
   * A RequestMergerRegistry implemented by client to supply [[RequestMerger]]s
234 +
   * for message fan-out cases.
235 +
   * @see [[RequestMerger]]
210 236
   */
211 -
  def getHashingKeyAndRequest: ToPartitionedMap
237 +
  val requestMergerRegistry: RequestMergerRegistry = new RequestMergerRegistry()
238 +
239 +
  /**
240 +
   * A ResponseMergerRegistry implemented by client to supply [[ResponseMerger]]s
241 +
   * for message fan-out cases.
242 +
   * @see [[ResponseMerger]]
243 +
   */
244 +
  val responseMergerRegistry: ResponseMergerRegistry = new ResponseMergerRegistry()
245 +
}
246 +
247 +
object MethodBuilderHashingStrategy {
248 +
  // input: original thrift request
249 +
  // output: a Map of hashing keys and split requests
250 +
  type ToPartitionedMap[Req] = Req => Map[Any, Req]
251 +
}
252 +
253 +
/**
254 +
 * An API to set a hashing partitioning strategy for a client MethodBuilder.
255 +
 * For a Java-friendly way to do the same thing, see `MethodBuilderHashingStrategy.create`
256 +
 *
257 +
 * @param getHashingKeyAndRequest A function for the partitioning logic. MethodBuilder is
258 +
 *                                customized per-method so that this method only takes one
259 +
 *                                Thrift request type.
260 +
 * @param requestMerger           Supplies a [[RequestMerger]] for messaging fan-out.
261 +
 *                                Non-fan-out case the default is [[None]].
262 +
 * @param responseMerger          Supplies a [[ResponseMerger]] for messaging fan-out.
263 +
 *                                Non-fan-out case the default is [[None]].
264 +
 */
265 +
final class MethodBuilderHashingStrategy[Req <: ThriftStructIface, Rep](
266 +
  val getHashingKeyAndRequest: MethodBuilderHashingStrategy.ToPartitionedMap[Req],
267 +
  val requestMerger: Option[RequestMerger[Req]],
268 +
  val responseMerger: Option[ResponseMerger[Rep]])
269 +
    extends HashingPartitioningStrategy {
270 +
271 +
  def this(getHashingKeyAndRequest: MethodBuilderHashingStrategy.ToPartitionedMap[Req]) =
272 +
    this(getHashingKeyAndRequest, None, None)
212 273
}
213 274
214 275
object ClientCustomStrategy {
276 +
  // input: original thrift request
277 +
  // output: Future Map of partition ids and split requests
278 +
  type ToPartitionedMap = PartialFunction[ThriftStructIface, Future[Map[Int, ThriftStructIface]]]
279 +
280 +
  /**
281 +
   * Constructs a [[ClientCustomStrategy]] that does not reshard.
282 +
   *
283 +
   * This is appropriate for simple custom strategies where you never need to
284 +
   * change which shard a given request would go to, and you neither add nor
285 +
   * remove shards.
286 +
   *
287 +
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
288 +
   *
289 +
   * @param getPartitionIdAndRequest A PartialFunction implemented by client that
290 +
   *        provides the partitioning logic on a request. It takes a Thrift object
291 +
   *        request, and returns Future Map of partition ids to sub-requests. If
292 +
   *        we don't need to fan-out, it should return one element: partition id
293 +
   *        to the original request.  This PartialFunction can take multiple
294 +
   *        Thrift request types of one Thrift service (different method endpoints
295 +
   *        of one service).  In this context, the returned partition id is also
296 +
   *        the shard id.  Each instance is its own partition.
297 +
   */
298 +
  def noResharding(
299 +
    getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap
300 +
  ): CustomPartitioningStrategy =
301 +
    noResharding(getPartitionIdAndRequest, identity[Int])
302 +
303 +
  /**
304 +
   * Constructs a [[ClientCustomStrategy]] that does not reshard.
305 +
   *
306 +
   * This is appropriate for simple custom strategies where you never need to
307 +
   * change which shard a given request would go to, and you neither add nor
308 +
   * remove shards.
309 +
   *
310 +
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
311 +
   *
312 +
   * @param getPartitionIdAndRequest A PartialFunction implemented by client that
313 +
   *        provides the partitioning logic on a request. It takes a Thrift object
314 +
   *        request, and returns Future Map of partition ids to sub-requests. If
315 +
   *        we don't need to fan-out, it should return one element: partition id
316 +
   *        to the original request.  This PartialFunction can take multiple
317 +
   *        Thrift request types of one Thrift service (different method endpoints
318 +
   *        of one service).
319 +
   * @param getLogicalPartitionId Gets the logical partition identifier from a host
320 +
   *        identifier, host identifiers are derived from [[ZkMetadata]]
321 +
   *        shardId. Indicates which logical partition a physical host belongs to,
322 +
   *        multiple hosts can belong to the same partition, for example:
323 +
   *        {{{
324 +
   *          {
325 +
   *            case a if Range(0, 10).contains(a) => 0
326 +
   *            case b if Range(10, 20).contains(b) => 1
327 +
   *            case c if Range(20, 30).contains(c) => 2
328 +
   *            case _ => throw ...
329 +
   *          }
330 +
   *        }}}
331 +
   */
332 +
  def noResharding(
333 +
    getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap,
334 +
    getLogicalPartitionId: Int => Int
335 +
  ): CustomPartitioningStrategy =
336 +
    new ClientCustomStrategy[Unit](
337 +
      _ => getPartitionIdAndRequest,
338 +
      _ => getLogicalPartitionId,
339 +
      Activity.value(()))
340 +
341 +
  /**
342 +
   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state.
343 +
   *
344 +
   * This is appropriate for simple custom strategies where you only need to
345 +
   * know information about the remote cluster in order to reshard. For example,
346 +
   * if you want to be able to add or remove capacity safely.
347 +
   *
348 +
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
349 +
   *
350 +
   * @param getPartitionIdAndRequestFn A function that given the current state of the
351 +
   *        remote cluster, returns a function that gets the logical partition
352 +
   *        identifier from a host identifier, host identifiers are derived from
353 +
   *        [[ZkMetadata]] shardId. Indicates which logical partition a physical
354 +
   *        host belongs to, multiple hosts can belong to the same partition,
355 +
   *        for example:
356 +
   *        {{{
357 +
   *          {
358 +
   *            case a if Range(0, 10).contains(a) => 0
359 +
   *            case b if Range(10, 20).contains(b) => 1
360 +
   *            case c if Range(20, 30).contains(c) => 2
361 +
   *            case _ => throw ...
362 +
   *          }
363 +
   *        }}}
364 +
   *        Note that this function must be pure (ie referentially transparent).
365 +
   *        It cannot change based on anything other than the state of the
366 +
   *        remote cluster it is provided with, or else it will malfunction.
367 +
   */
368 +
  def clusterResharding(
369 +
    getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap
370 +
  ): CustomPartitioningStrategy =
371 +
    clusterResharding(getPartitionIdAndRequestFn, _ => identity[Int])
372 +
373 +
  /**
374 +
   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state.
375 +
   *
376 +
   * This is appropriate for simple custom strategies where you only need to
377 +
   * know information about the remote cluster in order to reshard. For example,
378 +
   * if you want to be able to add or remove capacity safely.
379 +
   *
380 +
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
381 +
   *
382 +
   * @param getPartitionIdAndRequestFn A function that given the current state of
383 +
   *        the remote cluster, returns a PartialFunction implemented by client
384 +
   *        that provides the partitioning logic on a request. It takes a Thrift
385 +
   *        object request, and returns Future Map of partition ids to
386 +
   *        sub-requests. If we don't need to fan-out, it should return one
387 +
   *        element: partition id to the original request.  This PartialFunction
388 +
   *        can take multiple Thrift request types of one Thrift service
389 +
   *        (different method endpoints of one service).  Note that this function
390 +
   *        must be pure (ie referentially transparent).  It cannot change
391 +
   *        based on anything other than the state of the remote cluster it is
392 +
   *        provided with, or else it will malfunction.
393 +
   * @param getLogicalPartitionIdFn A function that given the current state of the
394 +
   *        remote cluster, returns a function that gets the logical partition
395 +
   *        identifier from a host identifier, host identifiers are derived from
396 +
   *        [[ZkMetadata]] shardId. Indicates which logical partition a physical
397 +
   *        host belongs to, multiple hosts can belong to the same partition,
398 +
   *        for example:
399 +
   *        {{{
400 +
   *          {
401 +
   *            case a if Range(0, 10).contains(a) => 0
402 +
   *            case b if Range(10, 20).contains(b) => 1
403 +
   *            case c if Range(20, 30).contains(c) => 2
404 +
   *            case _ => throw ...
405 +
   *          }
406 +
   *        }}}
407 +
   *        Note that this function must be pure (ie referentially transparent).
408 +
   *        It cannot change based on anything other than the state of the
409 +
   *        remote cluster it is provided with, or else it will malfunction.
410 +
   */
411 +
  def clusterResharding(
412 +
    getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap,
413 +
    getLogicalPartitionIdFn: Set[Address] => Int => Int
414 +
  ): CustomPartitioningStrategy =
415 +
    new ClientClusterStrategy(getPartitionIdAndRequestFn, getLogicalPartitionIdFn)
416 +
417 +
  /**
418 +
   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state.
419 +
   *
420 +
   * This is appropriate for simple custom strategies where you only need to
421 +
   * know information about the remote cluster in order to reshard. For example,
422 +
   * if you want to be able to add or remove capacity safely.
423 +
   *
424 +
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
425 +
   *
426 +
   * @param getPartitionIdAndRequestFn A function that given the current state of
427 +
   *        `observable`, returns a PartialFunction implemented by client that
428 +
   *        provides the partitioning logic on a request. It takes a Thrift
429 +
   *        object request, and returns Future Map of partition ids to
430 +
   *        sub-requests. If we don't need to fan-out, it should return one
431 +
   *        element: partition id to the original request.  This PartialFunction
432 +
   *        can take multiple Thrift request types of one Thrift service
433 +
   *        (different method endpoints of one service).  Note that this
434 +
   *        function must be pure (ie referentially transparent).  It cannot
435 +
   *        change based on anything other than the state of `observable`, or
436 +
   *        else it will malfunction.
437 +
   * @param observable The state that is used for deciding how to reshard the
438 +
   *        cluster.
439 +
   */
440 +
  def resharding[A](
441 +
    getPartitionIdAndRequestFn: A => ClientCustomStrategy.ToPartitionedMap,
442 +
    observable: Activity[A]
443 +
  ): CustomPartitioningStrategy =
444 +
    resharding[A](getPartitionIdAndRequestFn, (_: A) => (identity[Int](_)), observable)
445 +
446 +
  /**
447 +
   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state.
448 +
   *
449 +
   * This is appropriate for simple custom strategies where you only need to
450 +
   * know information about the remote cluster in order to reshard. For example,
451 +
   * if you want to be able to add or remove capacity safely.
452 +
   *
453 +
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
454 +
   *
455 +
   * @param getPartitionIdAndRequestFn A function that given the current state of
456 +
   *        `observable`, returns a PartialFunction implemented by client that
457 +
   *        provides the partitioning logic on a request. It takes a Thrift
458 +
   *        object request, and returns Future Map of partition ids to
459 +
   *        sub-requests. If we don't need to fan-out, it should return one
460 +
   *        element: partition id to the original request.  This PartialFunction
461 +
   *        can take multiple Thrift request types of one Thrift service
462 +
   *        (different method endpoints of one service).  Note that this
463 +
   *        function must be pure (ie referentially transparent).  It cannot
464 +
   *        change based on anything other than the state of `observable`, or
465 +
   *        else it will malfunction.
466 +
   * @param getLogicalPartitionIdFn A function that given the current state
467 +
   *        `observable`, returns a function that gets the logical partition
468 +
   *        identifier from a host identifier, host identifiers are derived from
469 +
   *        [[ZkMetadata]] shardId. Indicates which logical partition a physical
470 +
   *        host belongs to, multiple hosts can belong to the same partition,
471 +
   *        for example:
472 +
   *        {{{
473 +
   *          {
474 +
   *            case a if Range(0, 10).contains(a) => 0
475 +
   *            case b if Range(10, 20).contains(b) => 1
476 +
   *            case c if Range(20, 30).contains(c) => 2
477 +
   *            case _ => throw ...
478 +
   *          }
479 +
   *        }}}
480 +
   *        Note that this function must be pure (ie referentially transparent).
481 +
   *        It cannot change based on anything other than the state of
482 +
   *        `observable`, or else it will malfunction.
483 +
   * @param observable The state that is used for deciding how to reshard the
484 +
   *        cluster.
485 +
   */
486 +
  def resharding[A](
487 +
    getPartitionIdAndRequestFn: A => ClientCustomStrategy.ToPartitionedMap,
488 +
    getLogicalPartitionIdFn: A => Int => Int,
489 +
    observable: Activity[A]
490 +
  ): CustomPartitioningStrategy =
491 +
    new ClientCustomStrategy(getPartitionIdAndRequestFn, getLogicalPartitionIdFn, observable)
215 492
216 493
  /**
217 494
   * Thrift requests not specifying partition ids will fall in here. This allows a
218 495
   * Thrift/ThriftMux partition aware client to serve a part of endpoints of a service.
219 496
   * Un-specified endpoints should not be called from this client, otherwise, throw
220 -
   * [[com.twitter.finagle.thrift.exp.partitioning.ThriftCustomPartitioningService.PartitioningStrategyException]].
221 -
   */
222 -
  val defaultPartitionIdAndRequest: ThriftStructIface => Future[Map[Int, ThriftStructIface]] =
223 -
    _ =>
224 -
      Future.exception(
225 -
        new PartitioningStrategyException(
226 -
          "An unspecified endpoint has been applied to the partitioning service, please check " +
227 -
            "your ClientCustomStrategy.getPartitionIdAndRequest see if the endpoint is defined"))
497 +
   * [[com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.PartitioningStrategyException]].
498 +
   */
499 +
  private[finagle] val defaultPartitionIdAndRequest: ThriftStructIface => Future[
500 +
    Map[Int, ThriftStructIface]
501 +
  ] = { _ =>
502 +
    Future.exception(
503 +
      new PartitioningStrategyException(
504 +
        "An unspecified endpoint has been applied to the partitioning service, please check " +
505 +
          "your ClientCustomStrategy.getPartitionIdAndRequest see if the endpoint is defined"))
506 +
  }
228 507
}
229 508
230 509
/**
231 -
 * An API to set a custom partitioning strategy for a Thrift/ThriftMux Client.
510 +
 * The java-friendly way to create a [[ClientCustomStrategy]].
511 +
 * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
512 +
 *
513 +
 * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
232 514
 */
233 -
abstract class ClientCustomStrategy extends CustomPartitioningStrategy {
234 -
  // input: original thrift request
235 -
  // output: Future Map of partition ids and split requests
236 -
  type ToPartitionedMap = PartialFunction[ThriftStructIface, Future[Map[Int, ThriftStructIface]]]
515 +
object ClientCustomStrategies {
516 +
517 +
  type ToPartitionedMap = PartialFunction[
518 +
    ThriftStructIface,
519 +
    Future[JMap[JInteger, ThriftStructIface]]
520 +
  ]
521 +
522 +
  private[this] def toJavaSet[A]: PartialFunction[Set[A], JSet[A]] = { case set => set.asJava }
523 +
  private[this] val toScalaFutureMap: Future[JMap[JInteger, ThriftStructIface]] => Future[
524 +
    Map[Int, ThriftStructIface]
525 +
  ] = (_.map(_.asScala.toMap.map {
526 +
    case (k, v) => (k.toInt, v)
527 +
  }))
528 +
529 +
  /**
530 +
   * The java-friendly way to create a [[ClientCustomStrategy]].
531 +
   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
532 +
   *
533 +
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
534 +
   */
535 +
  def noResharding(toPartitionedMap: ToPartitionedMap): CustomPartitioningStrategy =
536 +
    ClientCustomStrategy.noResharding(toPartitionedMap.andThen(toScalaFutureMap))
537 +
538 +
  /**
539 +
   * The java-friendly way to create a [[ClientCustomStrategy]].
540 +
   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
541 +
   *
542 +
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
543 +
   */
544 +
  def noResharding(
545 +
    toPartitionedMap: ToPartitionedMap,
546 +
    getLogicalPartitionId: IntUnaryOperator
547 +
  ): CustomPartitioningStrategy = ClientCustomStrategy.noResharding(
548 +
    toPartitionedMap.andThen(toScalaFutureMap),
549 +
    getLogicalPartitionId.applyAsInt _)
550 +
551 +
  /**
552 +
   * The java-friendly way to create a [[ClientCustomStrategy]].
553 +
   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
554 +
   *
555 +
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
556 +
   */
557 +
  def clusterResharding(
558 +
    getPartitionIdAndRequestFn: JFunction[JSet[Address], ToPartitionedMap]
559 +
  ): CustomPartitioningStrategy =
560 +
    ClientCustomStrategy.clusterResharding(
561 +
      toJavaSet[Address]
562 +
        .andThen(getPartitionIdAndRequestFn.apply(_).andThen(toScalaFutureMap)))
563 +
564 +
  /**
565 +
   * The java-friendly way to create a [[ClientCustomStrategy]].
566 +
   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
567 +
   *
568 +
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
569 +
   */
570 +
  def clusterResharding(
571 +
    getPartitionIdAndRequestFn: JFunction[JSet[Address], ToPartitionedMap],
572 +
    getLogicalPartitionIdFn: JFunction[JSet[Address], IntUnaryOperator]
573 +
  ): CustomPartitioningStrategy =
574 +
    ClientCustomStrategy.clusterResharding(
575 +
      toJavaSet[Address]
576 +
        .andThen(getPartitionIdAndRequestFn.apply(_).andThen(toScalaFutureMap)),
577 +
      toJavaSet[Address].andThen(getLogicalPartitionIdFn.apply _).andThen(op => op.applyAsInt _)
578 +
    )
237 579
238 580
  /**
239 -
   * A PartialFunction implemented by client that provides the partitioning logic on
240 -
   * a request. It takes a Thrift object request, and returns Future Map of partition ids to
241 -
   * sub-requests. If no fan-out needs, it should return one element: partition id to the
242 -
   * original request.
243 -
   * This PartialFunction can take multiple Thrift request types of one Thrift service
244 -
   * (different method endpoints of one service).
581 +
   * The java-friendly way to create a [[ClientCustomStrategy]].
582 +
   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
245 583
   *
246 -
   * @note  When updating the partition topology dynamically, there is a potential one-time
247 -
   *        mismatch if a Service Discovery update happens after getPartitionIdAndRequest.
584 +
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
248 585
   */
249 -
  def getPartitionIdAndRequest: ToPartitionedMap
586 +
  def resharding[A](
587 +
    getPartitionIdAndRequestFn: JFunction[A, ToPartitionedMap],
588 +
    observable: Activity[A]
589 +
  ): CustomPartitioningStrategy =
590 +
    ClientCustomStrategy
591 +
      .resharding[A](
592 +
        { a: A =>
593 +
          getPartitionIdAndRequestFn.apply(a).andThen(toScalaFutureMap)
594 +
        },
595 +
        observable)
596 +
597 +
  /**
598 +
   * The java-friendly way to create a [[ClientCustomStrategy]].
599 +
   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
600 +
   *
601