radicalbit / NSDb

Compare 6843baf ... +3 ... ac20113

Coverage Reach

nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/PublisherActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardReaderActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricReaderActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricPerformerActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricAccumulatorActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricsActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/supervision/OneForOneWithRetriesStrategy.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardOperation.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AbstractStructuredIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TypeSupport.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AllFacetIndexes.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/lucene/Index.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/lucene/OrderedTaxonomyFacetCounts.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetSumIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetCountIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetRangeIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SumAndCountFacetCombiner.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/DirectorySupport.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/StorageStrategy.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SimpleIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TemporaryIndex.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/StandardCommitLogSerializer.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/RollingCommitLogFileWriter.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/RollingCommitLogFileChecker.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/CommitLogWriterActor.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/CommitLogFile.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/ExpressionParser.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/StatementParser.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/TimeRangeManager.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/FieldsParser.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/StatementParserErrors.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/post_proc/package.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FileUtils.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/util/ErrorManagementUtils.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/util/PipeableFutureWithSideEffect.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FutureRetryUtility.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/util/ActorPathLogging.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/model/Schema.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/model/Location.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/model/TimeRange.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/protocol/MessageProtocol.scala
nsdb-core/src/main/scala/io/radicalbit/nsdb/monitoring/NSDbMonitoring.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/ReadCoordinator.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/SchemaCoordinator.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/CommitLogCoordinator.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedMetadataCache.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/AbstractClusterListener.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/MetricsDataActor.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/NodeActorsGuardian.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedSchemaCache.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/SequentialFutureProcessing.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/DatabaseActorsGuardian.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ClusterListener.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/endpoint/GrpcEndpoint.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/WriteConfig.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/ReadNodesSelection.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/CapacityWriteNodesSelectionLogic.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/LocalityReadNodesSelection.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NSDbActors.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/extension/NSDbClusterSnapshot.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/extension/RemoteAddress.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/package.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NsdbNodeEndpoint.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/metrics/NSDbMetrics.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/metrics/DiskMetricsSelector.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NsdbCluster.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/ProductionCluster.scala
nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NsdbPerfLogger.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/CommandApi.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/QueryApi.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/QueryValidationApi.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/DataApi.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/NSDbJson.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/WsResources.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/actor/StreamActor.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/WebResources.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/NSDbHttpSecurityDirective.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/SSLSupport.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/ApiResources.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/Filters.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/ApiSupport.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/QueryEnriched.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/CustomSerializers.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/Validators.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/ValidationDirective.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/Validator.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/ValidatorRegexHelpers.scala
nsdb-http/src/main/scala/io/radicalbit/nsdb/web/swagger/SwaggerDocService.scala
nsdb-sql/src/main/scala/io/radicalbit/nsdb/sql/parser/SQLStatementParser.scala
nsdb-sql/src/main/scala/io/radicalbit/nsdb/sql/parser/CommandStatementParser.scala
nsdb-sql/src/main/scala/io/radicalbit/nsdb/sql/parser/RegexNSDb.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfigProvider.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/package.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/statement/SQLStatement.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/statement/SqlStatementSerialization.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/Bit.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/NSDbNode.scala
nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/FieldClassType.scala
nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/NSDB.scala
nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/example/NSDBMain.scala
nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/example/NSDbStreamingMain.scala
nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/NSDBMetricInfo.scala
nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/streaming/NSDbStreaming.scala
nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/NSDBDescribe.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/converters/BitConverters.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/ws/WebSocketClient.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/ws/SynchronizedBuffer.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/test/MiniClusterSingleNodeSpec.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/NSDbMiniCluster.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/MiniClusterStarter.scala
nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/NSDBAkkaMiniCluster.scala
nsdb-java-api/src/main/scala/io/radicalbit/nsdb/api/java/ScalaUtils.scala

No flags found

Showing 19 of 57 files from the diff. The remaining files were ignored by Codecov.

nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FileUtils.scala

@@ -19,8 +19,8 @@
 import java.io._
 import java.nio.file.{Path, Paths}
 import java.util.zip.{ZipEntry, ZipFile}
-
 import io.radicalbit.nsdb.common.exception.InvalidNodeIdException
+import io.radicalbit.nsdb.common.protocol.NSDbNode
 import io.radicalbit.nsdb.model.{Location, LocationWithCoordinates}
 import org.apache.commons.io.FilenameUtils
 import org.apache.commons.lang3.RandomStringUtils
@@ -33,10 +33,10 @@
   */
 object FileUtils {

-  private val NODE_ID_EXTENSION = "name"
-  private val NODE_ID_LENGTH    = 10
-  private val BUFFER_SIZE       = 4096
-  private val buffer            = new Array[Byte](BUFFER_SIZE)
+  final val NODE_FS_ID_EXTENSION = "name"
+  final val NODE_FS_ID_LENGTH    = 10
+  final val BUFFER_SIZE          = 4096
+  final val buffer               = new Array[Byte](BUFFER_SIZE)

   private class DirectoryFilter extends FileFilter {
     override def accept(pathname: File): Boolean = pathname.isDirectory
@@ -49,7 +49,7 @@

   private class NodeIdFilter extends FileFilter {
     override def accept(pathname: File): Boolean =
-      !pathname.isDirectory && pathname.getName.endsWith(NODE_ID_EXTENSION)
+      !pathname.isDirectory && pathname.getName.endsWith(NODE_FS_ID_EXTENSION)
   }

   /**
@@ -99,10 +99,10 @@
     * The subfolders will be scanned according to this hierarchy.
     * indexBasePath -> database -> namespace -> "shards" -> metric_[from_timestamp]_[to_timestamp].
     * @param basePath the base path to begin the scan.
-    * @param nodeName the node name that will be used for creating locations.
-    *                 @return a list of [[LocationWithCoordinates]].
+    * @param node the NSDb node.
+    * @return a list of [[LocationWithCoordinates]].
     */
-  def getLocationsFromFilesystem(basePath: String, nodeName: String): List[LocationWithCoordinates] =
+  def getLocationsFromFilesystem(basePath: String, node: NSDbNode): List[LocationWithCoordinates] =
     FileUtils
       .getSubDirs(Paths.get(basePath))
       .flatMap { databaseDir =>
@@ -120,26 +120,26 @@
                     val metric          = fileName.split(s"_${from}").head
                     LocationWithCoordinates(database,
                                             namespaceDir.getName,
-                                            Location(metric, nodeName, from.toLong, to.toLong))
+                                            Location(metric, node, from.toLong, to.toLong))
                 }
           }
       }

   /**
-    * Creates (if not exists) a new unique id for a node and stores it in the file system.
+    * Creates (if not exists) a new unique file system id for a node and stores it in the file system.
     * An already existing name will be used instead.
     * This method must be executed on the local node.
     * @param address the node address. This information will be connotate the exception in case of an error.
     * @param basePath the parent path in which create or search the unique id
     * @return the unique id for the current node.
     * @throws InvalidNodeIdException if multiple identifier are found.
     */
-  def getOrCreateNodeId(address: String, basePath: String): String = {
+  def getOrCreateNodeFsId(address: String, basePath: String): String = {
     Option(Paths.get(basePath).toFile.listFiles(new NodeIdFilter)).getOrElse(Array.empty) match {
       case Array() =>
-        val newName = RandomStringUtils.randomAlphabetic(NODE_ID_LENGTH)
+        val newName = RandomStringUtils.randomAlphabetic(NODE_FS_ID_LENGTH)
         new File(basePath).mkdirs()
-        new File(basePath, s"$newName.$NODE_ID_EXTENSION").createNewFile()
+        new File(basePath, s"$newName.$NODE_FS_ID_EXTENSION").createNewFile()
         newName
       case Array(singleFile) => FilenameUtils.removeExtension(singleFile.getName)
       case _                 => throw new InvalidNodeIdException(address)
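
The renamed helper is a get-or-create: the first call generates and persists a random id, later calls return the stored one. Below is a minimal usage sketch, not part of the diff; it assumes nsdb-core on the classpath, and the address and scratch directory are made-up values.

    import io.radicalbit.nsdb.util.FileUtils

    object NodeFsIdSketch extends App {
      // Arbitrary scratch directory standing in for the configured metadata path.
      val basePath = java.nio.file.Files.createTempDirectory("nsdb-metadata").toString

      // No "*.name" file exists yet: a random 10-char id is created and stored as <id>.name.
      val first = FileUtils.getOrCreateNodeFsId("127.0.0.1:2552", basePath)

      // The single existing file is found and the same id is returned.
      val second = FileUtils.getOrCreateNodeFsId("127.0.0.1:2552", basePath)
      assert(first == second)
    }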

nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/NSDbNode.scala (new file)

@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018-2020 Radicalbit S.r.l.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.radicalbit.nsdb.common.protocol
+
+/**
+  * A NSDb node must contain an identifier associated to the akka cluster address
+  * and an identifier associated to the file system, i.e. the concrete volume in which shards are stored.
+  * @param nodeAddress the akka cluster node address.
+  * @param nodeFsId the file system identifier.
+  * @param volatileNodeUuid needed to be able to distinguish different incarnations of a node with same nodeAddress and nodeFsId.
+  */
+case class NSDbNode(nodeAddress: String, nodeFsId: String, volatileNodeUuid: String) extends NSDbSerializable {
+  def uniqueNodeId: String = s"${nodeAddress}_${nodeFsId}_$volatileNodeUuid"
+}
+
+object NSDbNode {
+
+  def fromUniqueId(uniqueIdentifier: String): NSDbNode = {
+    val components = uniqueIdentifier.split('_')
+    NSDbNode(components(0), components(1), components(2))
+  }
+
+  def empty: NSDbNode = NSDbNode("", "", "")
+}
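
Since uniqueNodeId is a plain '_'-joined string, fromUniqueId can rebuild the node from it. A round-trip sketch (the object name and sample values are hypothetical, not part of the diff):

    import io.radicalbit.nsdb.common.protocol.NSDbNode

    object NSDbNodeRoundTrip extends App {
      // A node is identified by cluster address, file system id and volatile incarnation id.
      val node = NSDbNode("127.0.0.1:2552", "aBcDeFgHiJ", "xYzWqRtUvS")

      // uniqueNodeId joins the three components with '_' ...
      val id = node.uniqueNodeId // "127.0.0.1:2552_aBcDeFgHiJ_xYzWqRtUvS"

      // ... and fromUniqueId splits on '_' to rebuild the case class.
      // Note this assumes none of the components contains a '_' itself.
      assert(NSDbNode.fromUniqueId(id) == node)
    }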

nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala

@@ -28,7 +28,7 @@
 import io.radicalbit.nsdb.cluster.logic.WriteConfig
 import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
 import io.radicalbit.nsdb.common.configuration.NSDbConfig
-import io.radicalbit.nsdb.common.protocol.Bit
+import io.radicalbit.nsdb.common.protocol.{Bit, NSDbNode}
 import io.radicalbit.nsdb.common.statement.DeleteSQLStatement
 import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
 import io.radicalbit.nsdb.model.{Location, Schema, TimeContext}
@@ -177,7 +177,7 @@
                        metric: String,
                        bit: Bit,
                        location: Location): Future[WriteCoordinatorResponse] =
-    metricsDataActors.get(location.node) match {
+    metricsDataActors.get(location.node.uniqueNodeId) match {
       case Some(actor) =>
         (actor ? AddRecordToShard(db, namespace, location, bit)).map {
           case msg: RecordAccumulated => msg
@@ -192,14 +192,14 @@
                            System.currentTimeMillis())
         }
       case None =>
-        log.error(s"no data actor for node ${location.node}")
+        log.error(s"no data actor for node ${location.node.uniqueNodeId}")
         Future(
           RecordRejected(db,
                          namespace,
                          metric,
                          bit,
                          location,
-                         List(s"no data actor for node ${location.node}"),
+                         List(s"no data actor for node ${location.node.uniqueNodeId}"),
                          System.currentTimeMillis()))
     }

@@ -239,7 +239,7 @@
                        namespace,
                        response.ts,
                        metric,
-                       response.location.node,
+                       response.location.node.uniqueNodeId,
                        RejectedEntryAction(bit),
                        response.location)
       }
@@ -249,7 +249,7 @@
           namespace,
           metric,
           bit,
-          Location(metric, "", 0, 0),
+          Location(metric, NSDbNode.empty, 0, 0),
           List(s"Error in CommitLog write request with reasons: ${failedResponses.map(_.reason)}"),
           System.currentTimeMillis()
         ))
@@ -276,13 +276,15 @@
       val accumulationResult = Future
         .sequence {
           responses.map { accumulatorResponse =>
-            writeCommitLog(db,
-                           namespace,
-                           accumulatorResponse.timestamp,
-                           metric,
-                           accumulatorResponse.location.node,
-                           AccumulatedEntryAction(bit),
-                           accumulatorResponse.location)
+            writeCommitLog(
+              db,
+              namespace,
+              accumulatorResponse.timestamp,
+              metric,
+              accumulatorResponse.location.node.uniqueNodeId,
+              AccumulatedEntryAction(bit),
+              accumulatorResponse.location
+            )
           }
         }
         .flatMap { responses =>
@@ -295,7 +297,7 @@
       // case in which the accumulation on some node failed
       // we need to delete the accumulated bits on the successful node commit in these nodes a RejectedEntry
       succeedResponses.foreach { response =>
-        metricsDataActors.get(response.location.node) match {
+        metricsDataActors.get(response.location.node.uniqueNodeId) match {
           case Some(mda) => mda ! DeleteRecordFromShard(db, namespace, response.location, bit)
           case None      =>
         }
@@ -304,13 +306,15 @@
       val commitLogRejection = Future
         .sequence {
           responses.map { accumulatorResponse =>
-            writeCommitLog(db,
-                           namespace,
-                           accumulatorResponse.timestamp,
-                           metric,
-                           accumulatorResponse.location.node,
-                           RejectedEntryAction(bit),
-                           accumulatorResponse.location)
+            writeCommitLog(
+              db,
+              namespace,
+              accumulatorResponse.timestamp,
+              metric,
+              accumulatorResponse.location.node.uniqueNodeId,
+              RejectedEntryAction(bit),
+              accumulatorResponse.location
+            )
           }
         }
         .flatMap { responses =>
@@ -321,7 +325,7 @@
                                namespace,
                                metric,
                                bit,
-                               Location(metric, "", 0, 0),
+                               Location(metric, NSDbNode.empty, 0, 0),
                                List(""),
                                System.currentTimeMillis())))
         }
@@ -341,7 +345,7 @@
     val commitLogResponses: Future[Seq[CommitLogResponse]] =
       Future
         .sequence(locations.map { loc =>
-          writeCommitLog(db, namespace, timestamp, metric, loc.node, ReceivedEntryAction(bit), loc)
+          writeCommitLog(db, namespace, timestamp, metric, loc.node.uniqueNodeId, ReceivedEntryAction(bit), loc)
         })

     commitLogResponses
@@ -437,14 +441,14 @@
         locations <- Future
           .sequence {
             metrics.map(metric =>
-              (metadataCoordinator ? GetLocations(db, namespace, metric)).mapTo[LocationsGot].map(_.locations))
+              (metadataCoordinator ? GetLiveLocations(db, namespace, metric)).mapTo[LiveLocationsGot].map(_.locations))
           }
           .map(_.flatten)
         commitLogResponses <- Future.sequence {
           locations
             .collect {
-              case location if commitLogCoordinators.contains(location.node) =>
-                (location, commitLogCoordinators(location.node))
+              case location if commitLogCoordinators.contains(location.node.uniqueNodeId) =>
+                (location, commitLogCoordinators(location.node.uniqueNodeId))
             }
             .map {
               case (location, actor) =>
@@ -479,7 +483,7 @@
           metric,
           commitLogCoordinators.keys.head,
           DeleteAction(statement),
-          Location(metric, commitLogCoordinators.keys.head, 0, 0)
+          Location(metric, NSDbNode.fromUniqueId(commitLogCoordinators.keys.head), 0, 0)
         ).flatMap {
           case WriteToCommitLogSucceeded(_, _, _, _, _) =>
             if (metricsDataActors.isEmpty)
@@ -488,10 +492,10 @@
               (schemaCoordinator ? GetSchema(statement.db, statement.namespace, statement.metric))
                 .flatMap {
                   case SchemaGot(_, _, _, Some(schema)) =>
-                    (metadataCoordinator ? GetLocations(db, namespace, metric)).flatMap {
-                      case LocationsGot(_, _, _, locations) if locations.isEmpty =>
+                    (metadataCoordinator ? GetLiveLocations(db, namespace, metric)).flatMap {
+                      case LiveLocationsGot(_, _, _, locations) if locations.isEmpty =>
                         Future(DeleteStatementExecuted(statement.db, statement.metric, statement.metric))
-                      case LocationsGot(_, _, _, locations) =>
+                      case LiveLocationsGot(_, _, _, locations) =>
                         broadcastMessage(ExecuteDeleteStatementInternalInLocations(statement, schema, locations))
                       case _ =>
                         Future(
@@ -514,12 +518,14 @@
       }.pipeTo(sender())
     case msg @ DropMetric(db, namespace, metric) =>
       val chain = for {
-        locations <- (metadataCoordinator ? GetLocations(db, namespace, metric)).mapTo[LocationsGot].map(_.locations)
+        locations <- (metadataCoordinator ? GetLiveLocations(db, namespace, metric))
+          .mapTo[LiveLocationsGot]
+          .map(_.locations)
         commitLogResponses <- Future.sequence {
           locations
             .collect {
-              case location if commitLogCoordinators.contains(location.node) =>
-                (location, commitLogCoordinators(location.node))
+              case location if commitLogCoordinators.contains(location.node.uniqueNodeId) =>
+                (location, commitLogCoordinators(location.node.uniqueNodeId))
             }
             .map {
              case (location, actor) =>
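
The common thread in these hunks is that per-node registries (metricsDataActors, commitLogCoordinators) are now keyed by location.node.uniqueNodeId instead of a bare node string. A self-contained sketch of why the richer key matters (names and values are illustrative; a String stands in for an ActorRef):

    import io.radicalbit.nsdb.common.protocol.NSDbNode

    object UniqueKeySketch extends App {
      // Two incarnations of the same node share address and fs id but differ
      // in the volatile id, so uniqueNodeId keeps them apart as map keys.
      val oldIncarnation = NSDbNode("127.0.0.1:2552", "aBcDeFgHiJ", "run1")
      val newIncarnation = NSDbNode("127.0.0.1:2552", "aBcDeFgHiJ", "run2")

      // Stub registry: uniqueNodeId -> actor name.
      var actorsByNode = Map.empty[String, String]
      actorsByNode += oldIncarnation.uniqueNodeId -> "metricsDataActor-1"
      actorsByNode += newIncarnation.uniqueNodeId -> "metricsDataActor-2"

      // Keying by the bare address would have collapsed both entries into one.
      assert(actorsByNode.size == 2)
    }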

nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/AbstractClusterListener.scala

@@ -28,6 +28,7 @@
 import io.radicalbit.nsdb.cluster.PubSubTopics._
 import io.radicalbit.nsdb.cluster._
 import io.radicalbit.nsdb.cluster.actor.NSDbMetricsEvents._
+import io.radicalbit.nsdb.cluster.actor.NodeActorsGuardian.UpdateVolatileId
 import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{
   AddLocations,
   GetOutdatedLocations,
@@ -38,6 +39,7 @@
 import io.radicalbit.nsdb.cluster.logic.WriteConfig.MetadataConsistency
 import io.radicalbit.nsdb.cluster.metrics.NSDbMetrics
 import io.radicalbit.nsdb.common.configuration.NSDbConfig.HighLevel._
+import io.radicalbit.nsdb.common.protocol.NSDbNode
 import io.radicalbit.nsdb.model.LocationWithCoordinates
 import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
 import io.radicalbit.nsdb.protocol.MessageProtocol.Events.{
@@ -46,6 +48,7 @@
   PublisherUnSubscribed
 }
 import io.radicalbit.nsdb.util.{ErrorManagementUtils, FileUtils, FutureRetryUtility}
+import org.apache.commons.lang3.RandomStringUtils

 import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util.concurrent.TimeUnit
@@ -63,8 +66,8 @@

   protected lazy val cluster           = Cluster(context.system)
   private lazy val clusterMetricSystem = ClusterMetricsExtension(context.system)
-  protected lazy val selfNodeName      = createNodeName(cluster.selfMember)
-  protected lazy val nodeId            = FileUtils.getOrCreateNodeId(selfNodeName, config.getString(NSDBMetadataPath))
+  protected lazy val selfNodeName      = createNodeAddress(cluster.selfMember)
+  protected lazy val nodeFsId          = FileUtils.getOrCreateNodeFsId(selfNodeName, config.getString(NSDBMetadataPath))

   private val mediator = DistributedPubSub(context.system).mediator

@@ -105,14 +108,8 @@

   override def postStop(): Unit = cluster.unsubscribe(self)

-  private def createNodeActorGuardianName(nodeId: String, nodeName: String): String =
-    s"guardian_${nodeId}_${nodeName}"
-
-  private def createNodeActorGuardianPath(nodeId: String, nodeName: String): String =
-    s"/user/${createNodeActorGuardianName(nodeId, nodeName)}"
-
-  protected def retrieveLocationsToAdd: List[LocationWithCoordinates] =
-    FileUtils.getLocationsFromFilesystem(indexPath, nodeId)
+  protected def retrieveLocationsToAdd(node: NSDbNode): List[LocationWithCoordinates] =
+    FileUtils.getLocationsFromFilesystem(indexPath, node)

   protected def onSuccessBehaviour(readCoordinator: ActorRef,
                                    writeCoordinator: ActorRef,
@@ -129,10 +126,9 @@
   }

   private def unsubscribeNode(otherNodeId: String)(implicit scheduler: Scheduler, _log: LoggingAdapter) = {
-    log.info(s"unsubscribing node $otherNodeId from node $nodeId")
+    log.info(s"unsubscribing node $otherNodeId from node $nodeFsId")
     (for {
-      NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, _) <- (context.actorSelection(
-        createNodeActorGuardianPath(nodeId, selfNodeName)) ? GetNodeChildActors)
+      NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, _) <- (context.parent ? GetNodeChildActors)
         .mapTo[NodeChildActorsGot]
       _ <- (readCoordinator ? UnsubscribeMetricsDataActor(otherNodeId)).mapTo[MetricsDataActorUnSubscribed]
       _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(otherNodeId))
@@ -153,19 +149,24 @@

   def receive: Receive = {
     case MemberUp(member) if member == cluster.selfMember =>
-      log.info(s"Member with nodeId $nodeId and address ${member.address} is Up")
+      log.info(s"Member with nodeId $nodeFsId and address ${member.address} is Up")

-      val nodeActorsGuardian = context.parent
+      val volatileId = RandomStringUtils.randomAlphabetic(VOLATILE_ID_LENGTH)

+      val node = NSDbNode(createNodeAddress(member), nodeFsId, volatileId)
+
+      val nodeActorsGuardian = context.parent
       (for {
+        _ <- nodeActorsGuardian ? UpdateVolatileId(volatileId)
         children @ NodeChildActorsGot(metadataCoordinator, _, _, _) <- (nodeActorsGuardian ? GetNodeChildActors)
           .mapTo[NodeChildActorsGot]
         outdatedLocations <- (children.metadataCoordinator ? GetOutdatedLocations).mapTo[OutdatedLocationsGot]
         addLocationsResult <- {

-          val locationsToAdd: Seq[LocationWithCoordinates] = retrieveLocationsToAdd.diff(outdatedLocations.locations)
+          val locationsToAdd: Seq[LocationWithCoordinates] =
+            retrieveLocationsToAdd(node).diff(outdatedLocations.locations)

-          log.info(s"locations to add from node $nodeId \t$locationsToAdd")
+          log.info(s"locations to add from node $nodeFsId \t$locationsToAdd")

           val locationsGroupedBy: Map[(String, String), Seq[LocationWithCoordinates]] = locationsToAdd.groupBy {
             case LocationWithCoordinates(database, namespace, _) => (database, namespace)
@@ -190,35 +191,34 @@
           case Success(
               (NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, publisherActor),
                (success, failures))) if failures.isEmpty =>
-            log.info(s"location ${success} successfully added for node $nodeId")
-            val nodeName = createNodeName(member)
+            log.info(s"location ${success} successfully added for node $nodeFsId")

             val interval =
               FiniteDuration(context.system.settings.config.getDuration("nsdb.heartbeat.interval", TimeUnit.SECONDS),
                              TimeUnit.SECONDS)

             context.system.scheduler.schedule(interval, interval) {
-              mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(nodeId, selfNodeName))
+              mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(node))
             }

             mediator ! Subscribe(NODE_GUARDIANS_TOPIC, nodeActorsGuardian)
-            mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(nodeId, nodeName))
-            NSDbClusterSnapshot(context.system).addNode(nodeName, nodeId)
+            mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(node))
+            NSDbClusterSnapshot(context.system).addNode(node)
             onSuccessBehaviour(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)
           case e =>
             onFailureBehaviour(member, e)
         }
-    case NodeAlive(nodeId, address) =>
-      NSDbClusterSnapshot(context.system).addNode(address, nodeId)
+    case NodeAlive(node) =>
+      NSDbClusterSnapshot(context.system).addNode(node)
     case UnreachableMember(member) =>
       log.info("Member detected as unreachable: {}", member)
     case MemberRemoved(member, previousStatus) if member != cluster.selfMember =>
       log.warning("{} Member is Removed: {} after {}", selfNodeName, member.address, previousStatus)

-      val nodeName       = createNodeName(member)
-      val nodeIdToRemove = NSDbClusterSnapshot(context.system).getId(nodeName)
+      val nodeName       = createNodeAddress(member)
+      val nodeIdToRemove = NSDbClusterSnapshot(context.system).getNode(nodeName)

-      unsubscribeNode(nodeIdToRemove.nodeId)
+      unsubscribeNode(nodeIdToRemove.uniqueNodeId)

       NSDbClusterSnapshot(context.system).removeNode(nodeName)
     case _: MemberEvent => // ignore
@@ -237,7 +237,7 @@
       log.debug(s"nsdb metrics $nsdbMetrics")
     case ClusterMetricsChanged(nodeMetrics) =>
       log.debug(s"received metrics $nodeMetrics")
-      akkaClusterMetrics = nodeMetrics.groupBy(nodeMetric => createNodeName(nodeMetric.address))
+      akkaClusterMetrics = nodeMetrics.groupBy(nodeMetric => createNodeAddress(nodeMetric.address))
       Try {
         val fs = Files.getFileStore(Paths.get(indexPath))
         mediator ! Publish(NSDB_METRICS_TOPIC, DiskOccupationChanged(selfNodeName, fs.getUsableSpace, fs.getTotalSpace))
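
On MemberUp the listener now mints a volatile id and builds the full NSDbNode before announcing itself via NodeAlive. A sketch of that id construction (VOLATILE_ID_LENGTH is defined elsewhere in the cluster module; 10 is an assumed value here):

    import io.radicalbit.nsdb.common.protocol.NSDbNode
    import org.apache.commons.lang3.RandomStringUtils

    object VolatileIdSketch extends App {
      // Assumed length; the real VOLATILE_ID_LENGTH constant lives in the cluster module.
      val VOLATILE_ID_LENGTH = 10

      // A fresh random alphabetic id is generated for each incarnation of the member...
      val volatileId = RandomStringUtils.randomAlphabetic(VOLATILE_ID_LENGTH)

      // ...so restarting a node with the same address and fs id still yields a new uniqueNodeId.
      val node = NSDbNode("127.0.0.1:2552", "aBcDeFgHiJ", volatileId)
      println(node.uniqueNodeId)
    }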

nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/extension/NSDbClusterSnapshot.scala

@@ -17,13 +17,13 @@
 package io.radicalbit.nsdb.cluster.extension

 import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
-import io.radicalbit.nsdb.protocol.MessageProtocol.Events.NSDbNode
+import io.radicalbit.nsdb.common.protocol.NSDbNode

 import scala.collection.JavaConverters._

 /**
-  * Extension that is inspired by the akka [[Cluster]] extension with the purpose to store the current snapshot for a NSDb cluster.
-  * Besides the (already provided by akka) [[Member]] information, the unique node identifier is snapshot
+  * Extension that is inspired by the akka [[akka.cluster.Cluster]] extension with the purpose to store the current snapshot for a NSDb cluster.
+  * Besides the (already provided by akka) [[akka.cluster.Member]] information, the unique node identifier is snapshot
   * and associated to an address, which may vary.
   */
 class NSDbClusterSnapshotExtension(system: ExtendedActorSystem) extends Extension {
@@ -36,10 +36,16 @@
     * Adds a node and associate it to the a unique identifier
     * @param address the actual address of the node.
     * @param nodeId the node unique identifier.
+    * @param volatileId the node volatile id
     */
-  def addNode(address: String, nodeId: String): NSDbNode = {
-    system.log.debug(s"adding node with address $address to $threadSafeMap")
-    threadSafeMap.put(address, NSDbNode(address, nodeId))
+  def addNode(address: String, nodeId: String, volatileId: String): NSDbNode = {
+    system.log.debug(s"adding node with address $address and $nodeId to $threadSafeMap")
+    threadSafeMap.put(address, NSDbNode(address, nodeId, volatileId))
+  }
+
+  def addNode(node: NSDbNode): NSDbNode = {
+    system.log.debug(s"adding node with address ${node.nodeAddress} to $threadSafeMap")
+    threadSafeMap.put(node.nodeAddress, node)
   }

   /**
@@ -56,7 +62,7 @@
     */
   def nodes: Iterable[NSDbNode] = threadSafeMap.asScala.values

-  def getId(address: String): NSDbNode = threadSafeMap.get(address)
+  def getNode(address: String): NSDbNode = threadSafeMap.get(address)
 }

 object NSDbClusterSnapshot extends ExtensionId[NSDbClusterSnapshotExtension] with ExtensionIdProvider {
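
A hedged sketch of driving the extension directly, e.g. from a test (the ActorSystem name and node values are made up):

    import akka.actor.ActorSystem
    import io.radicalbit.nsdb.cluster.extension.NSDbClusterSnapshot
    import io.radicalbit.nsdb.common.protocol.NSDbNode

    object SnapshotSketch extends App {
      val system = ActorSystem("snapshot-sketch")

      val node = NSDbNode("127.0.0.1:2552", "aBcDeFgHiJ", "volatileId")

      // The new overload stores the whole node, keyed by its (possibly changing) address...
      NSDbClusterSnapshot(system).addNode(node)

      // ...and getNode (formerly getId) resolves an address back to the full node.
      assert(NSDbClusterSnapshot(system).getNode(node.nodeAddress) == node)

      system.terminate()
    }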


Showing 4 files with coverage changes.

Changes in nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SimpleIndex.scala: -3 / +3
Changes in nsdb-core/src/main/scala/io/radicalbit/nsdb/util/ErrorManagementUtils.scala: -4 / +4
Changes in nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/RollingCommitLogFileChecker.scala: -1 / +1
New file: nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/NSDbNode.scala