@@ -82,7 +82,16 @@
  *
  * @param underlying          Finagle client stack
  *
- * @param getLogicalPartition Getting the logical partition identifier from a host identifier.
+ * @param observable          The state that determines the sharding scheme we use. Updated values
+ *                            are picked up by `getPartitionFunctionPerState` and `getLogicalPartitionPerState`
+ *                            as soon as they arrive.
+ *
+ * @param getPartitionFunctionPerState When given a state, gets the partitioning function, which
+ *                            describes which partitions a request should be subdivided into, and how
+ *                            the request should be sliced and diced for each partition.
+ *                            Note that this function must be referentially transparent.
+ *
+ * @param getLogicalPartitionPerState When given a state, gets the logical partition identifier from a host identifier.
  *                            Reverse lookup. Indicates which logical partition a physical host
  *                            belongs to; this is provided by client configuration when needed;
  *                            multiple hosts can belong to the same partition, for example:
@@ -97,12 +106,30 @@
  *                            if not provided, each host is a partition.
  *                            Host identifiers are derived from [[ZkMetadata]] shardId, logical
  *                            partition identifiers are defined by users in [[PartitioningStrategy]]
+ *                            Note that this function must be referentially transparent.
  *
  * @param params              Configured Finagle client params
+ *
+ * @tparam Req the request type
+ *
+ * @tparam Rep the response type
+ *
+ * @tparam A   parameterizes the observable. This is the type of the state that determines how
+ *             requests get partitioned.
+ *
+ * @tparam B   the type of the partitioning function that is snapshotted whenever a new state of
+ *             type `A` arrives.
  */
-private[finagle] class PartitionNodeManager[Req, Rep](
+private[finagle] class PartitionNodeManager[
+  Req,
+  Rep,
+  A,
+  B >: PartialFunction[Any, Future[Nothing]]
+](
   underlying: Stack[ServiceFactory[Req, Rep]],
-  getLogicalPartition: Int => Int,
+  observable: Activity[A],
+  getPartitionFunctionPerState: A => B,
+  getLogicalPartitionPerState: A => Int => Int,
   params: Stack.Params)
     extends Closable { self =>
 
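To make the new shape concrete, a minimal sketch of wiring the manager to an observed shard count; `clientStack`, `clientParams`, and the choice of `Int` as the state type are hypothetical (real callers reach this through a `PartitioningStrategy`):

{{{
// Sketch only: clientStack and clientParams stand in for a configured
// Finagle client stack and its params; this class is private[finagle].
val shardCount: Activity[Int] = Activity.value(4)
val manager = new PartitionNodeManager[Req, Rep, Int, PartialFunction[Any, Future[Nothing]]](
  clientStack,
  shardCount,
  _ => PartialFunction.empty,          // no request slicing in this sketch
  shards => hostId => hostId % shards, // logical partition derived from the state
  clientParams
)
}}}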
@@ -115,21 +142,24 @@
     stats.scope("partitioner")
   }
 
-  private[this] val partitionServiceNodes =
-    new AtomicReference[Map[Int, ServiceFactory[Req, Rep]]]()
+  // we initialize this immediately because we filter the event that keeps
+  // track of partitions later.  if some of them are empty, this will be null.
+  // it should at least be shaped right, so that the exceptions we get are more
+  // useful than NPEs
+  // TODO we should return a DelayingServiceFactory until `observable` is no
+  // longer pending
+  private[this] val partitionServiceNodes = new AtomicReference(
+    SnapPartitioner.uninitialized[Req, Rep, B]
+  )
 
   private[this] val partitionerMetrics =
-    statsReceiver.addGauge("nodes") { partitionServiceNodes.get.size }
+    statsReceiver.addGauge("nodes") { partitionServiceNodes.get.partitionMapping.size }
 
   // Keep track of addresses in the current set that already have associated instances
   private[this] val destActivity = varAddrToActivity(params[LoadBalancerFactory.Dest].va, label)
 
-  // listen to the WeightedAddress changes, transform the changes to a stream of
-  // partition id (includes errors) to [[CachedServiceFactory]].
-  private[this] val partitionAddressChanges: Event[
-    Activity.State[Map[Try[Int], CachedServiceFactory[Req, Rep]]]
-  ] = {
-    val cachedServiceFactoryDiffOps = new DiffOps[Address, CachedServiceFactory[Req, Rep]] {
+  private[this] val cachedServiceFactoryDiffOps =
+    new DiffOps[Address, CachedServiceFactory[Req, Rep]] {
       def remove(factory: CachedServiceFactory[Req, Rep]): Unit = factory.close()
 
      def add(addresses: Set[Address]): CachedServiceFactory[Req, Rep] =
@@ -144,79 +174,88 @@
       }
     }
 
-    val getShardIdFromAddress: Address => Try[Int] = { address =>
-      val addressMetadata = address match {
-        case WeightedAddress(Address.Inet(_, metadata), _) => metadata
-        case Address.ServiceFactory(_, metadata) => metadata
+  private[this] val getShardIdFromAddress: A => Address => Try[Int] = {
+    state =>
+      { addr =>
+        val metadata = addr match {
+          case WeightedAddress(Address.Inet(_, metadata), _) => metadata
+          case Address.ServiceFactory(_, metadata) => metadata
+        }
+        ZkMetadata.fromAddrMetadata(metadata).flatMap(_.shardId) match {
+          case Some(id) =>
+            try {
+              val partitionId = getLogicalPartitionPerState(state)(id)
+              Return(partitionId)
+            } catch {
+              case NonFatal(e) =>
+                logger.log(Level.ERROR, "getLogicalPartition failed with: ", e)
+                Throw(e)
+            }
+          case None =>
+            val ex = new NoShardIdException(s"cannot get shardId from $metadata")
+            logger.log(Level.ERROR, "getLogicalPartition failed with: ", ex)
+            Throw(ex)
+        }
       }
-      ZkMetadata.fromAddrMetadata(addressMetadata).flatMap(_.shardId) match {
-        case Some(id) =>
-          try {
-            val partitionId = getLogicalPartition(id)
-            Return(partitionId)
-          } catch {
-            case NonFatal(e) =>
-              logger.log(Level.ERROR, "getLogicalPartition failed with: ", e)
-              Throw(e)
-          }
-        case None =>
-          val ex = new NoShardIdException(s"cannot get shardId from $addressMetadata")
-          logger.log(Level.ERROR, "getLogicalPartition failed with: ", ex)
-          Throw(ex)
-      }
-    }
+  }
 
-    val init = Map.empty[Try[Int], CachedServiceFactory[Req, Rep]]
-    safelyScanLeft(init, destActivity.states) { (partitionNodes, activeSet) =>
-      updatePartitionMap[Try[Int], CachedServiceFactory[Req, Rep], Address](
-        partitionNodes,
-        activeSet,
-        getShardIdFromAddress,
-        cachedServiceFactoryDiffOps
-      )
+  // listen to the WeightedAddress changes, transform the changes to a stream of
+  // partition id (includes errors) to [[CachedServiceFactory]].
+  private[this] val partitionAddressChanges: Activity[
+    (B, Map[Try[Int], CachedServiceFactory[Req, Rep]])
+  ] = {
+    val init: (B, Map[Try[Int], CachedServiceFactory[Req, Rep]]) =
+      (PartialFunction.empty, Map.empty)
+    safelyScanLeft(init, destActivity.join(observable)) {
+      case ((_, partitionNodes), (activeSet, state)) =>
+        (
+          getPartitionFunctionPerState(state),
+          updatePartitionMap[Try[Int], CachedServiceFactory[Req, Rep], Address](
+            partitionNodes,
+            activeSet,
+            getShardIdFromAddress(state),
+            cachedServiceFactoryDiffOps
+          ))
     }
   }
 
   // Transform the stream of [[CachedServiceFactory]] to ServiceFactory and filter out
   // the failed partition id
-  private[this] val partitionNodesChange: Event[Map[Int, ServiceFactory[Req, Rep]]] = {
-    val init = Map.empty[Int, ServiceFactory[Req, Rep]]
+  private[this] val partitionNodesChange: Event[SnapPartitioner[Req, Rep, B]] = {
+    val init = SnapPartitioner.uninitialized[Req, Rep, B]
     partitionAddressChanges
+      .states
       .foldLeft(init) {
-        case (_, Activity.Ok(partitions)) =>
+        case (_, Activity.Ok((partitionFn, partitions))) =>
           // this could possibly be an empty update if getLogicalPartition returns all Throws
-          partitions.filter(_._1.isReturn).map {
-            case (key, sf) => (key.get() -> sf.factory)
-          }
+          SnapPartitioner(
+            partitionFn,
+            partitions.collect {
+              case (Return(key), sf) => (key -> sf.factory)
+            })
        case (staleState, _) => staleState
-      }.filter(_.nonEmpty)
+      }.filter(_.partitionMapping.nonEmpty)
   }
 
   private[this] val nodeWatcher: Closable =
     partitionNodesChange.register(Witness(partitionServiceNodes))
 
   /**
-   * Returns a Future of [[Service]] which maps to the given partitionId.
-   *
-   * Note: The caller is responsible for relinquishing the use of the returned [[Service]
-   * by calling [[Service#close()]]. Close this node manager will close all underlying services.
-   *
-   * @param partitionId logical partition id
+   * Returns a [[SnapPartitioner]] which describes how to partition requests.
    */
-  def getServiceByPartitionId(partitionId: Int): Future[Service[Req, Rep]] = {
-    partitionServiceNodes.get.get(partitionId) match {
-      case Some(factory) => factory()
-      case None =>
-        Future.exception(
-          new NoPartitionException(s"No partition: $partitionId found in the node manager"))
-    }
-  }
+  def snapshotSharder(): SnapPartitioner[Req, Rep, B] = partitionServiceNodes.get
 
   /**
-   * When close the node manager, all underlying services are closed.
+   * When we close the node manager, all underlying services are closed.
   */
-  def close(deadline: Time): Future[Unit] = self.synchronized {
-    nodeWatcher.close(deadline)
-    Closable.all(partitionServiceNodes.get.values.toSeq: _*).close(deadline)
+  def close(deadline: Time): Future[Unit] = {
+    partitionerMetrics.remove()
+    // we want to ensure that nodeWatcher stops updating the partitionServiceNodes
+    // before we start closing them
+    Closable
+      .sequence(
+        nodeWatcher,
+        Closable.all(partitionServiceNodes.get.partitionMapping.values.toSeq: _*)
+      ).close(deadline)
   }
 }
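For illustration, a hedged sketch of the consuming side of `snapshotSharder` (`manager`, `partitionId`, and `request` are placeholders):

{{{
// Both pieces of sharding information come from one atomic snapshot, so a
// concurrent reshard cannot be observed halfway through a single request.
val snap = manager.snapshotSharder()
val rep: Future[Rep] =
  snap.getServiceByPartitionId(partitionId).flatMap(svc => svc(request))
}}}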

@@ -1,14 +1,17 @@
 package com.twitter.finagle.thrift.exp.partitioning
 
+import com.twitter.finagle.partitioning.PartitionNodeManager
+import com.twitter.finagle.thrift.exp.partitioning.ClientCustomStrategy.ToPartitionedMap
 import com.twitter.finagle.thrift.exp.partitioning.PartitioningStrategy._
 import com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.PartitioningStrategyException
+import com.twitter.finagle.{Address, ServiceFactory, Stack}
 import com.twitter.scrooge.{ThriftMethodIface, ThriftStructIface}
-import com.twitter.util.{Future, Try}
+import com.twitter.util.{Activity, Future, Try}
 import java.lang.{Integer => JInteger}
-import java.util.function.{BiFunction, Function => JFunction, IntUnaryOperator}
-import java.util.{List => JList, Map => JMap}
-import scala.collection.mutable
+import java.util.function.{BiFunction, IntUnaryOperator, Function => JFunction}
+import java.util.{List => JList, Map => JMap, Set => JSet}
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 object PartitioningStrategy {
 
@@ -164,21 +167,18 @@
 sealed trait HashingPartitioningStrategy extends PartitioningStrategy
 
 sealed trait CustomPartitioningStrategy extends PartitioningStrategy {
+  private[finagle] def newNodeManager[Req, Rep](
+    underlying: Stack[ServiceFactory[Req, Rep]],
+    params: Stack.Params
+  ): PartitionNodeManager[Req, Rep, _, ClientCustomStrategy.ToPartitionedMap]
 
   /**
-   * Gets the logical partition identifier from a host identifier, host identifiers are derived
-   * from [[ZkMetadata]] shardId. Indicates which logical partition a physical host belongs to,
-   * multiple hosts can belong to the same partition, for example:
-   * {{{
-   *  val getLogicalPartition: Int => Int = {
-   *    case a if Range(0, 10).contains(a) => 0
-   *    case b if Range(10, 20).contains(b) => 1
-   *    case c if Range(20, 30).contains(c) => 2
-   *    case _ => throw ...
-   *  }
-   * }}}
+   * A ResponseMergerRegistry implemented by the client to supply [[ResponseMerger]]s
+   * for message fan-out cases.
+   *
+   * @see [[ResponseMerger]]
    */
-  def getLogicalPartition(instance: Int): Int
+  val responseMergerRegistry: ResponseMergerRegistry = new ResponseMergerRegistry()
 }
 
 private[finagle] object Disabled extends PartitioningStrategy
@@ -278,34 +278,217 @@
   type ToPartitionedMap = PartialFunction[ThriftStructIface, Future[Map[Int, ThriftStructIface]]]
 
   /**
-   * The java-friendly way to create a [[ClientCustomStrategy]].
-   * Scala users should construct a [[ClientCustomStrategy]] directly.
+   * Constructs a [[ClientCustomStrategy]] that does not reshard.
    *
-   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   * This is appropriate for simple custom strategies where you never need to
+   * change which shard a given request would go to, and you neither add nor
+   * remove shards.
+   *
+   * Java users should see [[ClientCustomStrategies$]] for an easier-to-use API.
+   *
+   * @param getPartitionIdAndRequest A PartialFunction implemented by the client that
+   *        provides the partitioning logic on a request. It takes a Thrift object
+   *        request, and returns a Future Map of partition ids to sub-requests. If
+   *        we don't need to fan out, it should return one element: partition id
+   *        to the original request.  This PartialFunction can take multiple
+   *        Thrift request types of one Thrift service (different method endpoints
+   *        of one service).  In this context, the returned partition id is also
+   *        the shard id.  Each instance is its own partition.
    */
-  def create(
-    toPartitionedMap: PartialFunction[
-      ThriftStructIface,
-      Future[JMap[JInteger, ThriftStructIface]]
-    ]
-  ): ClientCustomStrategy = new ClientCustomStrategy(
-    toPartitionedMap.andThen(_.map(_.asScala.toMap.map { case (k, v) => (k.toInt, v) })))
+  def noResharding(
+    getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap
+  ): CustomPartitioningStrategy =
+    noResharding(getPartitionIdAndRequest, identity[Int])
 
   /**
-   * The java-friendly way to create a [[ClientCustomStrategy]].
-   * Scala users should construct a [[ClientCustomStrategy]] directly.
+   * Constructs a [[ClientCustomStrategy]] that does not reshard.
    *
-   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   * This is appropriate for simple custom strategies where you never need to
+   * change which shard a given request would go to, and you neither add nor
+   * remove shards.
+   *
+   * Java users should see [[ClientCustomStrategies$]] for an easier-to-use API.
+   *
+   * @param getPartitionIdAndRequest A PartialFunction implemented by the client that
+   *        provides the partitioning logic on a request. It takes a Thrift object
+   *        request, and returns a Future Map of partition ids to sub-requests. If
+   *        we don't need to fan out, it should return one element: partition id
+   *        to the original request.  This PartialFunction can take multiple
+   *        Thrift request types of one Thrift service (different method endpoints
+   *        of one service).
+   * @param getLogicalPartitionId Gets the logical partition identifier from a host
+   *        identifier, host identifiers are derived from [[ZkMetadata]]
+   *        shardId. Indicates which logical partition a physical host belongs to,
+   *        multiple hosts can belong to the same partition, for example:
+   *        {{{
+   *          {
+   *            case a if Range(0, 10).contains(a) => 0
+   *            case b if Range(10, 20).contains(b) => 1
+   *            case c if Range(20, 30).contains(c) => 2
+   *            case _ => throw ...
+   *          }
+   *        }}}
    */
-  def create(
-    toPartitionedMap: PartialFunction[
-      ThriftStructIface,
-      Future[JMap[JInteger, ThriftStructIface]]
-    ],
-    logicalPartitionFn: IntUnaryOperator
-  ): ClientCustomStrategy = new ClientCustomStrategy(
-    toPartitionedMap.andThen(_.map(_.asScala.toMap.map { case (k, v) => (k.toInt, v) })),
-    logicalPartitionFn.applyAsInt _)
+  def noResharding(
+    getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap,
+    getLogicalPartitionId: Int => Int
+  ): CustomPartitioningStrategy =
+    new ClientCustomStrategy[Unit](
+      _ => getPartitionIdAndRequest,
+      _ => getLogicalPartitionId,
+      Activity.value(()))
+
+  /**
+   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state.
+   *
+   * This is appropriate for simple custom strategies where you only need to
+   * know information about the remote cluster in order to reshard. For example,
+   * if you want to be able to add or remove capacity safely.
+   *
+   * Java users should see [[ClientCustomStrategies$]] for an easier-to-use API.
+   *
+   * @param getPartitionIdAndRequestFn A function that, given the current state of
+   *        the remote cluster, returns a PartialFunction implemented by the client
+   *        that provides the partitioning logic on a request. It takes a Thrift
+   *        object request, and returns a Future Map of partition ids to
+   *        sub-requests. If we don't need to fan out, it should return one
+   *        element: partition id to the original request.  This PartialFunction
+   *        can take multiple Thrift request types of one Thrift service
+   *        (different method endpoints of one service).
+   *        Note that this function must be pure (i.e. referentially transparent).
+   *        It cannot change based on anything other than the state of the
+   *        remote cluster it is provided with, or else it will malfunction.
+   */
+  def clusterResharding(
+    getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap
+  ): CustomPartitioningStrategy =
+    clusterResharding(getPartitionIdAndRequestFn, _ => identity[Int])
+
+  /**
+   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state.
+   *
+   * This is appropriate for simple custom strategies where you only need to
+   * know information about the remote cluster in order to reshard. For example,
+   * if you want to be able to add or remove capacity safely.
+   *
+   * Java users should see [[ClientCustomStrategies$]] for an easier-to-use API.
+   *
+   * @param getPartitionIdAndRequestFn A function that, given the current state of
+   *        the remote cluster, returns a PartialFunction implemented by the client
+   *        that provides the partitioning logic on a request. It takes a Thrift
+   *        object request, and returns a Future Map of partition ids to
+   *        sub-requests. If we don't need to fan out, it should return one
+   *        element: partition id to the original request.  This PartialFunction
+   *        can take multiple Thrift request types of one Thrift service
+   *        (different method endpoints of one service).  Note that this function
+   *        must be pure (i.e. referentially transparent).  It cannot change
+   *        based on anything other than the state of the remote cluster it is
+   *        provided with, or else it will malfunction.
+   * @param getLogicalPartitionIdFn A function that, given the current state of the
+   *        remote cluster, returns a function that gets the logical partition
+   *        identifier from a host identifier, host identifiers are derived from
+   *        [[ZkMetadata]] shardId. Indicates which logical partition a physical
+   *        host belongs to, multiple hosts can belong to the same partition,
+   *        for example:
+   *        {{{
+   *          {
+   *            case a if Range(0, 10).contains(a) => 0
+   *            case b if Range(10, 20).contains(b) => 1
+   *            case c if Range(20, 30).contains(c) => 2
+   *            case _ => throw ...
+   *          }
+   *        }}}
+   *        Note that this function must be pure (i.e. referentially transparent).
+   *        It cannot change based on anything other than the state of the
+   *        remote cluster it is provided with, or else it will malfunction.
+   */
+  def clusterResharding(
+    getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap,
+    getLogicalPartitionIdFn: Set[Address] => Int => Int
+  ): CustomPartitioningStrategy =
+    new ClientClusterStrategy(getPartitionIdAndRequestFn, getLogicalPartitionIdFn)
+
+  /**
+   * Constructs a [[ClientCustomStrategy]] that reshards based on the state of `observable`.
+   *
+   * This is appropriate for custom strategies where you need to observe some
+   * changing state in order to reshard. For example, if you want to be able to
+   * add or remove capacity safely.
+   *
+   * Java users should see [[ClientCustomStrategies$]] for an easier-to-use API.
+   *
+   * @param getPartitionIdAndRequestFn A function that, given the current state of
+   *        `observable`, returns a PartialFunction implemented by the client that
+   *        provides the partitioning logic on a request. It takes a Thrift
+   *        object request, and returns a Future Map of partition ids to
+   *        sub-requests. If we don't need to fan out, it should return one
+   *        element: partition id to the original request.  This PartialFunction
+   *        can take multiple Thrift request types of one Thrift service
+   *        (different method endpoints of one service).  Note that this
+   *        function must be pure (i.e. referentially transparent).  It cannot
+   *        change based on anything other than the state of `observable`, or
+   *        else it will malfunction.
+   * @param observable The state that is used for deciding how to reshard the
+   *        cluster.
+   */
+  def resharding[A](
+    getPartitionIdAndRequestFn: A => ClientCustomStrategy.ToPartitionedMap,
+    observable: Activity[A]
+  ): CustomPartitioningStrategy =
+    resharding[A](getPartitionIdAndRequestFn, (_: A) => (identity[Int](_)), observable)
+
+  /**
+   * Constructs a [[ClientCustomStrategy]] that reshards based on the state of `observable`.
+   *
+   * This is appropriate for custom strategies where you need to observe some
+   * changing state in order to reshard. For example, if you want to be able to
+   * add or remove capacity safely.
+   *
+   * Java users should see [[ClientCustomStrategies$]] for an easier-to-use API.
+   *
+   * @param getPartitionIdAndRequestFn A function that, given the current state of
+   *        `observable`, returns a PartialFunction implemented by the client that
+   *        provides the partitioning logic on a request. It takes a Thrift
+   *        object request, and returns a Future Map of partition ids to
+   *        sub-requests. If we don't need to fan out, it should return one
+   *        element: partition id to the original request.  This PartialFunction
+   *        can take multiple Thrift request types of one Thrift service
+   *        (different method endpoints of one service).  Note that this
+   *        function must be pure (i.e. referentially transparent).  It cannot
+   *        change based on anything other than the state of `observable`, or
+   *        else it will malfunction.
+   * @param getLogicalPartitionIdFn A function that, given the current state of
+   *        `observable`, returns a function that gets the logical partition
+   *        identifier from a host identifier, host identifiers are derived from
+   *        [[ZkMetadata]] shardId. Indicates which logical partition a physical
+   *        host belongs to, multiple hosts can belong to the same partition,
+   *        for example:
+   *        {{{
+   *          {
+   *            case a if Range(0, 10).contains(a) => 0
+   *            case b if Range(10, 20).contains(b) => 1
+   *            case c if Range(20, 30).contains(c) => 2
+   *            case _ => throw ...
+   *          }
+   *        }}}
+   *        Note that this function must be pure (i.e. referentially transparent).
+   *        It cannot change based on anything other than the state of
+   *        `observable`, or else it will malfunction.
+   * @param observable The state that is used for deciding how to reshard the
+   *        cluster.
+   */
+  def resharding[A](
+    getPartitionIdAndRequestFn: A => ClientCustomStrategy.ToPartitionedMap,
+    getLogicalPartitionIdFn: A => Int => Int,
+    observable: Activity[A]
+  ): CustomPartitioningStrategy =
+    new ClientCustomStrategy(getPartitionIdAndRequestFn, getLogicalPartitionIdFn, observable)
 
   /**
    * Thrift requests not specifying partition ids will fall in here. This allows a
@@ -323,6 +506,129 @@
   }
 }
 
+/**
+ * The Java-friendly way to create a [[ClientCustomStrategy]].
+ * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+ *
+ * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+ */
+object ClientCustomStrategies {
+
+  type ToPartitionedMap = PartialFunction[
+    ThriftStructIface,
+    Future[JMap[JInteger, ThriftStructIface]]
+  ]
+
+  private[this] def toJavaSet[A]: PartialFunction[Set[A], JSet[A]] = { case set => set.asJava }
+  private[this] val toScalaFutureMap: Future[JMap[JInteger, ThriftStructIface]] => Future[
+    Map[Int, ThriftStructIface]
+  ] = (_.map(_.asScala.toMap.map {
+    case (k, v) => (k.toInt, v)
+  }))
+
+  /**
+   * The Java-friendly way to create a [[ClientCustomStrategy]].
+   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+   *
+   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   */
+  def noResharding(toPartitionedMap: ToPartitionedMap): CustomPartitioningStrategy =
+    ClientCustomStrategy.noResharding(toPartitionedMap.andThen(toScalaFutureMap))
+
+  /**
+   * The Java-friendly way to create a [[ClientCustomStrategy]].
+   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+   *
+   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   */
+  def noResharding(
+    toPartitionedMap: ToPartitionedMap,
+    getLogicalPartitionId: IntUnaryOperator
+  ): CustomPartitioningStrategy = ClientCustomStrategy.noResharding(
+    toPartitionedMap.andThen(toScalaFutureMap),
+    getLogicalPartitionId.applyAsInt _)
+
+  /**
+   * The Java-friendly way to create a [[ClientCustomStrategy]].
+   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+   *
+   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   */
+  def clusterResharding(
+    getPartitionIdAndRequestFn: JFunction[JSet[Address], ToPartitionedMap]
+  ): CustomPartitioningStrategy =
+    ClientCustomStrategy.clusterResharding(
+      toJavaSet[Address]
+        .andThen(getPartitionIdAndRequestFn.apply(_).andThen(toScalaFutureMap)))
+
+  /**
+   * The Java-friendly way to create a [[ClientCustomStrategy]].
+   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+   *
+   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   */
+  def clusterResharding(
+    getPartitionIdAndRequestFn: JFunction[JSet[Address], ToPartitionedMap],
+    getLogicalPartitionIdFn: JFunction[JSet[Address], IntUnaryOperator]
+  ): CustomPartitioningStrategy =
+    ClientCustomStrategy.clusterResharding(
+      toJavaSet[Address]
+        .andThen(getPartitionIdAndRequestFn.apply(_).andThen(toScalaFutureMap)),
+      toJavaSet[Address].andThen(getLogicalPartitionIdFn.apply _).andThen(op => op.applyAsInt _)
+    )
+
+  /**
+   * The Java-friendly way to create a [[ClientCustomStrategy]].
+   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+   *
+   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   */
+  def resharding[A](
+    getPartitionIdAndRequestFn: JFunction[A, ToPartitionedMap],
+    observable: Activity[A]
+  ): CustomPartitioningStrategy =
+    ClientCustomStrategy
+      .resharding[A](
+        { a: A =>
+          getPartitionIdAndRequestFn.apply(a).andThen(toScalaFutureMap)
+        },
+        observable)
+
+  /**
+   * The Java-friendly way to create a [[ClientCustomStrategy]].
+   * Scala users should instead use the parallel methods on [[ClientCustomStrategy$]].
+   *
+   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
+   */
+  def resharding[A](
+    getPartitionIdAndRequestFn: JFunction[A, ToPartitionedMap],
+    getLogicalPartitionIdFn: JFunction[A, IntUnaryOperator],
+    observable: Activity[A]
+  ): CustomPartitioningStrategy =
+    ClientCustomStrategy
+      .resharding[A](
+        { a: A =>
+          getPartitionIdAndRequestFn.apply(a).andThen(toScalaFutureMap)
+        },
+        { a: A => getLogicalPartitionIdFn.apply(a).applyAsInt _ },
+        observable)
+}
+
+private[partitioning] final class ClientClusterStrategy(
+  val getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap,
+  val getLogicalPartitionIdFn: Set[Address] => Int => Int)
+    extends CustomPartitioningStrategy {
+
+  // we don't have a real implementation here because the understanding
+  // is that implementors will not use ClientClusterStrategy directly, but
+  // instead use it to derive a ClientCustomStrategy where the implementor
+  // provides access to the remote cluster's state.
+  override def newNodeManager[Req, Rep](
+    underlying: Stack[ServiceFactory[Req, Rep]],
+    params: Stack.Params
+  ): PartitionNodeManager[Req, Rep, _, ToPartitionedMap] = ???
+}
+
 /**
  * An API to set a custom partitioning strategy for a Thrift/ThriftMux Client.
  * For a Java-friendly way to do the same thing, see `ClientCustomStrategy.create`
@@ -334,7 +640,7 @@
  *        to the original request.  This PartialFunction can take multiple
  *        Thrift request types of one Thrift service (different method endpoints
  *        of one service).
- * @param logicalPartitionFn Gets the logical partition identifier from a host
+ * @param getLogicalPartitionId Gets the logical partition identifier from a host
  *        identifier, host identifiers are derived from [[ZkMetadata]]
  *        shardId. Indicates which logical partition a physical host belongs to,
  *        multiple hosts can belong to the same partition, for example:
@@ -351,22 +657,24 @@
  * @note  When updating the partition topology dynamically, there is a potential one-time
  *        mismatch if a Service Discovery update happens after getPartitionIdAndRequest.
  */
-final class ClientCustomStrategy(
-  val getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap,
-  logicalPartitionFn: Int => Int)
-    extends CustomPartitioningStrategy {
 
-  def this(getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap) =
-    this(getPartitionIdAndRequest, identity[Int])
+final class ClientCustomStrategy[A] private[partitioning] (
+  val getPartitionIdAndRequest: A => ClientCustomStrategy.ToPartitionedMap,
+  val getLogicalPartitionId: A => Int => Int,
+  val state: Activity[A])
+    extends CustomPartitioningStrategy {
 
-  def getLogicalPartition(instance: Int): Int = logicalPartitionFn(instance)
+  def newNodeManager[Req, Rep](
+    underlying: Stack[ServiceFactory[Req, Rep]],
+    params: Stack.Params
+  ): PartitionNodeManager[Req, Rep, _, ClientCustomStrategy.ToPartitionedMap] =
+    new PartitionNodeManager(
+      underlying,
+      state,
+      getPartitionIdAndRequest,
+      getLogicalPartitionId,
+      params)
 
-  /**
-   * A ResponseMergerRegistry implemented by client to supply [[ResponseMerger]]s
-   * for message fan-out cases.
-   * @see [[ResponseMerger]]
-   */
-  val responseMergerRegistry: ResponseMergerRegistry = new ResponseMergerRegistry()
 }
 
 object MethodBuilderCustomStrategy {
@@ -381,7 +689,7 @@
  * @param getPartitionIdAndRequest A function for the partitioning logic.
  *        MethodBuilder is customized per-method so that this method only takes one
  *        Thrift request type.
- * @param logicalPartitionFn Gets the logical partition identifier from a host
+ * @param getLogicalPartitionId Gets the logical partition identifier from a host
  *        identifier, host identifiers are derived from [[ZkMetadata]]
  *        shardId. Indicates which logical partition a physical host belongs to,
  *        multiple hosts can belong to the same partition, for example:
@@ -399,16 +707,16 @@
  */
 final class MethodBuilderCustomStrategy[Req <: ThriftStructIface, Rep](
   val getPartitionIdAndRequest: MethodBuilderCustomStrategy.ToPartitionedMap[Req],
-  logicalPartitionFn: Int => Int,
+  getLogicalPartitionId: Int => Int,
   val responseMerger: Option[ResponseMerger[Rep]])
     extends CustomPartitioningStrategy {
 
   def this(
     getPartitionIdAndRequest: MethodBuilderCustomStrategy.ToPartitionedMap[Req],
-    logicalPartitionFn: Int => Int
+    getLogicalPartitionId: Int => Int
   ) = this(
     getPartitionIdAndRequest,
-    logicalPartitionFn,
+    getLogicalPartitionId,
     None
   )
 
@@ -424,5 +732,22 @@
     responseMerger
   )
 
-  def getLogicalPartition(instance: Int): Int = logicalPartitionFn(instance)
+  // TODO reconcile these types w/ req / rep
+  def newNodeManager[U, T](
+    underlying: Stack[ServiceFactory[U, T]],
+    params: Stack.Params
+  ): PartitionNodeManager[U, T, _, ClientCustomStrategy.ToPartitionedMap] =
+    new PartitionNodeManager[U, T, Unit, ClientCustomStrategy.ToPartitionedMap](
+      underlying,
+      Activity.value(()),
+      _ => {
+        case req: ThriftStructIface =>
+          getPartitionIdAndRequest.asInstanceOf[
+            ThriftStructIface => Future[Map[Int, ThriftStructIface]]
+          ](req)
+      },
+      _ => getLogicalPartitionId,
+      params
+    )
+
 }
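As a usage sketch of the new factories: resharding driven purely by cluster membership. `slicerFor` is a hypothetical helper that builds a `ToPartitionedMap` for a given shard count; everything else is the API added above:

{{{
// Sketch: repartition whenever the remote cluster grows or shrinks.
val strategy: CustomPartitioningStrategy =
  ClientCustomStrategy.clusterResharding(
    (addrs: Set[Address]) => slicerFor(addrs.size),
    (addrs: Set[Address]) => hostId => hostId % addrs.size
  )
}}}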

@@ -1,14 +1,16 @@
 package com.twitter.finagle.thrift.exp.partitioning
 
-import com.twitter.finagle.partitioning.{PartitionNodeManager, PartitioningService}
+import com.twitter.finagle.loadbalancer.{LoadBalancerFactory, TrafficDistributor}
+import com.twitter.finagle.param.Label
+import com.twitter.finagle.partitioning.PartitioningService
 import com.twitter.finagle.thrift.ClientDeserializeCtx
 import com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.{
   PartitioningStrategyException,
   ReqRepMarshallable
 }
-import com.twitter.finagle.{Service, ServiceFactory, Stack}
+import com.twitter.finagle.{Address, Service, ServiceFactory, Stack}
 import com.twitter.scrooge.ThriftStructIface
-import com.twitter.util.Future
+import com.twitter.util.{Future, Time}
 import scala.util.control.NonFatal
 
 /**
@@ -21,11 +23,22 @@
   underlying: Stack[ServiceFactory[Req, Rep]],
   thriftMarshallable: ReqRepMarshallable[Req, Rep],
   params: Stack.Params,
-  customStrategy: CustomPartitioningStrategy)
+  configuredStrategy: CustomPartitioningStrategy)
     extends PartitioningService[Req, Rep] {
 
-  private[this] val nodeManager =
-    new PartitionNodeManager(underlying, customStrategy.getLogicalPartition, params)
+  private[this] val customStrategy = configuredStrategy match {
+    case strat: ClientClusterStrategy =>
+      new ClientCustomStrategy[Set[Address]](
+        strat.getPartitionIdAndRequestFn,
+        strat.getLogicalPartitionIdFn,
+        TrafficDistributor
+          .varAddrToActivity(params[LoadBalancerFactory.Dest].va, params[Label].label))
+    case _ => configuredStrategy
+  }
+
+  private[this] val nodeManager = customStrategy.newNodeManager(underlying, params)
+
+  private[this] val serializer = new ThriftRequestSerializer(params)
 
   private[this] def rpcName: String = ClientDeserializeCtx.get.rpcName.getOrElse("N/A")
@@ -36,17 +49,20 @@
   }
 
   // for fan-out requests
-  final protected def partitionRequest(
+  final protected[finagle] def partitionRequest(
     original: Req
   ): Future[Map[Req, Future[Service[Req, Rep]]]] = {
-    val serializer = new ThriftRequestSerializer(params)
-    val partitionIdAndRequest = getPartitionIdAndRequestMap(original)
+    val snapPartitioner = nodeManager.snapshotSharder()
+
+    val partitionIdAndRequest = getPartitionIdAndRequestMap(snapPartitioner.partitionFunction)
+
     partitionIdAndRequest.flatMap { idsAndRequests =>
       if (idsAndRequests.isEmpty) {
         noPartitionInformationHandler(original)
       } else if (idsAndRequests.size == 1) {
         // optimization: won't serialize request if it is a singleton partition
-        Future.value(Map(original -> partitionServiceForPartitionId(idsAndRequests.head._1)))
+        Future.value(
+          Map(original -> snapPartitioner.getServiceByPartitionId(idsAndRequests.head._1)))
       } else {
         Future.value(idsAndRequests.map {
           case (id, request) =>
@@ -56,10 +72,7 @@
             val partitionedReq =
               thriftMarshallable.framePartitionedRequest(thriftClientRequest, original)
 
-            // we assume NodeManager updates always happen before getPartitionIdAndRequestMap
-            // updates. When updating the partitioning topology, it should do proper locking
-            // before returning a lookup map.
-            (partitionedReq, partitionServiceForPartitionId(id))
+            (partitionedReq, snapPartitioner.getServiceByPartitionId(id))
         })
       }
     }
@@ -70,13 +83,18 @@
     results: PartitioningService.PartitionedResults[Req, Rep]
   ): Rep = {
     val mergerOption = customStrategy match {
-      case clientCustomStrategy: ClientCustomStrategy =>
+      case clientCustomStrategy: ClientCustomStrategy[_] =>
         clientCustomStrategy.responseMergerRegistry.get(rpcName)
       case mbCustomStrategy: MethodBuilderCustomStrategy[_, _] =>
         mbCustomStrategy
        //upcasting, MethodBuilderCustomStrategy[Req <: ThriftStructIface, _]
          .asInstanceOf[MethodBuilderCustomStrategy[_, Any]]
          .responseMerger
+      case clusterStrategy: ClientClusterStrategy =>
+        throw new IllegalStateException(
+          s"found a ClientClusterStrategy $clusterStrategy after it" +
+            " should have been converted to a ClientCustomStrategy.  This state should be " +
+            "impossible to reach. It indicates a serious bug.")
     }
 
     val responseMerger = mergerOption match {
@@ -100,19 +118,25 @@
   }
 
   // note: this function should only be evaluated once per request
-  private[this] def getPartitionIdAndRequestMap(req: Req): Future[Map[Int, ThriftStructIface]] = {
+  private[partitioning] def getPartitionIdAndRequestMap(
+    pf: ClientCustomStrategy.ToPartitionedMap
+  ): Future[Map[Int, ThriftStructIface]] = {
     val inputArg = ClientDeserializeCtx.get.request.asInstanceOf[ThriftStructIface]
     try {
       val getPartitionIdAndRequest = { ts: ThriftStructIface =>
         customStrategy match {
-          case clientCustomStrategy: ClientCustomStrategy =>
-            clientCustomStrategy.getPartitionIdAndRequest
-              .applyOrElse(ts, ClientCustomStrategy.defaultPartitionIdAndRequest)
+          case _: ClientCustomStrategy[_] =>
+            pf.applyOrElse(ts, ClientCustomStrategy.defaultPartitionIdAndRequest)
           case mbCustomStrategy: MethodBuilderCustomStrategy[_, _] =>
             mbCustomStrategy
            //upcasting, MethodBuilderCustomStrategy[Req <: ThriftStructIface, _]
              .asInstanceOf[MethodBuilderCustomStrategy[ThriftStructIface, _]]
              .getPartitionIdAndRequest(ts)
+          case clusterStrategy: ClientClusterStrategy =>
+            throw new IllegalStateException(
+              s"found a ClientClusterStrategy $clusterStrategy after it should have been " +
+                "converted to a ClientCustomStrategy.  This state should be impossible" +
+                " to reach. It indicates a serious bug.")
         }
       }
       // ClientCustomStrategy.defaultPartitionIdAndRequest throws a Future.exception
@@ -131,7 +155,6 @@
     }
   }
 
-  private[this] def partitionServiceForPartitionId(partitionId: Int): Future[Service[Req, Rep]] = {
-    nodeManager.getServiceByPartitionId(partitionId)
-  }
+  override def close(deadline: Time): Future[Unit] =
+    Future.join(Seq(nodeManager.close(deadline), super.close(deadline)))
 }

@@ -77,12 +77,12 @@
    */
   private[finagle] def safelyScanLeft[T, U](
     init: U,
-    stream: Event[Activity.State[T]]
+    stream: Activity[T]
   )(
     f: (U, T) => U
-  ): Event[Activity.State[U]] = {
+  ): Activity[U] = {
     val initState: Activity.State[U] = Activity.Ok(init)
-    stream.foldLeft(initState) {
+    Activity(stream.states.foldLeft(initState) {
       case (Activity.Pending, Activity.Ok(update)) => Activity.Ok(f(init, update))
       case (Activity.Failed(_), Activity.Ok(update)) => Activity.Ok(f(init, update))
       case (Activity.Ok(state), Activity.Ok(update)) => Activity.Ok(f(state, update))
@@ -90,7 +90,7 @@
       case (stale @ Activity.Ok(state), Activity.Pending) if init != state => stale
       case (_, failed @ Activity.Failed(_)) => failed
       case (_, Activity.Pending) => Activity.Pending
-    }
+    })
   }
 
   /**
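A small sketch of the fold semantics above, with a hypothetical `counts: Activity[Int]` source:

{{{
// Ok states accumulate through f; after a Failed (or the initial Pending)
// state, the next Ok folds from `init` again rather than a stale accumulator.
val summed: Activity[Int] = safelyScanLeft(0, counts) { (sum, n) => sum + n }
// counts: Ok(1), Ok(2)            => summed: Ok(1), Ok(3)
// counts: Ok(1), Failed(e), Ok(2) => summed: Ok(1), Failed(e), Ok(2)
}}}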
@@ -252,8 +252,8 @@
    * the [[Address]] does not have a weight, a default weight of 1.0 is used.
    */
   private[this] def weightEndpoints(
-    addrs: Event[Activity.State[Set[Address]]]
-  ): Event[Activity.State[Set[WeightedFactory[Req, Rep]]]] = {
+    addrs: Activity[Set[Address]]
+  ): Activity[Set[WeightedFactory[Req, Rep]]] = {
     val init = Map.empty[Address, WeightedFactory[Req, Rep]]
     safelyScanLeft(init, addrs) {
       case (active, addrs) =>
@@ -291,11 +291,7 @@
               case _ => cache
             }
         }
-    }.map {
-      case Activity.Ok(cache) => Activity.Ok(cache.values.toSet)
-      case Activity.Pending => Activity.Pending
-      case failed @ Activity.Failed(_) => failed
-    }
+    }.map(_.values.toSet)
   }
 
   /**
@@ -303,8 +299,8 @@
    * Because balancer instances are stateful, they need to be cached across updates.
    */
   private[this] def partition(
-    endpoints: Event[Activity.State[Set[WeightedFactory[Req, Rep]]]]
-  ): Event[Activity.State[Iterable[WeightClass[Req, Rep]]]] = {
+    endpoints: Activity[Set[WeightedFactory[Req, Rep]]]
+  ): Activity[Iterable[WeightClass[Req, Rep]]] = {
     // Cache entries are balancer instances together with their backing collection
     // which is updatable. The entries are keyed by weight class.
     val init = Map.empty[Double, CachedBalancer[Req, Rep]]
@@ -355,18 +351,15 @@
         val bal = newBalancer(Activity(endpoints))
         Map(1.0 -> CachedBalancer(bal, endpoints, 0))
       }
-    }.map {
-      case Activity.Ok(cache) =>
-        Activity.Ok(cache.map {
-          case (weight, CachedBalancer(bal, endpoints, size)) =>
-            WeightClass(bal, endpoints, weight, size)
-        })
-      case Activity.Pending => Activity.Pending
-      case failed @ Activity.Failed(_) => failed
+    }.map { cache =>
+      cache.map {
+        case (weight, CachedBalancer(bal, endpoints, size)) =>
+          WeightClass(bal, endpoints, weight, size)
+      }
     }
   }
 
-  private[this] val weightClasses = partition(weightEndpoints(dest.states))
+  private[this] val weightClasses = partition(weightEndpoints(dest))
   private[this] val pending = new Promise[ServiceFactory[Req, Rep]]
   private[this] val init: ServiceFactory[Req, Rep] = new DelayedFactory(pending)
 
@@ -394,7 +387,7 @@
   // Translate the stream of weightClasses into a stream of underlying
   // ServiceFactories that can service requests.
   private[this] val underlying: Event[ServiceFactory[Req, Rep]] =
-    weightClasses.foldLeft(init) {
+    weightClasses.states.foldLeft(init) {
       case (_, Activity.Ok(wcs)) =>
         val dist = new Distributor(wcs, busyWeightClasses, rng)
         updateGauges(wcs)

@@ -0,0 +1,42 @@
+package com.twitter.finagle.partitioning
+
+import com.twitter.util.Future
+import com.twitter.finagle.{Service, ServiceFactory}
+
+/**
+ * Represents the snapshot of the information needed to determine how to
+ * carve up the request, and where to send those carved-up slices.
+ */
+case class SnapPartitioner[Req, Rep, B >: PartialFunction[Any, Future[Nothing]]](
+  partitionFunction: B,
+  private[partitioning] val partitionMapping: Map[Int, ServiceFactory[Req, Rep]]) {
+
+  /**
+   * When given a partitionId, asynchronously returns a [[Service]] that sends
+   * requests to a shard within that partition.
+   *
+   * @note we assume that the factory has implemented FactoryToService under the covers,
+   * so the returned service does not need to be closed by the caller.
+   */
+  def getServiceByPartitionId(partitionId: Int): Future[Service[Req, Rep]] =
+    partitionMapping.get(partitionId) match {
+      case Some(factory) => factory()
+      case None =>
+        Future.exception(
+          new PartitionNodeManager.NoPartitionException(
+            s"No partition: $partitionId found in the node manager"))
+    }
+}
+
+object SnapPartitioner {
+
+  /**
+   * Provides a sensible default if you don't have enough information to decide
+   * how to partition.
+   */
+  private[partitioning] def uninitialized[
+    Req,
+    Rep,
+    B >: PartialFunction[Any, Future[Nothing]]
+  ]: SnapPartitioner[Req, Rep, B] = SnapPartitioner(PartialFunction.empty, Map.empty)
+}
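A sketch of how a consumer is expected to use the snapshot, assuming `B` is instantiated to a `ToPartitionedMap` as the Thrift integration above does (`nodeManager` and `request` are placeholders):

{{{
// One atomic snapshot serves both the slicing decision and the partition
// lookup, so the two can never disagree about the current sharding scheme.
val snap = nodeManager.snapshotSharder()
if (snap.partitionFunction.isDefinedAt(request)) {
  snap.partitionFunction(request).flatMap { idsToSlices =>
    // for each (partitionId, slice): look up the service with
    // snap.getServiceByPartitionId(partitionId) and send the slice through it
    ...
  }
}
}}}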
Files Coverage
finagle-base-http/src/main/scala/com/twitter/finagle/http 85.24%
finagle-core/src/main/scala/com/twitter/finagle 85.72%
finagle-exception/src/main/scala/com/twitter/finagle/exception 79.66%
finagle-exp/src/main/scala/com/twitter/finagle/exp 90.00%
finagle-http/src/main/scala/com/twitter/finagle 81.89%
finagle-http2/src/main/scala/com/twitter/finagle/http2 77.64%
finagle-memcached/src/main/scala/com/twitter/finagle 69.63%
finagle-mux/src/main/scala/com/twitter/finagle 76.28%
finagle-mysql/src/main/scala/com/twitter/finagle 66.94%
finagle-netty4-http/src/main/scala/com/twitter/finagle/netty4/http 95.97%
finagle-netty4/src/main/scala/com/twitter/finagle/netty4 87.73%
...le-opencensus-tracing/src/main/scala/com/twitter/finagle/tracing/opencensus 89.89%
finagle-partitioning/src/main/scala/com/twitter/finagle/partitioning 88.11%
finagle-redis/src/main/scala/com/twitter/finagle 36.21%
finagle-scribe/src/main/scala/com/twitter/finagle/scribe 75.93%
finagle-serversets/src/main/scala/com/twitter 79.27%
finagle-stats-core/src/main/scala/com/twitter/finagle/stats 78.01%
finagle-thrift/src/main/scala/com/twitter/finagle 75.22%
finagle-thriftmux/src/main/scala/com/twitter/finagle 87.67%
finagle-toggle/src/main/scala/com/twitter/finagle 86.18%
finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core 81.40%
finagle-zipkin-scribe/src/main/scala/com/twitter/finagle/zipkin/thrift 78.33%
finagle-stats/src/main/scala/com/twitter/finagle/stats/stats.scala 0.00%
...tunable/src/main/scala/com/twitter/finagle/tunable/StandardTunableMap.scala 56.52%
Project Totals (830 files) 78.16%