@@ -206,7 +206,7 @@
Loading
206 206
  ] = {
207 207
    val init: (B, Map[Try[Int], CachedServiceFactory[Req, Rep]]) =
208 208
      (PartialFunction.empty, Map.empty)
209 -
    safelyScanLeft(init, destActivity.join(observable)) {
209 +
    Activity(safelyScanLeft(init, destActivity.join(observable).run.changes) {
210 210
      case ((_, partitionNodes), (activeSet, state)) =>
211 211
        (
212 212
          getPartitionFunctionPerState(state),
@@ -216,7 +216,7 @@
Loading
216 216
            getShardIdFromAddress(state),
217 217
            cachedServiceFactoryDiffOps
218 218
          ))
219 -
    }
219 +
    })
220 220
  }
221 221
222 222
  // Transform the stream of [[CachedServiceFactory]] to ServiceFactory and filter out

@@ -77,12 +77,12 @@
Loading
77 77
   */
78 78
  private[finagle] def safelyScanLeft[T, U](
79 79
    init: U,
80 -
    stream: Activity[T]
80 +
    stream: Event[Activity.State[T]]
81 81
  )(
82 82
    f: (U, T) => U
83 -
  ): Activity[U] = {
83 +
  ): Event[Activity.State[U]] = {
84 84
    val initState: Activity.State[U] = Activity.Ok(init)
85 -
    Activity(stream.states.foldLeft(initState) {
85 +
    stream.foldLeft(initState) {
86 86
      case (Activity.Pending, Activity.Ok(update)) => Activity.Ok(f(init, update))
87 87
      case (Activity.Failed(_), Activity.Ok(update)) => Activity.Ok(f(init, update))
88 88
      case (Activity.Ok(state), Activity.Ok(update)) => Activity.Ok(f(state, update))
@@ -90,7 +90,7 @@
Loading
90 90
      case (stale @ Activity.Ok(state), Activity.Pending) if init != state => stale
91 91
      case (_, failed @ Activity.Failed(_)) => failed
92 92
      case (_, Activity.Pending) => Activity.Pending
93 -
    })
93 +
    }
94 94
  }
95 95
96 96
  /**
@@ -252,8 +252,8 @@
Loading
252 252
   * the [[Address]] does not have a weight, a default weight of 1.0 is used.
253 253
   */
254 254
  private[this] def weightEndpoints(
255 -
    addrs: Activity[Set[Address]]
256 -
  ): Activity[Set[WeightedFactory[Req, Rep]]] = {
255 +
    addrs: Event[Activity.State[Set[Address]]]
256 +
  ): Event[Activity.State[Set[WeightedFactory[Req, Rep]]]] = {
257 257
    val init = Map.empty[Address, WeightedFactory[Req, Rep]]
258 258
    safelyScanLeft(init, addrs) {
259 259
      case (active, addrs) =>
@@ -291,7 +291,11 @@
Loading
291 291
              case _ => cache
292 292
            }
293 293
        }
294 -
    }.map(_.values.toSet)
294 +
    }.map {
295 +
      case Activity.Ok(cache) => Activity.Ok(cache.values.toSet)
296 +
      case Activity.Pending => Activity.Pending
297 +
      case failed @ Activity.Failed(_) => failed
298 +
    }
295 299
  }
296 300
297 301
  /**
@@ -299,8 +303,8 @@
Loading
299 303
   * Because balancer instances are stateful, they need to be cached across updates.
300 304
   */
301 305
  private[this] def partition(
302 -
    endpoints: Activity[Set[WeightedFactory[Req, Rep]]]
303 -
  ): Activity[Iterable[WeightClass[Req, Rep]]] = {
306 +
    endpoints: Event[Activity.State[Set[WeightedFactory[Req, Rep]]]]
307 +
  ): Event[Activity.State[Iterable[WeightClass[Req, Rep]]]] = {
304 308
    // Cache entries are balancer instances together with their backing collection
305 309
    // which is updatable. The entries are keyed by weight class.
306 310
    val init = Map.empty[Double, CachedBalancer[Req, Rep]]
@@ -351,15 +355,18 @@
Loading
351 355
        val bal = newBalancer(Activity(endpoints))
352 356
        Map(1.0 -> CachedBalancer(bal, endpoints, 0))
353 357
      }
354 -
    }.map { cache =>
355 -
      cache.map {
356 -
        case (weight, CachedBalancer(bal, endpoints, size)) =>
357 -
          WeightClass(bal, endpoints, weight, size)
358 -
      }
358 +
    }.map {
359 +
      case Activity.Ok(cache) =>
360 +
        Activity.Ok(cache.map {
361 +
          case (weight, CachedBalancer(bal, endpoints, size)) =>
362 +
            WeightClass(bal, endpoints, weight, size)
363 +
        })
364 +
      case Activity.Pending => Activity.Pending
365 +
      case failed @ Activity.Failed(_) => failed
359 366
    }
360 367
  }
361 368
362 -
  private[this] val weightClasses = partition(weightEndpoints(dest))
369 +
  private[this] val weightClasses = partition(weightEndpoints(dest.states))
363 370
  private[this] val pending = new Promise[ServiceFactory[Req, Rep]]
364 371
  private[this] val init: ServiceFactory[Req, Rep] = new DelayedFactory(pending)
365 372
@@ -387,7 +394,7 @@
Loading
387 394
  // Translate the stream of weightClasses into a stream of underlying
388 395
  // ServiceFactories that can service requests.
389 396
  private[this] val underlying: Event[ServiceFactory[Req, Rep]] =
390 -
    weightClasses.states.foldLeft(init) {
397 +
    weightClasses.foldLeft(init) {
391 398
      case (_, Activity.Ok(wcs)) =>
392 399
        val dist = new Distributor(wcs, busyWeightClasses, rng)
393 400
        updateGauges(wcs)
Files Coverage
finagle-base-http/src/main/scala/com/twitter/finagle/http 85.08%
finagle-core/src/main/scala/com/twitter/finagle 85.74%
finagle-exception/src/main/scala/com/twitter/finagle/exception 79.66%
finagle-exp/src/main/scala/com/twitter/finagle/exp 91.00%
finagle-http/src/main/scala/com/twitter/finagle 81.89%
finagle-http2/src/main/scala/com/twitter/finagle/http2 76.84%
finagle-memcached/src/main/scala/com/twitter/finagle 69.63%
finagle-mux/src/main/scala/com/twitter/finagle 76.70%
finagle-mysql/src/main/scala/com/twitter/finagle 66.94%
finagle-netty4-http/src/main/scala/com/twitter/finagle/netty4/http 95.97%
finagle-netty4/src/main/scala/com/twitter/finagle/netty4 87.73%
...le-opencensus-tracing/src/main/scala/com/twitter/finagle/tracing/opencensus 89.89%
finagle-partitioning/src/main/scala/com/twitter/finagle/partitioning 87.76%
finagle-redis/src/main/scala/com/twitter/finagle 36.21%
finagle-scribe/src/main/scala/com/twitter/finagle/scribe 75.93%
finagle-serversets/src/main/scala/com/twitter 79.27%
finagle-stats-core/src/main/scala/com/twitter/finagle/stats 78.01%
finagle-thrift/src/main/scala/com/twitter/finagle 75.22%
finagle-thriftmux/src/main/scala/com/twitter/finagle 87.67%
finagle-toggle/src/main/scala/com/twitter/finagle 86.18%
finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core 81.40%
finagle-zipkin-scribe/src/main/scala/com/twitter/finagle/zipkin/thrift 78.33%
finagle-stats/src/main/scala/com/twitter/finagle/stats/stats.scala 0.00%
...tunable/src/main/scala/com/twitter/finagle/tunable/StandardTunableMap.scala 56.52%
Project Totals (829 files) 78.15%
3563.30
TRAVIS_OS_NAME=linux
openjdk8=
3563.33
TRAVIS_OS_NAME=linux
openjdk8=
3563.31
TRAVIS_OS_NAME=linux
openjdk8=
3563.32
TRAVIS_OS_NAME=linux
openjdk8=
3563.25
TRAVIS_OS_NAME=linux
openjdk8=
3563.27
TRAVIS_OS_NAME=linux
openjdk8=
3563.28
TRAVIS_OS_NAME=linux
openjdk8=
3563.29
TRAVIS_OS_NAME=linux
openjdk8=
3563.35
TRAVIS_OS_NAME=linux
openjdk8=
3563.34
TRAVIS_OS_NAME=linux
openjdk8=
3563.3
TRAVIS_OS_NAME=linux
openjdk8=
3563.2
TRAVIS_OS_NAME=linux
openjdk8=
3563.13
TRAVIS_OS_NAME=linux
openjdk8=
3563.67
openjdk11=
TRAVIS_OS_NAME=linux
3563.65
openjdk11=
TRAVIS_OS_NAME=linux
3563.61
openjdk11=
TRAVIS_OS_NAME=linux
3563.63
openjdk11=
TRAVIS_OS_NAME=linux
3563.66
openjdk11=
TRAVIS_OS_NAME=linux
3563.64
openjdk11=
TRAVIS_OS_NAME=linux