radicalbit / NSDb

@@ -120,8 +120,8 @@
Loading
120 120
          pathPrefix(Segment) { metric =>
121 121
            (pathEnd & get) {
122 122
              withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
123 -
                onComplete(metadataCoordinator ? GetLocations(db, namespace, metric)) {
124 -
                  case Success(response: LocationsGot) =>
123 +
                onComplete(metadataCoordinator ? GetLiveLocations(db, namespace, metric)) {
124 +
                  case Success(response: LiveLocationsGot) =>
125 125
                    complete(HttpEntity(ContentTypes.`application/json`, write(response)))
126 126
                  case Success(wrongResponse) =>
127 127
                    logger.error(s"received unexpected response $wrongResponse")

@@ -71,7 +71,6 @@
Loading
71 71
72 72
  private lazy val writeNodesSelectionLogic = new CapacityWriteNodesSelectionLogic(
73 73
    CapacityWriteNodesSelectionLogic.fromConfigValue(config.getString("nsdb.cluster.metrics-selector")))
74 -
  private lazy val readNodesSelection = new LocalityReadNodesSelection(nodeFsId)
75 74
76 75
  private lazy val maxAttempts = context.system.settings.config.getInt("nsdb.write.retry-attempts")
77 76
@@ -110,8 +109,12 @@
Loading
110 109
111 110
  private val clusterListener: ActorRef = createClusterListener
112 111
113 -
  private lazy val metadataCache = context.actorOf(Props[ReplicatedMetadataCache], s"metadata-cache-$actorNameSuffix")
114 -
  private lazy val schemaCache   = context.actorOf(Props[ReplicatedSchemaCache], s"schema-cache-$actorNameSuffix")
112 +
  private lazy val metadataCache = context.actorOf(
113 +
    Props[ReplicatedMetadataCache].withDeploy(Deploy(scope = RemoteScope(selfMember.address))),
114 +
    s"metadata-cache_$actorNameSuffix")
115 +
  private lazy val schemaCache = context.actorOf(
116 +
    Props[ReplicatedSchemaCache].withDeploy(Deploy(scope = RemoteScope(selfMember.address))),
117 +
    s"schema-cache_$actorNameSuffix")
115 118
116 119
  protected lazy val schemaCoordinator: ActorRef = context.actorOf(
117 120
    SchemaCoordinator
@@ -132,7 +135,7 @@
Loading
132 135
  protected lazy val readCoordinator: ActorRef =
133 136
    context.actorOf(
134 137
      ReadCoordinator
135 -
        .props(metadataCoordinator, schemaCoordinator, mediator, readNodesSelection)
138 +
        .props(metadataCoordinator, schemaCoordinator, mediator, new LocalityReadNodesSelection(node.uniqueNodeId))
136 139
        .withDispatcher("akka.actor.control-aware-dispatcher")
137 140
        .withDeploy(Deploy(scope = RemoteScope(selfMember.address))),
138 141
      s"read-coordinator_$actorNameSuffix"

@@ -33,12 +33,12 @@
Loading
33 33
    * commands executed among NSDb actors.
34 34
    */
35 35
  object Commands {
36 -
    case object GetTopology                                                extends NSDbSerializable
37 -
    case class GetLocations(db: String, namespace: String, metric: String) extends NSDbSerializable
38 -
    case object GetDbs                                                     extends ControlMessage with NSDbSerializable
39 -
    case class GetNamespaces(db: String)                                   extends ControlMessage with NSDbSerializable
40 -
    case class GetMetrics(db: String, namespace: String)                   extends ControlMessage with NSDbSerializable
41 -
    case class GetSchema(db: String, namespace: String, metric: String)    extends NSDbSerializable
36 +
    case object GetTopology                                                    extends NSDbSerializable
37 +
    case class GetLiveLocations(db: String, namespace: String, metric: String) extends NSDbSerializable
38 +
    case object GetDbs                                                         extends ControlMessage with NSDbSerializable
39 +
    case class GetNamespaces(db: String)                                       extends ControlMessage with NSDbSerializable
40 +
    case class GetMetrics(db: String, namespace: String)                       extends ControlMessage with NSDbSerializable
41 +
    case class GetSchema(db: String, namespace: String, metric: String)        extends NSDbSerializable
42 42
43 43
    case class GetMetricInfo(db: String, namespace: String, metric: String) extends NSDbSerializable
44 44
@@ -116,8 +116,11 @@
Loading
116 116
    */
117 117
  object Events {
118 118
119 -
    case class LocationsGot(db: String, namespace: String, metric: String, locations: Seq[Location])
120 -
        extends NSDbSerializable
119 +
    sealed trait GetLiveLocationsResponse extends NSDbSerializable
120 +
    case class LiveLocationsGot(db: String, namespace: String, metric: String, locations: Seq[Location])
121 +
        extends GetLiveLocationsResponse
122 +
    case class GetLiveLocationError(db: String, namespace: String, metric: String, error: String)
123 +
        extends GetLiveLocationsResponse
121 124
122 125
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
123 126
    @JsonSubTypes(

@@ -18,7 +18,6 @@
Loading
18 18
19 19
import java.math.{MathContext, RoundingMode}
20 20
import java.util.concurrent.TimeUnit
21 -
22 21
import akka.actor.{ActorRef, Props}
23 22
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
24 23
import akka.pattern.ask
@@ -192,91 +191,102 @@
Loading
192 191
      val startTime                         = System.currentTimeMillis()
193 192
      implicit val timeContext: TimeContext = context.getOrElse(TimeContext())
194 193
      log.debug("executing {} with {} data actors", statement, metricsDataActors.size)
195 -
      val selectStatementResponse: Future[ExecuteSelectStatementResponse] = Future
196 -
        .sequence(
197 -
          Seq(
198 -
            schemaCoordinator ? GetSchema(statement.db, statement.namespace, statement.metric),
199 -
            metadataCoordinator ? GetLocations(statement.db, statement.namespace, statement.metric)
200 -
          ))
201 -
        .flatMap {
202 -
          case SchemaGot(_, _, _, Some(schema)) :: LocationsGot(_, _, _, locations) :: Nil =>
203 -
            log.debug("found schema {} and locations {}", schema, locations)
204 -
            val filteredLocations: Seq[Location] =
205 -
              ReadNodesSelection.filterLocationsThroughTime(statement.condition.map(_.expression), locations)
206 -
207 -
            val uniqueLocationsByNode: Map[String, Seq[Location]] =
208 -
              readNodesSelection.getDistinctLocationsByNode(filteredLocations)
209 194
210 -
            StatementParser.parseStatement(statement, schema) match {
211 -
              case Right(ParsedSimpleQuery(_, _, _, _, false, _, _, _)) =>
212 -
                gatherNodeResults(msg, schema, uniqueLocationsByNode)(applyOrderingWithLimit(_, statement, schema))
213 -
214 -
              case Right(ParsedSimpleQuery(_, _, _, _, true, _, fields, _)) if fields.lengthCompare(1) == 0 =>
215 -
                val distinctField = fields.head.name
216 -
217 -
                gatherAndGroupNodeResults(msg, distinctField, schema, uniqueLocationsByNode) { bits =>
218 -
                  Bit(
219 -
                    timestamp = 0,
220 -
                    value = NSDbNumericType(0),
221 -
                    dimensions = retrieveField(bits, distinctField, (bit: Bit) => bit.dimensions),
222 -
                    tags = retrieveField(bits, distinctField, (bit: Bit) => bit.tags)
223 -
                  )
224 -
                }.map(limitAndOrder(_))
225 -
226 -
              case Right(ParsedGlobalAggregatedQuery(_, _, _, _, _, fields, aggregations, _)) =>
227 -
                gatherNodeResults(msg, schema, uniqueLocationsByNode) { rawResults =>
228 -
                  globalAggregationReduce(rawResults, fields, aggregations, statement, schema)
229 -
                }
230 -
              case Right(ParsedAggregatedQuery(_, _, _, _, aggregation, _, _)) =>
231 -
                gatherAndGroupNodeResults(msg, statement.groupBy.get.field, schema, uniqueLocationsByNode)(
232 -
                  internalAggregationReduce(_, schema, aggregation)
233 -
                ).map(limitAndOrder(_, Some(aggregation)))
234 -
235 -
              case Right(
236 -
                  ParsedTemporalAggregatedQuery(_, _, _, _, rangeLength, aggregation, condition, _, gracePeriod, _)) =>
237 -
                val sortedLocations = filteredLocations.sortBy(_.from)
238 -
                val limitedLocations = gracePeriod.fold(sortedLocations)(gracePeriodInterval =>
239 -
                  ReadNodesSelection.filterLocationsThroughGracePeriod(gracePeriodInterval, sortedLocations))
240 -
241 -
                val timeRangeContext: Option[TimeRangeContext] =
242 -
                  if (limitedLocations.isEmpty) None
243 -
                  else {
244 -
245 -
                    val upperBound = limitedLocations.last.to
246 -
                    val lowerBound = limitedLocations.head.from
247 -
248 -
                    Some(
249 -
                      TimeRangeContext(upperBound,
250 -
                                       lowerBound,
251 -
                                       rangeLength,
252 -
                                       TimeRangeManager.computeRangesForIntervalAndCondition(upperBound,
253 -
                                                                                             lowerBound,
254 -
                                                                                             rangeLength,
255 -
                                                                                             condition,
256 -
                                                                                             gracePeriod)))
195 +
      val selectStatementResponse: Future[ExecuteSelectStatementResponse] =
196 +
        (for {
197 +
          getSchemaResponse <- schemaCoordinator ? GetSchema(statement.db, statement.namespace, statement.metric)
198 +
          liveLocationResponse <- metadataCoordinator ? GetLiveLocations(statement.db,
199 +
                                                                         statement.namespace,
200 +
                                                                         statement.metric)
201 +
        } yield (getSchemaResponse, liveLocationResponse))
202 +
          .recoverWith {
203 +
            case t =>
204 +
              log.error(t, s"Error in Execute Statement $statement")
205 +
              Future(SelectStatementFailed(statement, "Generic error occurred"))
206 +
          }
207 +
          .flatMap {
208 +
            case (SchemaGot(_, _, _, Some(schema)), LiveLocationsGot(_, _, _, liveLocations)) =>
209 +
              log.debug(s"found schema $schema, and live $liveLocations")
210 +
              val filteredLocations: Seq[Location] =
211 +
                ReadNodesSelection.filterLocationsThroughTime(statement.condition.map(_.expression), liveLocations)
212 +
213 +
              val uniqueLocationsByNode: Map[String, Seq[Location]] =
214 +
                readNodesSelection.getDistinctLocationsByNode(filteredLocations)
215 +
216 +
              StatementParser.parseStatement(statement, schema) match {
217 +
                case Right(ParsedSimpleQuery(_, _, _, _, false, _, _, _)) =>
218 +
                  gatherNodeResults(msg, schema, uniqueLocationsByNode)(applyOrderingWithLimit(_, statement, schema))
219 +
220 +
                case Right(ParsedSimpleQuery(_, _, _, _, true, _, fields, _)) if fields.lengthCompare(1) == 0 =>
221 +
                  val distinctField = fields.head.name
222 +
223 +
                  gatherAndGroupNodeResults(msg, distinctField, schema, uniqueLocationsByNode) { bits =>
224 +
                    Bit(
225 +
                      timestamp = 0,
226 +
                      value = NSDbNumericType(0),
227 +
                      dimensions = retrieveField(bits, distinctField, (bit: Bit) => bit.dimensions),
228 +
                      tags = retrieveField(bits, distinctField, (bit: Bit) => bit.tags)
229 +
                    )
230 +
                  }.map(limitAndOrder(_))
231 +
232 +
                case Right(ParsedGlobalAggregatedQuery(_, _, _, _, _, fields, aggregations, _)) =>
233 +
                  gatherNodeResults(msg, schema, uniqueLocationsByNode) { rawResults =>
234 +
                    globalAggregationReduce(rawResults, fields, aggregations, statement, schema)
257 235
                  }
258 -
259 -
                val limitedUniqueLocationsByNode = readNodesSelection.getDistinctLocationsByNode(limitedLocations)
260 -
261 -
                gatherNodeResults(msg, schema, limitedUniqueLocationsByNode, timeRangeContext)(
262 -
                  postProcessingTemporalQueryResult(schema, statement, aggregation))
263 -
264 -
              case Left(error) =>
265 -
                Future(SelectStatementFailed(statement, error))
266 -
              case _ =>
267 -
                Future(SelectStatementFailed(statement, "Not a select statement."))
268 -
            }
269 -
          case _ =>
270 -
            Future(
271 -
              SelectStatementFailed(statement,
272 -
                                    s"Metric ${statement.metric} does not exist ",
273 -
                                    MetricNotFound(statement.metric)))
274 -
        }
275 -
        .recoverWith {
276 -
          case t =>
277 -
            log.error(t, s"Error in Execute Statement $statement")
278 -
            Future(SelectStatementFailed(statement, "Generic error occurred"))
279 -
        }
236 +
                case Right(ParsedAggregatedQuery(_, _, _, _, aggregation, _, _)) =>
237 +
                  gatherAndGroupNodeResults(msg, statement.groupBy.get.field, schema, uniqueLocationsByNode)(
238 +
                    internalAggregationReduce(_, schema, aggregation)
239 +
                  ).map(limitAndOrder(_, Some(aggregation)))
240 +
241 +
                case Right(
242 +
                    ParsedTemporalAggregatedQuery(_,
243 +
                                                  _,
244 +
                                                  _,
245 +
                                                  _,
246 +
                                                  rangeLength,
247 +
                                                  aggregation,
248 +
                                                  condition,
249 +
                                                  _,
250 +
                                                  gracePeriod,
251 +
                                                  _)) =>
252 +
                  val sortedLocations = filteredLocations.sortBy(_.from)
253 +
                  val limitedLocations = gracePeriod.fold(sortedLocations)(gracePeriodInterval =>
254 +
                    ReadNodesSelection.filterLocationsThroughGracePeriod(gracePeriodInterval, sortedLocations))
255 +
256 +
                  val timeRangeContext: Option[TimeRangeContext] =
257 +
                    if (limitedLocations.isEmpty) None
258 +
                    else {
259 +
260 +
                      val upperBound = limitedLocations.last.to
261 +
                      val lowerBound = limitedLocations.head.from
262 +
263 +
                      Some(
264 +
                        TimeRangeContext(upperBound,
265 +
                                         lowerBound,
266 +
                                         rangeLength,
267 +
                                         TimeRangeManager.computeRangesForIntervalAndCondition(upperBound,
268 +
                                                                                               lowerBound,
269 +
                                                                                               rangeLength,
270 +
                                                                                               condition,
271 +
                                                                                               gracePeriod)))
272 +
                    }
273 +
274 +
                  val limitedUniqueLocationsByNode = readNodesSelection.getDistinctLocationsByNode(limitedLocations)
275 +
276 +
                  gatherNodeResults(msg, schema, limitedUniqueLocationsByNode, timeRangeContext)(
277 +
                    postProcessingTemporalQueryResult(schema, statement, aggregation))
278 +
279 +
                case Left(error) =>
280 +
                  Future(SelectStatementFailed(statement, error))
281 +
                case _ =>
282 +
                  Future(SelectStatementFailed(statement, "Not a select statement."))
283 +
              }
284 +
            case _ =>
285 +
              Future(
286 +
                SelectStatementFailed(statement,
287 +
                                      s"Metric ${statement.metric} does not exist ",
288 +
                                      MetricNotFound(statement.metric)))
289 +
          }
280 290
281 291
      selectStatementResponse.pipeToWithEffect(sender()) { _ =>
282 292
        if (perfLogger.isDebugEnabled)

@@ -441,7 +441,7 @@
Loading
441 441
        locations <- Future
442 442
          .sequence {
443 443
            metrics.map(metric =>
444 -
              (metadataCoordinator ? GetLocations(db, namespace, metric)).mapTo[LocationsGot].map(_.locations))
444 +
              (metadataCoordinator ? GetLiveLocations(db, namespace, metric)).mapTo[LiveLocationsGot].map(_.locations))
445 445
          }
446 446
          .map(_.flatten)
447 447
        commitLogResponses <- Future.sequence {
@@ -492,10 +492,10 @@
Loading
492 492
              (schemaCoordinator ? GetSchema(statement.db, statement.namespace, statement.metric))
493 493
                .flatMap {
494 494
                  case SchemaGot(_, _, _, Some(schema)) =>
495 -
                    (metadataCoordinator ? GetLocations(db, namespace, metric)).flatMap {
496 -
                      case LocationsGot(_, _, _, locations) if locations.isEmpty =>
495 +
                    (metadataCoordinator ? GetLiveLocations(db, namespace, metric)).flatMap {
496 +
                      case LiveLocationsGot(_, _, _, locations) if locations.isEmpty =>
497 497
                        Future(DeleteStatementExecuted(statement.db, statement.metric, statement.metric))
498 -
                      case LocationsGot(_, _, _, locations) =>
498 +
                      case LiveLocationsGot(_, _, _, locations) =>
499 499
                        broadcastMessage(ExecuteDeleteStatementInternalInLocations(statement, schema, locations))
500 500
                      case _ =>
501 501
                        Future(
@@ -518,7 +518,9 @@
Loading
518 518
      }.pipeTo(sender())
519 519
    case msg @ DropMetric(db, namespace, metric) =>
520 520
      val chain = for {
521 -
        locations <- (metadataCoordinator ? GetLocations(db, namespace, metric)).mapTo[LocationsGot].map(_.locations)
521 +
        locations <- (metadataCoordinator ? GetLiveLocations(db, namespace, metric))
522 +
          .mapTo[LiveLocationsGot]
523 +
          .map(_.locations)
522 524
        commitLogResponses <- Future.sequence {
523 525
          locations
524 526
            .collect {

@@ -157,11 +157,11 @@
Loading
157 157
  final case class NodeToBlackListAdded(node: NSDbNode)                     extends AddNodeToBlackListResponse
158 158
  final case class AddNodeToBlackListFailed(node: NSDbNode, reason: String) extends AddNodeToBlackListResponse
159 159
160 -
  final case class NSDbNodeWithTtl(node: NSDbNode, createdAt: Long) extends NSDbSerializable
161 -
  final case object GetNodesBlackList                               extends NSDbSerializable
162 -
  sealed trait GetNodesBlackListResponse                            extends NSDbSerializable
163 -
  final case class NodesBlackListGot(blacklist: Set[NSDbNode])      extends GetNodesBlackListResponse
164 -
  final case class GetNodesBlackListFailed(reason: String)          extends GetNodesBlackListResponse
160 +
  final case class NSDbNodeWithTtl(node: NSDbNode, createdAt: Long)     extends NSDbSerializable
161 +
  final case object GetNodesBlackListFromCache                          extends NSDbSerializable
162 +
  sealed trait GetNodesBlackListFromCacheResponse                       extends NSDbSerializable
163 +
  final case class NodesBlackListFromCacheGot(blacklist: Set[NSDbNode]) extends GetNodesBlackListFromCacheResponse
164 +
  final case class GetNodesBlackListFromCacheFailed(reason: String)     extends GetNodesBlackListFromCacheResponse
165 165
166 166
  final case object CheckBlackListTtl                                               extends NSDbSerializable
167 167
  final case class RemoveNodesFromBlacklist(nodesToBeRemoved: Set[NSDbNodeWithTtl]) extends NSDbSerializable
@@ -413,7 +413,7 @@
Loading
413 413
    case GetMetricsFromCache(db, namespace) =>
414 414
      log.debug("searching for key {} in cache", coordinatesKey)
415 415
      replicator ! Get(coordinatesKey, ReadLocal, request = Some(MetricRequest(db, namespace, sender())))
416 -
    case GetNodesBlackList =>
416 +
    case GetNodesBlackListFromCache =>
417 417
      log.debug("searching for nodes blacklist")
418 418
      replicator ! Get(nodesBlacklistKey, ReadLocal, Some(NodesBlackListRequest(sender())))
419 419
    case DropMetricFromCache(db, namespace, metric) =>
@@ -544,7 +544,7 @@
Loading
544 544
      replyTo ! MetricsFromCacheGot(db, namespace, elements)
545 545
    case g @ GetSuccess(_, Some(NodesBlackListRequest(replyTo))) =>
546 546
      val elements = g.dataValue.asInstanceOf[ORSet[NSDbNodeWithTtl]].elements
547 -
      replyTo ! NodesBlackListGot(elements.map(_.node))
547 +
      replyTo ! NodesBlackListFromCacheGot(elements.map(_.node))
548 548
    case g @ GetSuccess(_, Some(NodesBlackListInternalRequest)) =>
549 549
      val elements = g.dataValue.asInstanceOf[ORSet[NSDbNodeWithTtl]].elements
550 550
      self ! RemoveNodesFromBlacklist(elements.filter(e => System.currentTimeMillis > e.createdAt + blackListTtl))
@@ -555,7 +555,7 @@
Loading
555 555
    case NotFound(_, Some(OutdatedLocationsRequest(replyTo))) =>
556 556
      replyTo ! OutdatedLocationsFromCacheGot(Set.empty)
557 557
    case NotFound(_, Some(NodesBlackListRequest(replyTo))) =>
558 -
      replyTo ! NodesBlackListGot(Set.empty)
558 +
      replyTo ! NodesBlackListFromCacheGot(Set.empty)
559 559
    case NotFound(_, Some(NodesBlackListInternalRequest)) => //do nothing
560 560
    case NotFound(_, Some(MetricInfoRequest(key, replyTo))) =>
561 561
      replyTo ! MetricInfoCached(key.db, key.namespace, key.metric, None)

@@ -16,7 +16,6 @@
Loading
16 16
17 17
package io.radicalbit.nsdb.cluster.coordinator
18 18
19 -
import java.util.concurrent.TimeUnit
20 19
import akka.actor._
21 20
import akka.cluster.ddata.DurableStore.{LoadAll, LoadData}
22 21
import akka.cluster.ddata._
@@ -36,7 +35,6 @@
Loading
36 35
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events._
37 36
import io.radicalbit.nsdb.cluster.logic.WriteConfig.MetadataConsistency.MetadataConsistency
38 37
import io.radicalbit.nsdb.cluster.logic.WriteNodesSelectionLogic
39 -
import io.radicalbit.nsdb.util.ErrorManagementUtils.{partitionResponses, _}
40 38
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
41 39
import io.radicalbit.nsdb.common.configuration.NSDbConfig
42 40
import io.radicalbit.nsdb.common.model.MetricInfo
@@ -48,7 +46,9 @@
Loading
48 46
import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
49 47
import io.radicalbit.nsdb.statement.TimeRangeManager
50 48
import io.radicalbit.nsdb.util.ActorPathLogging
49 +
import io.radicalbit.nsdb.util.ErrorManagementUtils._
51 50
51 +
import java.util.concurrent.TimeUnit
52 52
import scala.collection.mutable
53 53
import scala.concurrent.Future
54 54
import scala.concurrent.duration.FiniteDuration
@@ -159,6 +159,31 @@
Loading
159 159
        case _ => post(MetricInfo(db, namespace, metric, defaultShardingInterval))
160 160
      }
161 161
162 +
  /**
163 +
    * A live location is a location that is not blacklisted and is considered safe to fetch data from.
164 +
    * @param db the db.
165 +
    * @param namespace the namespace.
166 +
    * @param metric the metric.
167 +
    * @return a [[LiveLocationsGot]] containing the sequence of live locations. If an error occurs, a [[GetLiveLocationError]] is returned instead.
168 +
    */
169 +
  protected def getLiveLocation(db: String, namespace: String, metric: String): Future[GetLiveLocationsResponse] = {
170 +
    (for {
171 +
      allLocations <- (metadataCache ? GetLocationsFromCache(db, namespace, metric))
172 +
        .mapTo[LocationsCached]
173 +
      blacklist <- (metadataCache ? GetNodesBlackListFromCache)
174 +
        .mapTo[GetNodesBlackListFromCacheResponse]
175 +
    } yield
176 +
      blacklist match {
177 +
        case NodesBlackListFromCacheGot(blackList) =>
178 +
          LiveLocationsGot(db, namespace, metric, allLocations.locations.filterNot(loc => blackList.contains(loc.node)))
179 +
        case GetNodesBlackListFromCacheFailed(reason) => GetLiveLocationError(db, namespace, metric, reason)
180 +
      }).recover {
181 +
      case t =>
182 +
        log.error(t, "Unexpected error while retrieving Nodes Blacklist")
183 +
        GetLiveLocationError(db, namespace, metric, t.getMessage)
184 +
    }
185 +
  }
186 +
162 187
  override def preStart(): Unit = {
163 188
    mediator ! Subscribe(COORDINATORS_TOPIC, self)
164 189
@@ -445,11 +470,8 @@
Loading
445 470
          NamespaceDeleted(db, namespace)
446 471
        }
447 472
        .pipeTo(sender())
448 -
    case GetLocations(db, namespace, metric) =>
449 -
      (metadataCache ? GetLocationsFromCache(db, namespace, metric))
450 -
        .mapTo[LocationsCached]
451 -
        .map(l => LocationsGot(db, namespace, metric, l.locations))
452 -
        .pipeTo(sender())
473 +
    case GetLiveLocations(db, namespace, metric) =>
474 +
      getLiveLocation(db, namespace, metric).pipeTo(sender())
453 475
    case GetWriteLocations(db, namespace, metric, timestamp) =>
454 476
      val clusterAliveMembers = nsdbClusterSnapshot.nodes
455 477
      log.debug(s"cluster alive members $clusterAliveMembers")
@@ -469,9 +491,9 @@
Loading
469 491
          if (retention > 0 && (timestamp < currentTime - retention || timestamp > currentTime + retention))
470 492
            Future(GetWriteLocationsBeyondRetention(db, namespace, metric, timestamp, metricInfo.retention))
471 493
          else {
472 -
            (metadataCache ? GetLocationsFromCache(db, namespace, metric))
494 +
            getLiveLocation(db, namespace, metric)
473 495
              .flatMap {
474 -
                case LocationsCached(_, _, _, locations) =>
496 +
                case LiveLocationsGot(_, _, _, locations) =>
475 497
                  locations.filter(location => location.contains(timestamp)) match {
476 498
                    case Nil =>
477 499
                      for {
@@ -505,7 +527,7 @@
Loading
505 527
                    case s => Future(WriteLocationsGot(db, namespace, metric, s))
506 528
                  }
507 529
                case e =>
508 -
                  val errorMessage = s"unexpected result while trying to get locations in cache $e"
530 +
                  val errorMessage = s"unexpected result while trying to get live locations in cache $e"
509 531
                  Future(GetWriteLocationsFailed(db, namespace, metric, timestamp, errorMessage))
510 532
              }
511 533
          }
@@ -708,6 +730,7 @@
Loading
708 730
    trait RestoreMetadataResponse                                  extends NSDbSerializable
709 731
    case class MetadataRestored(path: String)                      extends RestoreMetadataResponse
710 732
    case class RestoreMetadataFailed(path: String, reason: String) extends RestoreMetadataResponse
733 +
711 734
  }
712 735
713 736
  def props(clusterListener: ActorRef,

@@ -22,15 +22,15 @@
Loading
22 22
  * ReadNodesSelection strategy that privileges local locations.
23 23
  * @param localNode the local node.
24 24
  */
25 -
class LocalityReadNodesSelection(localNode: String) extends ReadNodesSelection {
25 +
class LocalityReadNodesSelection(localNodeUniqueId: String) extends ReadNodesSelection {
26 26
27 27
  override def getDistinctLocationsByNode(locationsWithReplicas: Seq[Location]): Map[String, Seq[Location]] = {
28 -
    assert(localNode != null)
28 +
    assert(localNodeUniqueId != null)
29 29
    locationsWithReplicas
30 30
      .groupBy(l => (l.from, l.to))
31 31
      .map {
32 32
        case ((_, _), locations) if locations.size > 1 =>
33 -
          locations.find(_.node.uniqueNodeId == localNode).getOrElse(locations.head)
33 +
          locations.find(_.node.uniqueNodeId == localNodeUniqueId).getOrElse(locations.head)
34 34
        case ((_, _), locations) => locations.minBy(_.node.uniqueNodeId)
35 35
      }
36 36
      .toSeq

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle represents the entire project; moving away from the center, each ring represents folders and, finally, individual files. The size and color of each slice represent the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project, proceeding downward through folders and finally individual files. The size and color of each slice represent the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block represent the number of statements and the coverage, respectively.
Loading