radicalbit / NSDb
Showing 9 of 26 files from the diff.

@@ -33,10 +33,10 @@
Loading
33 33
  */
34 34
object FileUtils {
35 35
36 -
  final val NODE_ID_EXTENSION = "name"
37 -
  final val NODE_ID_LENGTH    = 10
38 -
  final val BUFFER_SIZE       = 4096
39 -
  final val buffer            = new Array[Byte](BUFFER_SIZE)
36 +
  final val NODE_FS_ID_EXTENSION = "name"
37 +
  final val NODE_FS_ID_LENGTH    = 10
38 +
  final val BUFFER_SIZE          = 4096
39 +
  final val buffer               = new Array[Byte](BUFFER_SIZE)
40 40
41 41
  private class DirectoryFilter extends FileFilter {
42 42
    override def accept(pathname: File): Boolean = pathname.isDirectory
@@ -49,7 +49,7 @@
Loading
49 49
50 50
  private class NodeIdFilter extends FileFilter {
51 51
    override def accept(pathname: File): Boolean =
52 -
      !pathname.isDirectory && pathname.getName.endsWith(NODE_ID_EXTENSION)
52 +
      !pathname.isDirectory && pathname.getName.endsWith(NODE_FS_ID_EXTENSION)
53 53
  }
54 54
55 55
  /**
@@ -137,9 +137,9 @@
Loading
137 137
  def getOrCreateNodeFsId(address: String, basePath: String): String = {
138 138
    Option(Paths.get(basePath).toFile.listFiles(new NodeIdFilter)).getOrElse(Array.empty) match {
139 139
      case Array() =>
140 -
        val newName = RandomStringUtils.randomAlphabetic(NODE_ID_LENGTH)
140 +
        val newName = RandomStringUtils.randomAlphabetic(NODE_FS_ID_LENGTH)
141 141
        new File(basePath).mkdirs()
142 -
        new File(basePath, s"$newName.$NODE_ID_EXTENSION").createNewFile()
142 +
        new File(basePath, s"$newName.$NODE_FS_ID_EXTENSION").createNewFile()
143 143
        newName
144 144
      case Array(singleFile) => FilenameUtils.removeExtension(singleFile.getName)
145 145
      case _                 => throw new InvalidNodeIdException(address)

@@ -21,17 +21,18 @@
Loading
21 21
  * and an identifier associated to the file system, i.e. the concrete volume in which shards are stored.
22 22
  * @param nodeAddress the akka cluster node address.
23 23
  * @param nodeFsId the file system identifier.
24 +
  * @param volatileNodeUuid needed to be able to distinguish different incarnations of a node with same nodeAddress and nodeFsId.
24 25
  */
25 -
case class NSDbNode(nodeAddress: String, nodeFsId: String) extends NSDbSerializable {
26 -
  def uniqueNodeId: String = s"${nodeAddress}_$nodeFsId"
26 +
case class NSDbNode(nodeAddress: String, nodeFsId: String, volatileNodeUuid: String) extends NSDbSerializable {
27 +
  def uniqueNodeId: String = s"${nodeAddress}_${nodeFsId}_$volatileNodeUuid"
27 28
}
28 29
29 30
object NSDbNode {
30 31
31 32
  def fromUniqueId(uniqueIdentifier: String): NSDbNode = {
32 33
    val components = uniqueIdentifier.split('_')
33 -
    NSDbNode(components(0), components(1))
34 +
    NSDbNode(components(0), components(1), components(2))
34 35
  }
35 36
36 -
  def empty: NSDbNode = NSDbNode("", "")
37 +
  def empty: NSDbNode = NSDbNode("", "", "")
37 38
}

@@ -28,6 +28,7 @@
Loading
28 28
import io.radicalbit.nsdb.cluster.PubSubTopics._
29 29
import io.radicalbit.nsdb.cluster._
30 30
import io.radicalbit.nsdb.cluster.actor.NSDbMetricsEvents._
31 +
import io.radicalbit.nsdb.cluster.actor.NodeActorsGuardian.UpdateVolatileId
31 32
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{
32 33
  AddLocations,
33 34
  GetOutdatedLocations,
@@ -47,6 +48,7 @@
Loading
47 48
  PublisherUnSubscribed
48 49
}
49 50
import io.radicalbit.nsdb.util.{ErrorManagementUtils, FileUtils, FutureRetryUtility}
51 +
import org.apache.commons.lang3.RandomStringUtils
50 52
51 53
import java.nio.file.{Files, NoSuchFileException, Paths}
52 54
import java.util.concurrent.TimeUnit
@@ -65,7 +67,7 @@
Loading
65 67
  protected lazy val cluster           = Cluster(context.system)
66 68
  private lazy val clusterMetricSystem = ClusterMetricsExtension(context.system)
67 69
  protected lazy val selfNodeName      = createNodeAddress(cluster.selfMember)
68 -
  protected lazy val nodeFsId            = FileUtils.getOrCreateNodeFsId(selfNodeName, config.getString(NSDBMetadataPath))
70 +
  protected lazy val nodeFsId          = FileUtils.getOrCreateNodeFsId(selfNodeName, config.getString(NSDBMetadataPath))
69 71
70 72
  private val mediator = DistributedPubSub(context.system).mediator
71 73
@@ -94,8 +96,6 @@
Loading
94 96
  implicit val scheduler: Scheduler = context.system.scheduler
95 97
  implicit val _log: LoggingAdapter = log
96 98
97 -
  private var nodeUuid: String = _
98 -
99 99
  def enableClusterMetricsExtension: Boolean
100 100
101 101
  override def preStart(): Unit = {
@@ -151,10 +151,13 @@
Loading
151 151
    case MemberUp(member) if member == cluster.selfMember =>
152 152
      log.info(s"Member with nodeId $nodeFsId and address ${member.address} is Up")
153 153
154 -
      val node = NSDbNode(createNodeAddress(member), nodeFsId)
154 +
      val volatileId = RandomStringUtils.randomAlphabetic(VOLATILE_ID_LENGTH)
155 +
156 +
      val node = NSDbNode(createNodeAddress(member), nodeFsId, volatileId)
155 157
156 158
      val nodeActorsGuardian = context.parent
157 159
      (for {
160 +
        _ <- nodeActorsGuardian ? UpdateVolatileId(volatileId)
158 161
        children @ NodeChildActorsGot(metadataCoordinator, _, _, _) <- (nodeActorsGuardian ? GetNodeChildActors)
159 162
          .mapTo[NodeChildActorsGot]
160 163
        outdatedLocations <- (children.metadataCoordinator ? GetOutdatedLocations).mapTo[OutdatedLocationsGot]
@@ -215,7 +218,7 @@
Loading
215 218
      val nodeName       = createNodeAddress(member)
216 219
      val nodeIdToRemove = NSDbClusterSnapshot(context.system).getNode(nodeName)
217 220
218 -
      unsubscribeNode(nodeIdToRemove.nodeFsId)
221 +
      unsubscribeNode(nodeIdToRemove.uniqueNodeId)
219 222
220 223
      NSDbClusterSnapshot(context.system).removeNode(nodeName)
221 224
    case _: MemberEvent => // ignore

@@ -91,9 +91,9 @@
Loading
91 91
    s"shard_reader-${location.node.nodeAddress}-${location.node.nodeFsId}-${location.metric}-${location.from}-${location.to}"
92 92
93 93
  private def location(actorName: String): Option[Location] =
94 -
    actorName.split("-").takeRight(5) match {
95 -
      case Array(nodeAddress, nodeFsId, metric, from, to) =>
96 -
        Some(Location(metric, NSDbNode(nodeAddress, nodeFsId), from.toLong, to.toLong))
94 +
    actorName.split("-").takeRight(6) match {
95 +
      case Array(nodeAddress, nodeFsId, volatileNodeId, metric, from, to) =>
96 +
        Some(Location(metric, NSDbNode(nodeAddress, nodeFsId, volatileNodeId), from.toLong, to.toLong))
97 97
      case _ => None
98 98
    }
99 99

@@ -22,8 +22,8 @@
Loading
22 22
import scala.collection.JavaConverters._
23 23
24 24
/**
25 -
  * Extension that is inspired by the akka [[Cluster]] extension with the purpose to store the current snapshot for a NSDb cluster.
26 -
  * Besides the (already provided by akka) [[Member]] information, the unique node identifier is snapshot
25 +
  * Extension that is inspired by the akka [[akka.cluster.Cluster]] extension with the purpose to store the current snapshot for a NSDb cluster.
26 +
  * Besides the (already provided by akka) [[akka.cluster.Member]] information, the unique node identifier is snapshot
27 27
  * and associated to an address, which may vary.
28 28
  */
29 29
class NSDbClusterSnapshotExtension(system: ExtendedActorSystem) extends Extension {
@@ -36,10 +36,11 @@
Loading
36 36
    * Adds a node and associate it to the a unique identifier
37 37
    * @param address the actual address of the node.
38 38
    * @param nodeId the node unique identifier.
39 +
    * @param volatileId the node volatile id
39 40
    */
40 -
  def addNode(address: String, nodeId: String): NSDbNode = {
41 +
  def addNode(address: String, nodeId: String, volatileId: String): NSDbNode = {
41 42
    system.log.debug(s"adding node with address $address and $nodeId to $threadSafeMap")
42 -
    threadSafeMap.put(address, NSDbNode(address, nodeId))
43 +
    threadSafeMap.put(address, NSDbNode(address, nodeId, volatileId))
43 44
  }
44 45
45 46
  def addNode(node: NSDbNode): NSDbNode = {

@@ -44,4 +44,6 @@
Loading
44 44
    final val NSDB_METRICS_TOPIC   = "nsdb-metrics"
45 45
    final val NSDB_LISTENERS_TOPIC = "nsdb-listeners"
46 46
  }
47 +
48 +
  final val VOLATILE_ID_LENGTH: Int = 10
47 49
}

@@ -27,7 +27,8 @@
Loading
27 27
                                   writeCoordinator: ActorRef,
28 28
                                   metadataCoordinator: ActorRef,
29 29
                                   publisherActor: ActorRef): Unit =
30 -
    new NsdbNodeEndpoint(nodeFsId, readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)(context.system)
30 +
    new NsdbNodeEndpoint(nodeFsId, readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)(
31 +
      context.system)
31 32
32 33
  protected def onFailureBehaviour(member: Member, error: Any): Unit = {
33 34
    log.error("received wrong response {}", error)

@@ -25,12 +25,13 @@
Loading
25 25
import io.radicalbit.nsdb.actors.PublisherActor
26 26
import io.radicalbit.nsdb.actors.supervision.OneForOneWithRetriesStrategy
27 27
import io.radicalbit.nsdb.cluster.PubSubTopics._
28 +
import io.radicalbit.nsdb.cluster.actor.NodeActorsGuardian.{UpdateVolatileId, VolatileIdUpdated}
28 29
import io.radicalbit.nsdb.cluster.coordinator._
29 30
import io.radicalbit.nsdb.cluster.createNodeAddress
30 31
import io.radicalbit.nsdb.cluster.extension.NSDbClusterSnapshot
31 32
import io.radicalbit.nsdb.cluster.logic.{CapacityWriteNodesSelectionLogic, LocalityReadNodesSelection}
32 33
import io.radicalbit.nsdb.common.configuration.NSDbConfig.HighLevel._
33 -
import io.radicalbit.nsdb.common.protocol.NSDbNode
34 +
import io.radicalbit.nsdb.common.protocol.{NSDbNode, NSDbSerializable}
34 35
import io.radicalbit.nsdb.exception.InvalidLocationsInNode
35 36
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
36 37
import io.radicalbit.nsdb.util.FileUtils
@@ -47,21 +48,26 @@
Loading
47 48
48 49
  private val mediator = DistributedPubSub(context.system).mediator
49 50
50 -
  private val nodeAddress = createNodeAddress(selfMember)
51 +
  protected val nodeAddress: String = createNodeAddress(selfMember)
51 52
52 53
  protected lazy val selfNodeName: String = createNodeAddress(selfMember)
53 -
  protected lazy val nodeFsId: String       = FileUtils.getOrCreateNodeFsId(selfNodeName, config.getString(NSDBMetadataPath))
54 -
55 -
  protected lazy val node: NSDbNode = NSDbNode(nodeAddress, nodeFsId)
54 +
  protected lazy val nodeFsId: String     = FileUtils.getOrCreateNodeFsId(selfNodeName, config.getString(NSDBMetadataPath))
56 55
57 56
  private val config = context.system.settings.config
58 57
59 58
  private val indexBasePath = config.getString(StorageIndexPath)
60 59
60 +
  private val heartbeatInterval = FiniteDuration(
61 +
    context.system.settings.config.getDuration("nsdb.heartbeat.interval", TimeUnit.SECONDS),
62 +
    TimeUnit.SECONDS)
63 +
64 +
  protected var node: NSDbNode                   = _
65 +
  protected var heartBeatDispatcher: Cancellable = _
66 +
61 67
  if (config.hasPath(StorageTmpPath))
62 68
    System.setProperty("java.io.tmpdir", config.getString(StorageTmpPath))
63 69
64 -
  protected def actorNameSuffix: String = NSDbNode(nodeAddress, nodeFsId).uniqueNodeId
70 +
  protected def actorNameSuffix: String = node.uniqueNodeId
65 71
66 72
  private lazy val writeNodesSelectionLogic = new CapacityWriteNodesSelectionLogic(
67 73
    CapacityWriteNodesSelectionLogic.fromConfigValue(config.getString("nsdb.cluster.metrics-selector")))
@@ -93,6 +99,18 @@
Loading
93 99
  def createClusterListener: ActorRef =
94 100
    context.actorOf(Props[ClusterListener], name = s"cluster-listener_${createNodeAddress(selfMember)}")
95 101
102 +
  def updateVolatileId(volatileId: String) = {
103 +
    node = NSDbNode(nodeAddress, nodeFsId, volatileId)
104 +
105 +
    import context.dispatcher
106 +
107 +
    if (heartBeatDispatcher != null) heartBeatDispatcher.cancel()
108 +
109 +
    heartBeatDispatcher = context.system.scheduler.schedule(heartbeatInterval, heartbeatInterval) {
110 +
      mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(node))
111 +
    }
112 +
  }
113 +
96 114
  private val clusterListener: ActorRef = createClusterListener
97 115
98 116
  protected lazy val schemaCoordinator: ActorRef = context.actorOf(
@@ -151,25 +169,13 @@
Loading
151 169
    s"metrics-data-actor_$actorNameSuffix"
152 170
  )
153 171
154 -
  override def preStart(): Unit = {
155 -
156 -
    val interval = FiniteDuration(
157 -
      context.system.settings.config.getDuration("nsdb.heartbeat.interval", TimeUnit.SECONDS),
158 -
      TimeUnit.SECONDS)
159 -
160 -
    import context.dispatcher
161 -
162 -
    /**
163 -
      * scheduler that disseminate gossip message to all the cluster listener actors
164 -
      */
165 -
    context.system.scheduler.schedule(interval, interval) {
166 -
      mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(NSDbNode(nodeAddress, nodeFsId)))
167 -
    }
168 -
172 +
  override def preStart(): Unit =
169 173
    log.info(s"NodeActorGuardian is ready at ${self.path.name}")
170 -
  }
171 174
172 175
  def receive: Receive = {
176 +
    case UpdateVolatileId(volatileId) =>
177 +
      updateVolatileId(volatileId)
178 +
      sender() ! VolatileIdUpdated(node)
173 179
    case GetNodeChildActors =>
174 180
      sender ! NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, publisherActor)
175 181
    case GetMetricsDataActors =>
@@ -183,3 +189,8 @@
Loading
183 189
      mediator ! Publish(COORDINATORS_TOPIC, SubscribePublisher(publisherActor, node.uniqueNodeId))
184 190
  }
185 191
}
192 +
193 +
object NodeActorsGuardian {
194 +
  case class UpdateVolatileId(volatileId: String) extends NSDbSerializable
195 +
  case class VolatileIdUpdated(node: NSDbNode)    extends NSDbSerializable
196 +
}

@@ -172,9 +172,10 @@
Loading
172 172
      log.debug("WriteCoordinator commit log  actor : {}", commitLogCoordinators.size)
173 173
    }
174 174
175 -
    context.system.scheduler.schedule(FiniteDuration(0, "ms"), retentionCheckInterval) {
176 -
      metadataCache ! GetAllMetricInfoWithRetention
177 -
    }
175 +
    context.system.scheduler.scheduleAtFixedRate(FiniteDuration(0, "ms"),
176 +
                                                 retentionCheckInterval,
177 +
                                                 metadataCache,
178 +
                                                 GetAllMetricInfoWithRetention)
178 179
179 180
  }
180 181
Notifications are pending CI completion. Periodically Codecov will check the CI state, when complete notifications will be submitted. Push notifications now.

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading