radicalbit / NSDb

Compare 7a834f3 ... +0 ... 0acce0e

Coverage Reach
nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/PublisherActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardReaderActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricReaderActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricPerformerActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricAccumulatorActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricsActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardOperation.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AbstractStructuredIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TypeSupport.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AllFacetIndexes.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/lucene/Index.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/lucene/OrderedTaxonomyFacetCounts.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetSumIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetCountIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetRangeIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SumAndCountFacetCombiner.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/FacetIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/DirectorySupport.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/StorageStrategy.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SimpleIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TemporaryIndex.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/StandardCommitLogSerializer.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/RollingCommitLogFileWriter.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/RollingCommitLogFileChecker.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/CommitLogWriterActor.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/commit_log/CommitLogFile.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/ExpressionParser.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/StatementParser.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/TimeRangeManager.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/FieldsParser.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/statement/StatementParserErrors.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/post_proc/package.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FileUtils.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/util/ErrorManagementUtils.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/util/PipeableFutureWithSideEffect.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FutureRetryUtility.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/util/ActorPathLogging.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/model/Schema.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/model/Location.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/model/TimeRange.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/protocol/MessageProtocol.scala nsdb-core/src/main/scala/io/radicalbit/nsdb/monitoring/NSDbMonitoring.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/ReadCoordinator.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/SchemaCoordinator.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/CommitLogCoordinator.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedMetadataCache.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/AbstractClusterListener.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/NodeActorsGuardian.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/MetricsDataActor.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedSchemaCache.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/SequentialFutureProcessing.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/DatabaseActorsGuardian.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ClusterListener.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/index/LocationIndex.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/index/MetricInfoIndex.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/endpoint/GrpcEndpoint.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/WriteConfig.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/ReadNodesSelection.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/CapacityWriteNodesSelectionLogic.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/logic/LocalityReadNodesSelection.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NSDbActors.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NsdbNodeEndpoint.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/package.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/extension/NSDbClusterSnapshot.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/extension/RemoteAddress.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/metrics/NSDbMetrics.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/metrics/DiskMetricsSelector.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NsdbCluster.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/ProductionCluster.scala nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NsdbPerfLogger.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/CommandApi.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/QueryApi.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/QueryValidationApi.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/routes/DataApi.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/NSDbJson.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/WsResources.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/actor/StreamActor.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/WebResources.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/NSDbHttpSecurityDirective.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/SSLSupport.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/ApiResources.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/Filters.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/ApiSupport.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/QueryEnriched.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/CustomSerializers.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/Validators.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/ValidationDirective.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/Validator.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/validation/ValidatorRegexHelpers.scala nsdb-http/src/main/scala/io/radicalbit/nsdb/web/swagger/SwaggerDocService.scala nsdb-sql/src/main/scala/io/radicalbit/nsdb/sql/parser/SQLStatementParser.scala nsdb-sql/src/main/scala/io/radicalbit/nsdb/sql/parser/CommandStatementParser.scala nsdb-sql/src/main/scala/io/radicalbit/nsdb/sql/parser/RegexNSDb.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfigProvider.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/package.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/statement/SQLStatement.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/statement/SqlStatementSerialization.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/Bit.scala nsdb-common/src/main/scala/io/radicalbit/nsdb/common/protocol/FieldClassType.scala nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/NSDB.scala nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/example/NSDBMain.scala nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/example/NSDbStreamingMain.scala nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/NSDBMetricInfo.scala nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/streaming/NSDbStreaming.scala nsdb-scala-api/src/main/scala/io/radicalbit/nsdb/api/scala/NSDBDescribe.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/converters/BitConverters.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/ws/WebSocketClient.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/ws/SynchronizedBuffer.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/test/MiniClusterSingleNodeSpec.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/NSDbMiniCluster.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/MiniClusterStarter.scala nsdb-minicluster/src/main/scala/io/radicalbit/nsdb/minicluster/NSDBAkkaMiniCluster.scala nsdb-java-api/src/main/scala/io/radicalbit/nsdb/api/java/ScalaUtils.scala

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -107,10 +107,16 @@
Loading
107 107
108 108
  override def postStop(): Unit = cluster.unsubscribe(self)
109 109
110 +
  private def createNodeActorGuardianName(nodeId: String, nodeName: String): String =
111 +
    s"guardian_${nodeId}_${nodeName}"
112 +
113 +
  private def createNodeActorGuardianPath(nodeId: String, nodeName: String): String =
114 +
    s"/user/${createNodeActorGuardianName(nodeId, nodeName)}"
115 +
110 116
  protected def createNodeActorsGuardian(): ActorRef = {
111 117
    context.system.actorOf(
112 118
      NodeActorsGuardian.props(self, nodeId).withDeploy(Deploy(scope = RemoteScope(cluster.selfMember.address))),
113 -
      name = s"guardian_${nodeId}_${selfNodeName}"
119 +
      name = createNodeActorGuardianName(nodeId, selfNodeName)
114 120
    )
115 121
  }
116 122
@@ -131,22 +137,23 @@
Loading
131 137
      log.error(s"RemoveNodeMetadataFailed for node $nodeName")
132 138
  }
133 139
134 -
  private def unsubscribeNode(nodeName: String)(implicit scheduler: Scheduler, _log: LoggingAdapter) = {
140 +
  private def unsubscribeNode(otherNodeId: String)(implicit scheduler: Scheduler, _log: LoggingAdapter) = {
141 +
    log.info(s"unsubscribing node $otherNodeId from node $nodeId")
135 142
    (for {
136 143
      NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, _) <- (context.actorSelection(
137 -
        s"/user/guardian_$nodeName") ? GetNodeChildActors)
144 +
        createNodeActorGuardianPath(nodeId, selfNodeName)) ? GetNodeChildActors)
138 145
        .mapTo[NodeChildActorsGot]
139 -
      _ <- (readCoordinator ? UnsubscribeMetricsDataActor(nodeName)).mapTo[MetricsDataActorUnSubscribed]
140 -
      _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(nodeName))
146 +
      _ <- (readCoordinator ? UnsubscribeMetricsDataActor(otherNodeId)).mapTo[MetricsDataActorUnSubscribed]
147 +
      _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(otherNodeId))
141 148
        .mapTo[CommitLogCoordinatorUnSubscribed]
142 -
      _ <- (writeCoordinator ? UnSubscribePublisher(nodeName)).mapTo[PublisherUnSubscribed]
143 -
      _ <- (writeCoordinator ? UnsubscribeMetricsDataActor(nodeName))
149 +
      _ <- (writeCoordinator ? UnSubscribePublisher(otherNodeId)).mapTo[PublisherUnSubscribed]
150 +
      _ <- (writeCoordinator ? UnsubscribeMetricsDataActor(otherNodeId))
144 151
        .mapTo[MetricsDataActorUnSubscribed]
145 -
      _ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(nodeName))
152 +
      _ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(otherNodeId))
146 153
        .mapTo[MetricsDataActorUnSubscribed]
147 -
      _ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(nodeName))
154 +
      _ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(otherNodeId))
148 155
        .mapTo[CommitLogCoordinatorUnSubscribed]
149 -
      removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(nodeName))
156 +
      removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(otherNodeId))
150 157
        .mapTo[RemoveNodeMetadataResponse]
151 158
    } yield removeNodeMetadataResponse)
152 159
      .retry(delay, retries)(_.isInstanceOf[NodeMetadataRemoved])
@@ -155,7 +162,7 @@
Loading
155 162
156 163
  def receive: Receive = {
157 164
    case MemberUp(member) if member == cluster.selfMember =>
158 -
      log.info("Member is Up: {}", member.address)
165 +
      log.info(s"Member with nodeId $nodeId and address ${member.address} is Up")
159 166
160 167
      val nodeActorsGuardian = createNodeActorsGuardian()
161 168
@@ -167,7 +174,7 @@
Loading
167 174
168 175
          val locationsToAdd: Seq[LocationWithCoordinates] = retrieveLocationsToAdd.diff(outdatedLocations.locations)
169 176
170 -
          log.debug(s"locations to add from node $nodeId $locationsToAdd")
177 +
          log.info(s"locations to add from node $nodeId \t$locationsToAdd")
171 178
172 179
          val locationsGroupedBy: Map[(String, String), Seq[LocationWithCoordinates]] = locationsToAdd.groupBy {
173 180
            case LocationWithCoordinates(database, namespace, _) => (database, namespace)
@@ -191,7 +198,8 @@
Loading
191 198
        .onComplete {
192 199
          case Success(
193 200
              (NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, publisherActor),
194 -
               (_, failures))) if failures.isEmpty =>
201 +
               (success, failures))) if failures.isEmpty =>
202 +
            log.info(s"location ${success} successfully added for node $nodeId")
195 203
            val nodeName = createNodeName(member)
196 204
            mediator ! Subscribe(NODE_GUARDIANS_TOPIC, nodeActorsGuardian)
197 205
            mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(nodeId, nodeName))
@@ -207,9 +215,10 @@
Loading
207 215
    case MemberRemoved(member, previousStatus) if member != cluster.selfMember =>
208 216
      log.warning("{} Member is Removed: {} after {}", selfNodeName, member.address, previousStatus)
209 217
210 -
      val nodeName = createNodeName(member)
218 +
      val nodeName       = createNodeName(member)
219 +
      val nodeIdToRemove = NSDbClusterSnapshot(context.system).getId(nodeName)
211 220
212 -
      unsubscribeNode(nodeName)
221 +
      unsubscribeNode(nodeIdToRemove.nodeId)
213 222
214 223
      NSDbClusterSnapshot(context.system).removeNode(nodeName)
215 224
    case _: MemberEvent => // ignore

@@ -17,6 +17,7 @@
Loading
17 17
package io.radicalbit.nsdb.cluster.extension
18 18
19 19
import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
20 +
import io.radicalbit.nsdb.protocol.MessageProtocol.Events.NSDbNode
20 21
21 22
import scala.collection.JavaConverters._
22 23
@@ -29,33 +30,33 @@
Loading
29 30
30 31
  import java.util.concurrent.ConcurrentHashMap
31 32
32 -
  private val threadSafeMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap[String, String]()
33 +
  private val threadSafeMap: ConcurrentHashMap[String, NSDbNode] = new ConcurrentHashMap[String, NSDbNode]()
33 34
34 35
  /**
35 36
    * Adds a node and associate it to the a unique identifier
36 37
    * @param address the actual address of the node.
37 38
    * @param nodeId the node unique identifier.
38 39
    */
39 -
  def addNode(address: String, nodeId: String): String = {
40 +
  def addNode(address: String, nodeId: String): NSDbNode = {
40 41
    system.log.debug(s"adding node with address $address to $threadSafeMap")
41 -
    threadSafeMap.put(address, nodeId)
42 +
    threadSafeMap.put(address, NSDbNode(address, nodeId))
42 43
  }
43 44
44 45
  /**
45 46
    * Removes a node.
46 47
    * @param address the actual node address.
47 48
    */
48 -
  def removeNode(address: String): String = {
49 +
  def removeNode(address: String): NSDbNode = {
49 50
    system.log.warning(s"removing node with address $address from $threadSafeMap")
50 51
    threadSafeMap.remove(address)
51 52
  }
52 53
53 54
  /**
54 55
    * Returns the current active nodes
55 56
    */
56 -
  def nodes: Set[(String, String)] = threadSafeMap.asScala.toSet
57 +
  def nodes: Iterable[NSDbNode] = threadSafeMap.asScala.values
57 58
58 -
  def getId(address: String): String = threadSafeMap.get(address)
59 +
  def getId(address: String): NSDbNode = threadSafeMap.get(address)
59 60
}
60 61
61 62
object NSDbClusterSnapshot extends ExtensionId[NSDbClusterSnapshotExtension] with ExtensionIdProvider {

@@ -17,6 +17,7 @@
Loading
17 17
package io.radicalbit.nsdb.web.routes
18 18
19 19
import akka.actor.ActorRef
20 +
import akka.event.LoggingAdapter
20 21
import akka.http.scaladsl.model.StatusCodes.{InternalServerError, NotFound}
21 22
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
22 23
import akka.http.scaladsl.server.Directives._
@@ -52,6 +53,7 @@
Loading
52 53
  implicit val timeout: Timeout
53 54
  implicit val formats: Formats
54 55
  implicit val ec: ExecutionContext
56 +
  def logger: LoggingAdapter
55 57
56 58
  case class CommandRequestDatabase(db: String)
57 59
  case class CommandRequestNamespace(db: String, namespace: String)
@@ -64,6 +66,78 @@
Loading
64 66
  case class Field(name: String, `type`: String)
65 67
  case class DescribeMetricResponse(fields: Set[Field], metricInfo: Option[MetricInfo]) extends CommandResponse
66 68
69 +
  @Api(value = "/dbs", produces = "application/json")
70 +
  @Path("/dbs")
71 +
  @ApiOperation(
72 +
    value = "Perform show topology command. A topology is a representation of the cluster members",
73 +
    nickname = "show topology",
74 +
    httpMethod = "GET",
75 +
    response = classOf[TopologyGot]
76 +
  )
77 +
  @ApiResponses(
78 +
    Array(
79 +
      new ApiResponse(code = 500, message = "Internal server error")
80 +
    ))
81 +
  def topologyApi: Route =
82 +
    path("topology") {
83 +
      (pathEnd & get) {
84 +
        onComplete(metadataCoordinator ? GetTopology) {
85 +
          case Success(topology: TopologyGot) =>
86 +
            complete(HttpEntity(ContentTypes.`application/json`, write(topology)))
87 +
          case Success(wrongResponse) =>
88 +
            logger.error(s"received unexpected response $wrongResponse")
89 +
            complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
90 +
          case Failure(ex) =>
91 +
            logger.error(ex, "unexpected error")
92 +
            complete(HttpResponse(InternalServerError, entity = ex.getMessage))
93 +
        }
94 +
      }
95 +
    }
96 +
97 +
  @Api(value = "locations/{db}/{namespace}/{metric}", produces = "application/json")
98 +
  @Path("/locations/{db}/{namespace}/{metric}")
99 +
  @ApiOperation(value = "Perform show locations given db, namespace, metric",
100 +
                nickname = "Show Locations",
101 +
                httpMethod = "GET",
102 +
                response = classOf[String])
103 +
  @ApiImplicitParams(
104 +
    Array(
105 +
      new ApiImplicitParam(name = "db", value = "database", required = true, dataType = "string", paramType = "path"),
106 +
      new ApiImplicitParam(name = "namespace",
107 +
                           value = "namespace",
108 +
                           required = true,
109 +
                           dataType = "string",
110 +
                           paramType = "path")
111 +
    ))
112 +
  @ApiResponses(
113 +
    Array(
114 +
      new ApiResponse(code = 500, message = "Internal server error")
115 +
    ))
116 +
  def locationsApi: Route = {
117 +
    pathPrefix("locations") {
118 +
      pathPrefix(Segment) { db =>
119 +
        pathPrefix(Segment) { namespace =>
120 +
          pathPrefix(Segment) { metric =>
121 +
            (pathEnd & get) {
122 +
              withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
123 +
                onComplete(metadataCoordinator ? GetLocations(db, namespace, metric)) {
124 +
                  case Success(response: LocationsGot) =>
125 +
                    complete(HttpEntity(ContentTypes.`application/json`, write(response)))
126 +
                  case Success(wrongResponse) =>
127 +
                    logger.error(s"received unexpected response $wrongResponse")
128 +
                    complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
129 +
                  case Failure(ex) =>
130 +
                    logger.error(ex, "unexpected error")
131 +
                    complete(HttpResponse(InternalServerError, entity = ex.getMessage))
132 +
                }
133 +
              }
134 +
            }
135 +
          }
136 +
        }
137 +
      }
138 +
    }
139 +
  }
140 +
67 141
  @Api(value = "/dbs", produces = "application/json")
68 142
  @Path("/dbs")
69 143
  @ApiOperation(value = "Perform show dbs command",
@@ -75,18 +149,17 @@
Loading
75 149
      new ApiResponse(code = 500, message = "Internal server error")
76 150
    ))
77 151
  def showDbs: Route =
78 -
    pathPrefix("commands") {
79 -
      extractRequest { request =>
80 -
        request.header
81 -
        path("dbs") {
82 -
          (pathEnd & get) {
83 -
            onComplete(readCoordinator ? GetDbs) {
84 -
              case Success(DbsGot(dbs)) =>
85 -
                complete(HttpEntity(ContentTypes.`application/json`, write(ShowDbsResponse(dbs))))
86 -
              case Success(_)  => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
87 -
              case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
88 -
            }
89 -
          }
152 +
    path("dbs") {
153 +
      (pathEnd & get) {
154 +
        onComplete(readCoordinator ? GetDbs) {
155 +
          case Success(DbsGot(dbs)) =>
156 +
            complete(HttpEntity(ContentTypes.`application/json`, write(ShowDbsResponse(dbs))))
157 +
          case Success(wrongResponse) =>
158 +
            logger.error(s"received unexpected response $wrongResponse")
159 +
            complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
160 +
          case Failure(ex) =>
161 +
            logger.error(ex, "unexpected error")
162 +
            complete(HttpResponse(InternalServerError, entity = ex.getMessage))
90 163
        }
91 164
      }
92 165
    }
@@ -106,17 +179,19 @@
Loading
106 179
      new ApiResponse(code = 500, message = "Internal server error")
107 180
    ))
108 181
  def showNamespaces: Route =
109 -
    pathPrefix("commands") {
110 -
      pathPrefix(Segment) { db =>
111 -
        path("namespaces") {
112 -
          (pathEnd & get) {
113 -
            withDbAuthorization(db, false, authorizationProvider) {
114 -
              onComplete(readCoordinator ? GetNamespaces(db)) {
115 -
                case Success(NamespacesGot(_, namespaces)) =>
116 -
                  complete(HttpEntity(ContentTypes.`application/json`, write(ShowNamespacesResponse(namespaces))))
117 -
                case Success(_)  => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
118 -
                case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
119 -
              }
182 +
    pathPrefix(Segment) { db =>
183 +
      path("namespaces") {
184 +
        (pathEnd & get) {
185 +
          withDbAuthorization(db, false, authorizationProvider) {
186 +
            onComplete(readCoordinator ? GetNamespaces(db)) {
187 +
              case Success(NamespacesGot(_, namespaces)) =>
188 +
                complete(HttpEntity(ContentTypes.`application/json`, write(ShowNamespacesResponse(namespaces))))
189 +
              case Success(wrongResponse) =>
190 +
                logger.error(s"received unexpected response $wrongResponse")
191 +
                complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
192 +
              case Failure(ex) =>
193 +
                logger.error(ex, "unexpected error")
194 +
                complete(HttpResponse(InternalServerError, entity = ex.getMessage))
120 195
            }
121 196
          }
122 197
        }
@@ -143,17 +218,19 @@
Loading
143 218
      new ApiResponse(code = 500, message = "Internal server error")
144 219
    ))
145 220
  def dropNamespace: Route = {
146 -
    pathPrefix("commands") {
147 -
      pathPrefix(Segment) { db =>
148 -
        pathPrefix(Segment) { namespace =>
149 -
          pathEnd {
150 -
            delete {
151 -
              withNamespaceAuthorization(db, namespace, true, authorizationProvider) {
152 -
                onComplete(writeCoordinator ? DeleteNamespace(db, namespace)) {
153 -
                  case Success(NamespaceDeleted(_, _)) => complete("Ok")
154 -
                  case Success(_)                      => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
155 -
                  case Failure(ex)                     => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
156 -
                }
221 +
    pathPrefix(Segment) { db =>
222 +
      pathPrefix(Segment) { namespace =>
223 +
        pathEnd {
224 +
          delete {
225 +
            withNamespaceAuthorization(db, namespace, true, authorizationProvider) {
226 +
              onComplete(writeCoordinator ? DeleteNamespace(db, namespace)) {
227 +
                case Success(NamespaceDeleted(_, _)) => complete("Ok")
228 +
                case Success(wrongResponse) =>
229 +
                  logger.error(s"received unexpected response $wrongResponse")
230 +
                  complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
231 +
                case Failure(ex) =>
232 +
                  logger.error(ex, "unexpected error")
233 +
                  complete(HttpResponse(InternalServerError, entity = ex.getMessage))
157 234
              }
158 235
            }
159 236
          }
@@ -182,19 +259,21 @@
Loading
182 259
      new ApiResponse(code = 500, message = "Internal server error")
183 260
    ))
184 261
  def showMetrics: Route =
185 -
    pathPrefix("commands") {
186 -
      pathPrefix(Segment) { db =>
187 -
        pathPrefix(Segment) { namespace =>
188 -
          path("metrics") {
189 -
            pathEnd {
190 -
              get {
191 -
                withNamespaceAuthorization(db, namespace, false, authorizationProvider) {
192 -
                  onComplete(readCoordinator ? GetMetrics(db, namespace)) {
193 -
                    case Success(MetricsGot(_, _, metrics)) =>
194 -
                      complete(HttpEntity(ContentTypes.`application/json`, write(ShowMetricsResponse(metrics))))
195 -
                    case Success(_)  => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
196 -
                    case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
197 -
                  }
262 +
    pathPrefix(Segment) { db =>
263 +
      pathPrefix(Segment) { namespace =>
264 +
        path("metrics") {
265 +
          pathEnd {
266 +
            get {
267 +
              withNamespaceAuthorization(db, namespace, false, authorizationProvider) {
268 +
                onComplete(readCoordinator ? GetMetrics(db, namespace)) {
269 +
                  case Success(MetricsGot(_, _, metrics)) =>
270 +
                    complete(HttpEntity(ContentTypes.`application/json`, write(ShowMetricsResponse(metrics))))
271 +
                  case Success(wrongResponse) =>
272 +
                    logger.error(s"received unexpected response $wrongResponse")
273 +
                    complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
274 +
                  case Failure(ex) =>
275 +
                    logger.error(ex, "unexpected error")
276 +
                    complete(HttpResponse(InternalServerError, entity = ex.getMessage))
198 277
                }
199 278
              }
200 279
            }
@@ -225,55 +304,57 @@
Loading
225 304
      new ApiResponse(code = 500, message = "Internal server error")
226 305
    ))
227 306
  def describeMetric: Route =
228 -
    pathPrefix("commands") {
229 -
      pathPrefix(Segment) { db =>
230 -
        pathPrefix(Segment) { namespace =>
231 -
          pathPrefix(Segment) { metric =>
232 -
            (pathEnd & get) {
233 -
              withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
234 -
                onComplete(
235 -
                  Future.sequence(
236 -
                    Seq((readCoordinator ? GetSchema(db, namespace, metric)).mapTo[SchemaGot],
237 -
                        (metadataCoordinator ? GetMetricInfo(db, namespace, metric)).mapTo[MetricInfoGot])
238 -
                  )) {
239 -
                  case Success(SchemaGot(_, _, _, Some(schema)) :: MetricInfoGot(_, _, _, metricInfo) :: Nil) =>
240 -
                    complete(
241 -
                      HttpEntity(
242 -
                        ContentTypes.`application/json`,
243 -
                        write(
244 -
                          DescribeMetricResponse(
245 -
                            schema.fieldsMap.map {
246 -
                              case (_, field) =>
247 -
                                Field(name = field.name, `type` = field.indexType.getClass.getSimpleName)
248 -
                            }.toSet,
249 -
                            metricInfo
250 -
                          )
307 +
    pathPrefix(Segment) { db =>
308 +
      pathPrefix(Segment) { namespace =>
309 +
        pathPrefix(Segment) { metric =>
310 +
          (pathEnd & get) {
311 +
            withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
312 +
              onComplete(
313 +
                Future.sequence(
314 +
                  Seq((readCoordinator ? GetSchema(db, namespace, metric)).mapTo[SchemaGot],
315 +
                      (metadataCoordinator ? GetMetricInfo(db, namespace, metric)).mapTo[MetricInfoGot])
316 +
                )) {
317 +
                case Success(SchemaGot(_, _, _, Some(schema)) :: MetricInfoGot(_, _, _, metricInfo) :: Nil) =>
318 +
                  complete(
319 +
                    HttpEntity(
320 +
                      ContentTypes.`application/json`,
321 +
                      write(
322 +
                        DescribeMetricResponse(
323 +
                          schema.fieldsMap.map {
324 +
                            case (_, field) =>
325 +
                              Field(name = field.name, `type` = field.indexType.getClass.getSimpleName)
326 +
                          }.toSet,
327 +
                          metricInfo
251 328
                        )
252 329
                      )
253 330
                    )
254 -
                  case Success(SchemaGot(_, _, _, schemaOpt) :: MetricInfoGot(_, _, _, Some(metricInfo)) :: Nil) =>
255 -
                    complete(
256 -
                      HttpEntity(
257 -
                        ContentTypes.`application/json`,
258 -
                        write(
259 -
                          DescribeMetricResponse(
260 -
                            schemaOpt
261 -
                              .map(s =>
262 -
                                s.fieldsMap.map {
263 -
                                  case (_, field) =>
264 -
                                    Field(name = field.name, `type` = field.indexType.getClass.getSimpleName)
265 -
                                }.toSet)
266 -
                              .getOrElse(Set.empty),
267 -
                            Some(metricInfo)
268 -
                          )
331 +
                  )
332 +
                case Success(SchemaGot(_, _, _, schemaOpt) :: MetricInfoGot(_, _, _, Some(metricInfo)) :: Nil) =>
333 +
                  complete(
334 +
                    HttpEntity(
335 +
                      ContentTypes.`application/json`,
336 +
                      write(
337 +
                        DescribeMetricResponse(
338 +
                          schemaOpt
339 +
                            .fold(Set.empty[Field])(s =>
340 +
                              s.fieldsMap.foldLeft(Set.empty[Field]) {
341 +
                                case (acc: Set[Field], (_, schemaField)) =>
342 +
                                  acc + Field(name = schemaField.name,
343 +
                                              `type` = schemaField.indexType.getClass.getSimpleName)
344 +
                            }),
345 +
                          Some(metricInfo)
269 346
                        )
270 347
                      )
271 348
                    )
272 -
                  case Success(SchemaGot(_, _, _, None) :: _ :: Nil) =>
273 -
                    complete(HttpResponse(NotFound))
274 -
                  case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
275 -
                  case _           => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
276 -
                }
349 +
                  )
350 +
                case Success(SchemaGot(_, _, _, None) :: _ :: Nil) =>
351 +
                  complete(HttpResponse(NotFound))
352 +
                case Success(wrongResponse) =>
353 +
                  logger.error(s"received unexpected response $wrongResponse")
354 +
                  complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
355 +
                case Failure(ex) =>
356 +
                  logger.error(ex, "unexpected error")
357 +
                  complete(HttpResponse(InternalServerError, entity = ex.getMessage))
277 358
              }
278 359
            }
279 360
          }
@@ -302,26 +383,29 @@
Loading
302 383
      new ApiResponse(code = 500, message = "Internal server error")
303 384
    ))
304 385
  def dropMetric: Route =
305 -
    pathPrefix("commands") {
306 -
      pathPrefix(Segment) { db =>
307 -
        pathPrefix(Segment) { namespace =>
308 -
          pathPrefix(Segment) { metric =>
309 -
            delete {
310 -
              withMetricAuthorization(db, namespace, metric, true, authorizationProvider) {
311 -
                onComplete(writeCoordinator ? DropMetric(db, namespace, metric)) {
312 -
                  case Success(MetricDropped(_, _, _)) => complete("Ok")
313 -
                  case Success(_)                      => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
314 -
                  case Failure(ex)                     => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
315 -
                }
386 +
    pathPrefix(Segment) { db =>
387 +
      pathPrefix(Segment) { namespace =>
388 +
        pathPrefix(Segment) { metric =>
389 +
          delete {
390 +
            withMetricAuthorization(db, namespace, metric, true, authorizationProvider) {
391 +
              onComplete(writeCoordinator ? DropMetric(db, namespace, metric)) {
392 +
                case Success(MetricDropped(_, _, _)) => complete("Ok")
393 +
                case Success(wrongResponse) =>
394 +
                  logger.error(s"received unexpected response $wrongResponse")
395 +
                  complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
396 +
                case Failure(ex) =>
397 +
                  logger.error(ex, "unexpected error")
398 +
                  complete(HttpResponse(InternalServerError, entity = ex.getMessage))
316 399
              }
317 400
            }
318 401
          }
319 402
        }
320 403
      }
321 404
    }
322 405
323 -
  def commandsApi: Route = {
324 -
    showDbs ~ showNamespaces ~ showMetrics ~ dropNamespace ~ describeMetric ~ dropMetric
325 -
  }
406 +
  def commandsApi: Route =
407 +
    pathPrefix("commands") {
408 +
      topologyApi ~ locationsApi ~ showDbs ~ showNamespaces ~ showMetrics ~ dropNamespace ~ describeMetric ~ dropMetric
409 +
    }
326 410
327 411
}

@@ -33,7 +33,7 @@
Loading
33 33
  */
34 34
object FileUtils {
35 35
36 -
  private val NODE_ID_EXTENSION = ".name"
36 +
  private val NODE_ID_EXTENSION = "name"
37 37
  private val NODE_ID_LENGTH    = 10
38 38
  private val BUFFER_SIZE       = 4096
39 39
  private val buffer            = new Array[Byte](BUFFER_SIZE)

@@ -25,8 +25,6 @@
Loading
25 25
import akka.util.Timeout
26 26
import io.radicalbit.nsdb.cluster.NsdbPerfLogger
27 27
import io.radicalbit.nsdb.cluster.PubSubTopics._
28 -
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.GetLocations
29 -
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events.LocationsGot
30 28
import io.radicalbit.nsdb.cluster.logic.ReadNodesSelection
31 29
import io.radicalbit.nsdb.common.NSDbNumericType
32 30
import io.radicalbit.nsdb.common.configuration.NSDbConfig.HighLevel.precision

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 3 files with coverage changes found.

Changes in nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/ReadCoordinator.scala
-3
+3
Loading file...
Changes in nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala
-9
+9
Loading file...
Changes in nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala
-11
+11
Loading file...
Loading