radicalbit / NSDb

@@ -16,7 +16,6 @@
Loading
16 16
17 17
package io.radicalbit.nsdb.cluster.actor
18 18
19 -
import java.util.concurrent.TimeUnit
20 19
import akka.actor.{Actor, ActorLogging, ActorRef}
21 20
import akka.cluster.ddata._
22 21
import akka.dispatch.ControlMessage
@@ -29,6 +28,7 @@
Loading
29 28
import io.radicalbit.nsdb.model.{Location, LocationWithCoordinates}
30 29
import io.radicalbit.nsdb.util.ErrorManagementUtils
31 30
31 +
import java.util.concurrent.TimeUnit
32 32
import scala.concurrent.Future
33 33
import scala.concurrent.duration.FiniteDuration
34 34
@@ -64,6 +64,7 @@
Loading
64 64
  private final case class NamespaceRequest(db: String, replyTo: ActorRef)
65 65
  private final case class MetricRequest(db: String, namespace: String, replyTo: ActorRef)
66 66
  private final case class NodesBlackListRequest(replyTo: ActorRef)
67 +
  private final case object NodesBlackListInternalRequest
67 68
68 69
  final case class PutCoordinateInCache(db: String, namespace: String, metric: String) extends NSDbSerializable
69 70
  final case class PutLocationInCache(db: String,
@@ -162,7 +163,8 @@
Loading
162 163
  final case class NodesBlackListGot(blacklist: Set[NSDbNode])      extends GetNodesBlackListResponse
163 164
  final case class GetNodesBlackListFailed(reason: String)          extends GetNodesBlackListResponse
164 165
165 -
  case object CheckBlackListTtl extends NSDbSerializable
166 +
  final case object CheckBlackListTtl                                               extends NSDbSerializable
167 +
  final case class RemoveNodesFromBlacklist(nodesToBeRemoved: Set[NSDbNodeWithTtl]) extends NSDbSerializable
166 168
}
167 169
168 170
/**
@@ -238,7 +240,10 @@
Loading
238 240
  import context.dispatcher
239 241
240 242
  override def preStart(): Unit = {
241 -
    context.system.scheduler.scheduleAtFixedRate(blackListCheckInterval, blackListCheckInterval, self, null)
243 +
    context.system.scheduler.scheduleAtFixedRate(blackListCheckInterval,
244 +
                                                 blackListCheckInterval,
245 +
                                                 self,
246 +
                                                 CheckBlackListTtl)
242 247
  }
243 248
244 249
  def receive: Receive = {
@@ -360,6 +365,16 @@
Loading
360 365
        }
361 366
        .pipeTo(sender)
362 367
    case CheckBlackListTtl =>
368 +
      replicator ! Get(nodesBlacklistKey, ReadLocal, Some(NodesBlackListInternalRequest))
369 +
    case RemoveNodesFromBlacklist(toBeRemoved) =>
370 +
      Future
371 +
        .sequence(
372 +
          toBeRemoved.map(e =>
373 +
            (replicator ? Update(nodesBlacklistKey, ORSet(), metadataWriteConsistency)(_ remove e))
374 +
              .mapTo[UpdateSuccess[_]]))
375 +
        .recover {
376 +
          case t => log.error(t, "Unexpected error while removing a node from blacklist")
377 +
        }
363 378
    case AddNodeToBlackList(node) =>
364 379
      (replicator ? Update(nodesBlacklistKey, ORSet(), metadataWriteConsistency)(
365 380
        _ :+ NSDbNodeWithTtl(node, System.currentTimeMillis)))
@@ -530,6 +545,9 @@
Loading
530 545
    case g @ GetSuccess(_, Some(NodesBlackListRequest(replyTo))) =>
531 546
      val elements = g.dataValue.asInstanceOf[ORSet[NSDbNodeWithTtl]].elements
532 547
      replyTo ! NodesBlackListGot(elements.map(_.node))
548 +
    case g @ GetSuccess(_, Some(NodesBlackListInternalRequest)) =>
549 +
      val elements = g.dataValue.asInstanceOf[ORSet[NSDbNodeWithTtl]].elements
550 +
      self ! RemoveNodesFromBlacklist(elements.filter(e => System.currentTimeMillis > e.createdAt + blackListTtl))
533 551
    case NotFound(_, Some(MetricLocationsRequest(key, replyTo))) =>
534 552
      replyTo ! LocationsCached(key.db, key.namespace, key.metric, Seq.empty)
535 553
    case NotFound(_, Some(MetricLocationsInNodeRequest(db, namespace, metric, _, replyTo))) =>
@@ -538,6 +556,7 @@
Loading
538 556
      replyTo ! OutdatedLocationsFromCacheGot(Set.empty)
539 557
    case NotFound(_, Some(NodesBlackListRequest(replyTo))) =>
540 558
      replyTo ! NodesBlackListGot(Set.empty)
559 +
    case NotFound(_, Some(NodesBlackListInternalRequest)) => //do nothing
541 560
    case NotFound(_, Some(MetricInfoRequest(key, replyTo))) =>
542 561
      replyTo ! MetricInfoCached(key.db, key.namespace, key.metric, None)
543 562
    case NotFound(_, Some(AllMetricInfoWithRetentionRequest(replyTo))) =>

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading