@@ -2,6 +2,7 @@
Loading
2 2
3 3
import com.twitter.finagle._
4 4
import com.twitter.finagle.client.Transporter
5 +
import com.twitter.finagle.loadbalancer.aperture.EagerConnections
5 6
import com.twitter.finagle.service.FailFastFactory
6 7
import com.twitter.finagle.stats._
7 8
import com.twitter.finagle.util.DefaultMonitor
@@ -70,7 +71,8 @@
Loading
70 71
   *
71 72
   * If this is configured, the [[Dest]] param will be ignored.
72 73
   */
73 -
  private[finagle] case class Endpoints(va: Event[Activity.State[Set[TrafficDistributor.AddressedFactory[_, _]]]])
74 +
  private[finagle] case class Endpoints(
75 +
    va: Event[Activity.State[Set[TrafficDistributor.AddressedFactory[_, _]]]])
74 76
75 77
  private[finagle] object Endpoints {
76 78
    implicit val param = Stack.Param(
@@ -308,7 +310,8 @@
Loading
308 310
      val balancerExc = new NoBrokersAvailableException(params[ErrorLabel].label)
309 311
310 312
      def newBalancer(
311 -
        endpoints: Activity[Set[EndpointFactory[Req, Rep]]]
313 +
        endpoints: Activity[Set[EndpointFactory[Req, Rep]]],
314 +
        disableEagerConnections: Boolean
312 315
      ): ServiceFactory[Req, Rep] = {
313 316
        val ordering = params[AddressOrdering].ordering
314 317
        val orderedEndpoints = endpoints.map { set =>
@@ -325,10 +328,15 @@
Loading
325 328
          }
326 329
        }
327 330
331 +
        val paramsWithStats = params + param.Stats(balancerStats)
332 +
        val finalParams =
333 +
          if (disableEagerConnections) paramsWithStats + EagerConnections(false)
334 +
          else paramsWithStats
335 +
328 336
        val underlying = loadBalancerFactory.newBalancer(
329 337
          orderedEndpoints,
330 338
          balancerExc,
331 -
          params + param.Stats(balancerStats)
339 +
          finalParams
332 340
        )
333 341
        params[WhenNoNodesOpenParam].whenNoNodesOpen match {
334 342
          case WhenNoNodesOpen.PickOne => underlying

@@ -311,10 +311,14 @@
Loading
311 311
 * serviced by a distinct `newBalancer` instance. That is, load offered to a weight
312 312
 * class is also load balanced across its members. Offered load is distributed according
313 313
 * to the classes weight and number of members that belong to the class.
314 +
 *
315 +
 * @param newBalancer A lambda to create a new balancer from a set of endpoints and a boolean
316 +
 * indicator, `disableEagerConnection`, if the balancer should have the
317 +
 * [[c.t.f.loadbalancer.aperture.EagerConnections]] feature disabled.
314 318
 */
315 319
private class TrafficDistributor[Req, Rep](
316 320
  dest: Event[Activity.State[Set[TrafficDistributor.AddressedFactory[Req, Rep]]]],
317 -
  newBalancer: Activity[Set[EndpointFactory[Req, Rep]]] => ServiceFactory[Req, Rep],
321 +
  newBalancer: (Activity[Set[EndpointFactory[Req, Rep]]], Boolean) => ServiceFactory[Req, Rep],
318 322
  rng: Rng = Rng.threadLocal,
319 323
  statsReceiver: StatsReceiver = NullStatsReceiver)
320 324
    extends ServiceFactory[Req, Rep] {
@@ -342,8 +346,14 @@
Loading
342 346
      // Construct new balancers from new endpoints.
343 347
      def add(factories: Set[AddressedFactory[Req, Rep]]): CachedBalancer[Req, Rep] = {
344 348
        val group = factories.map(_.factory)
349 +
        val weight = if (factories.isEmpty) 1D else factories.head.weight
345 350
        val endpoints: BalancerEndpoints[Req, Rep] = Var(Activity.Ok(group))
346 -
        val bal = newBalancer(Activity(endpoints))
351 +
352 +
        // we disable eager connections for non 1.0 weight class balancers. We assume the 1.0
353 +
        // weight balancer to be the main balancer and because sessions are managed independently
354 +
        // by each balancer, we avoid eagerly creating connections for balancers that may not be
355 +
        // long-lived.
356 +
        val bal = newBalancer(Activity(endpoints), weight != 1.0)
347 357
        CachedBalancer(bal, endpoints, group.size)
348 358
      }
349 359
@@ -374,7 +384,7 @@
Loading
374 384
        // so a subsequent iteration has a chance to close it. This way,
375 385
        // scanLeft is responsible for creating and managing the resource.
376 386
        val endpoints: BalancerEndpoints[Req, Rep] = Var(Activity.Ok(Set.empty))
377 -
        val bal = newBalancer(Activity(endpoints))
387 +
        val bal = newBalancer(Activity(endpoints), false)
378 388
        Map(1.0 -> CachedBalancer(bal, endpoints, 0))
379 389
      }
380 390
    }.map {
Files Coverage
finagle-base-http/src/main/scala/com/twitter/finagle/http 79.35%
finagle-core/src/main/scala/com/twitter/finagle 85.42%
finagle-exception/src/main/scala/com/twitter/finagle/exception 79.66%
finagle-exp/src/main/scala/com/twitter/finagle/exp 90.00%
finagle-http/src/main/scala/com/twitter/finagle 21.73%
finagle-http2/src/main/scala/com/twitter/finagle/http2 66.29%
finagle-memcached/src/main/scala/com/twitter/finagle 69.63%
finagle-mux/src/main/scala/com/twitter/finagle 76.28%
finagle-mysql/src/main/scala/com/twitter/finagle 67.00%
finagle-netty4-http/src/main/scala/com/twitter/finagle/netty4/http 60.69%
finagle-netty4/src/main/scala/com/twitter/finagle/netty4 87.03%
...le-opencensus-tracing/src/main/scala/com/twitter/finagle/tracing/opencensus 89.89%
finagle-partitioning/src/main/scala/com/twitter/finagle/partitioning 88.81%
finagle-redis/src/main/scala/com/twitter/finagle 36.49%
finagle-scribe/src/main/scala/com/twitter/finagle/scribe 75.93%
finagle-serversets/src/main/scala/com/twitter 79.18%
finagle-stats-core/src/main/scala/com/twitter/finagle/stats 78.04%
finagle-thrift/src/main/scala/com/twitter/finagle 75.06%
finagle-thriftmux/src/main/scala/com/twitter/finagle 87.67%
finagle-toggle/src/main/scala/com/twitter/finagle 91.18%
finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core 81.40%
finagle-zipkin-scribe/src/main/scala/com/twitter/finagle/zipkin/thrift 78.33%
finagle-stats/src/main/scala/com/twitter/finagle/stats/stats.scala 0.00%
...tunable/src/main/scala/com/twitter/finagle/tunable/StandardTunableMap.scala 56.52%
Project Totals (830 files) 74.08%
3589.25
TRAVIS_OS_NAME=linux
openjdk8=
3589.26
TRAVIS_OS_NAME=linux
openjdk8=
3589.30
TRAVIS_OS_NAME=linux
openjdk8=
3589.29
TRAVIS_OS_NAME=linux
openjdk8=
3589.23
TRAVIS_OS_NAME=linux
openjdk8=
3589.24
TRAVIS_OS_NAME=linux
openjdk8=
3589.27
TRAVIS_OS_NAME=linux
openjdk8=
3589.28
TRAVIS_OS_NAME=linux
openjdk8=
3589.32
TRAVIS_OS_NAME=linux
openjdk8=
3589.31
TRAVIS_OS_NAME=linux
openjdk8=
3589.7
TRAVIS_OS_NAME=linux
openjdk8=
3589.6
TRAVIS_OS_NAME=linux
openjdk8=
3589.5
TRAVIS_OS_NAME=linux
openjdk8=
3589.3
TRAVIS_OS_NAME=linux
openjdk8=
3589.4
TRAVIS_OS_NAME=linux
openjdk8=
3589.2
TRAVIS_OS_NAME=linux
openjdk8=
3589.11
TRAVIS_OS_NAME=linux
openjdk8=
3589.1
TRAVIS_OS_NAME=linux
openjdk8=
3589.13
TRAVIS_OS_NAME=linux
openjdk8=