radicalbit / NSDb

@@ -107,10 +107,16 @@
Loading
107 107
108 108
  override def postStop(): Unit = cluster.unsubscribe(self)
109 109
110 +
  private def createNodeActorGuardianName(nodeId: String, nodeName: String): String =
111 +
    s"guardian_${nodeId}_${nodeName}"
112 +
113 +
  private def createNodeActorGuardianPath(nodeId: String, nodeName: String): String =
114 +
    s"/user/${createNodeActorGuardianName(nodeId, nodeName)}"
115 +
110 116
  protected def createNodeActorsGuardian(): ActorRef = {
111 117
    context.system.actorOf(
112 118
      NodeActorsGuardian.props(self, nodeId).withDeploy(Deploy(scope = RemoteScope(cluster.selfMember.address))),
113 -
      name = s"guardian_${nodeId}_${selfNodeName}"
119 +
      name = createNodeActorGuardianName(nodeId, selfNodeName)
114 120
    )
115 121
  }
116 122
@@ -131,22 +137,23 @@
Loading
131 137
      log.error(s"RemoveNodeMetadataFailed for node $nodeName")
132 138
  }
133 139
134 -
  private def unsubscribeNode(nodeName: String)(implicit scheduler: Scheduler, _log: LoggingAdapter) = {
140 +
  private def unsubscribeNode(otherNodeId: String)(implicit scheduler: Scheduler, _log: LoggingAdapter) = {
141 +
    log.info(s"unsubscribing node $otherNodeId from node $nodeId")
135 142
    (for {
136 143
      NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, _) <- (context.actorSelection(
137 -
        s"/user/guardian_$nodeName") ? GetNodeChildActors)
144 +
        createNodeActorGuardianPath(nodeId, selfNodeName)) ? GetNodeChildActors)
138 145
        .mapTo[NodeChildActorsGot]
139 -
      _ <- (readCoordinator ? UnsubscribeMetricsDataActor(nodeName)).mapTo[MetricsDataActorUnSubscribed]
140 -
      _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(nodeName))
146 +
      _ <- (readCoordinator ? UnsubscribeMetricsDataActor(otherNodeId)).mapTo[MetricsDataActorUnSubscribed]
147 +
      _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(otherNodeId))
141 148
        .mapTo[CommitLogCoordinatorUnSubscribed]
142 -
      _ <- (writeCoordinator ? UnSubscribePublisher(nodeName)).mapTo[PublisherUnSubscribed]
143 -
      _ <- (writeCoordinator ? UnsubscribeMetricsDataActor(nodeName))
149 +
      _ <- (writeCoordinator ? UnSubscribePublisher(otherNodeId)).mapTo[PublisherUnSubscribed]
150 +
      _ <- (writeCoordinator ? UnsubscribeMetricsDataActor(otherNodeId))
144 151
        .mapTo[MetricsDataActorUnSubscribed]
145 -
      _ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(nodeName))
152 +
      _ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(otherNodeId))
146 153
        .mapTo[MetricsDataActorUnSubscribed]
147 -
      _ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(nodeName))
154 +
      _ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(otherNodeId))
148 155
        .mapTo[CommitLogCoordinatorUnSubscribed]
149 -
      removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(nodeName))
156 +
      removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(otherNodeId))
150 157
        .mapTo[RemoveNodeMetadataResponse]
151 158
    } yield removeNodeMetadataResponse)
152 159
      .retry(delay, retries)(_.isInstanceOf[NodeMetadataRemoved])
@@ -155,7 +162,7 @@
Loading
155 162
156 163
  def receive: Receive = {
157 164
    case MemberUp(member) if member == cluster.selfMember =>
158 -
      log.info("Member is Up: {}", member.address)
165 +
      log.info(s"Member with nodeId $nodeId and address ${member.address} is Up")
159 166
160 167
      val nodeActorsGuardian = createNodeActorsGuardian()
161 168
@@ -167,7 +174,7 @@
Loading
167 174
168 175
          val locationsToAdd: Seq[LocationWithCoordinates] = retrieveLocationsToAdd.diff(outdatedLocations.locations)
169 176
170 -
          log.debug(s"locations to add from node $nodeId $locationsToAdd")
177 +
          log.info(s"locations to add from node $nodeId \t$locationsToAdd")
171 178
172 179
          val locationsGroupedBy: Map[(String, String), Seq[LocationWithCoordinates]] = locationsToAdd.groupBy {
173 180
            case LocationWithCoordinates(database, namespace, _) => (database, namespace)
@@ -191,7 +198,8 @@
Loading
191 198
        .onComplete {
192 199
          case Success(
193 200
              (NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, publisherActor),
194 -
               (_, failures))) if failures.isEmpty =>
201 +
               (success, failures))) if failures.isEmpty =>
202 +
            log.info(s"location ${success} successfully added for node $nodeId")
195 203
            val nodeName = createNodeName(member)
196 204
            mediator ! Subscribe(NODE_GUARDIANS_TOPIC, nodeActorsGuardian)
197 205
            mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(nodeId, nodeName))
@@ -207,9 +215,10 @@
Loading
207 215
    case MemberRemoved(member, previousStatus) if member != cluster.selfMember =>
208 216
      log.warning("{} Member is Removed: {} after {}", selfNodeName, member.address, previousStatus)
209 217
210 -
      val nodeName = createNodeName(member)
218 +
      val nodeName       = createNodeName(member)
219 +
      val nodeIdToRemove = NSDbClusterSnapshot(context.system).getId(nodeName)
211 220
212 -
      unsubscribeNode(nodeName)
221 +
      unsubscribeNode(nodeIdToRemove.nodeId)
213 222
214 223
      NSDbClusterSnapshot(context.system).removeNode(nodeName)
215 224
    case _: MemberEvent => // ignore

@@ -17,6 +17,7 @@
Loading
17 17
package io.radicalbit.nsdb.cluster.extension
18 18
19 19
import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
20 +
import io.radicalbit.nsdb.protocol.MessageProtocol.Events.NSDbNode
20 21
21 22
import scala.collection.JavaConverters._
22 23
@@ -29,23 +30,23 @@
Loading
29 30
30 31
  import java.util.concurrent.ConcurrentHashMap
31 32
32 -
  private val threadSafeMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap[String, String]()
33 +
  private val threadSafeMap: ConcurrentHashMap[String, NSDbNode] = new ConcurrentHashMap[String, NSDbNode]()
33 34
34 35
  /**
35 36
    * Adds a node and associate it to the a unique identifier
36 37
    * @param address the actual address of the node.
37 38
    * @param nodeId the node unique identifier.
38 39
    */
39 -
  def addNode(address: String, nodeId: String): String = {
40 +
  def addNode(address: String, nodeId: String): NSDbNode = {
40 41
    system.log.debug(s"adding node with address $address to $threadSafeMap")
41 -
    threadSafeMap.put(address, nodeId)
42 +
    threadSafeMap.put(address, NSDbNode(address, nodeId))
42 43
  }
43 44
44 45
  /**
45 46
    * Removes a node.
46 47
    * @param address the actual node address.
47 48
    */
48 -
  def removeNode(address: String): String = {
49 +
  def removeNode(address: String): NSDbNode = {
49 50
    system.log.warning(s"removing node with address $address from $threadSafeMap")
50 51
    threadSafeMap.remove(address)
51 52
  }
@@ -53,9 +54,9 @@
Loading
53 54
  /**
54 55
    * Returns the current active nodes
55 56
    */
56 -
  def nodes: Set[(String, String)] = threadSafeMap.asScala.toSet
57 +
  def nodes: Iterable[NSDbNode] = threadSafeMap.asScala.values
57 58
58 -
  def getId(address: String): String = threadSafeMap.get(address)
59 +
  def getId(address: String): NSDbNode = threadSafeMap.get(address)
59 60
}
60 61
61 62
object NSDbClusterSnapshot extends ExtensionId[NSDbClusterSnapshotExtension] with ExtensionIdProvider {

@@ -70,7 +70,7 @@
Loading
70 70
71 71
  private lazy val writeNodesSelectionLogic = new CapacityWriteNodesSelectionLogic(
72 72
    CapacityWriteNodesSelectionLogic.fromConfigValue(config.getString("nsdb.cluster.metrics-selector")))
73 -
  private lazy val readNodesSelection = new LocalityReadNodesSelection(nodeAddress)
73 +
  private lazy val readNodesSelection = new LocalityReadNodesSelection(nodeId)
74 74
75 75
  private val metadataCache = context.actorOf(Props[ReplicatedMetadataCache], s"metadata-cache-$nodeId-$nodeAddress")
76 76
  private val schemaCache   = context.actorOf(Props[ReplicatedSchemaCache], s"schema-cache-$nodeId-$nodeAddress")
@@ -144,6 +144,8 @@
Loading
144 144
    context.system.scheduler.schedule(interval, interval) {
145 145
      mediator ! Publish(NSDB_LISTENERS_TOPIC, NodeAlive(nodeId, nodeAddress))
146 146
    }
147 +
148 +
    log.info(s"NodeActorGuardian is ready at ${self.path.name}")
147 149
  }
148 150
149 151
  def receive: Receive = {

@@ -17,6 +17,7 @@
Loading
17 17
package io.radicalbit.nsdb.web.routes
18 18
19 19
import akka.actor.ActorRef
20 +
import akka.event.LoggingAdapter
20 21
import akka.http.scaladsl.model.StatusCodes.{InternalServerError, NotFound}
21 22
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
22 23
import akka.http.scaladsl.server.Directives._
@@ -52,6 +53,7 @@
Loading
52 53
  implicit val timeout: Timeout
53 54
  implicit val formats: Formats
54 55
  implicit val ec: ExecutionContext
56 +
  def logger: LoggingAdapter
55 57
56 58
  case class CommandRequestDatabase(db: String)
57 59
  case class CommandRequestNamespace(db: String, namespace: String)
@@ -64,6 +66,78 @@
Loading
64 66
  case class Field(name: String, `type`: String)
65 67
  case class DescribeMetricResponse(fields: Set[Field], metricInfo: Option[MetricInfo]) extends CommandResponse
66 68
69 +
  @Api(value = "/dbs", produces = "application/json")
70 +
  @Path("/dbs")
71 +
  @ApiOperation(
72 +
    value = "Perform show topology command. A topology is a representation of the cluster members",
73 +
    nickname = "show topology",
74 +
    httpMethod = "GET",
75 +
    response = classOf[TopologyGot]
76 +
  )
77 +
  @ApiResponses(
78 +
    Array(
79 +
      new ApiResponse(code = 500, message = "Internal server error")
80 +
    ))
81 +
  def topologyApi: Route =
82 +
    path("topology") {
83 +
      (pathEnd & get) {
84 +
        onComplete(metadataCoordinator ? GetTopology) {
85 +
          case Success(topology: TopologyGot) =>
86 +
            complete(HttpEntity(ContentTypes.`application/json`, write(topology)))
87 +
          case Success(wrongResponse) =>
88 +
            logger.error(s"received unexpected response $wrongResponse")
89 +
            complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
90 +
          case Failure(ex) =>
91 +
            logger.error(ex, "unexpected error")
92 +
            complete(HttpResponse(InternalServerError, entity = ex.getMessage))
93 +
        }
94 +
      }
95 +
    }
96 +
97 +
  @Api(value = "locations/{db}/{namespace}/{metric}", produces = "application/json")
98 +
  @Path("/locations/{db}/{namespace}/{metric}")
99 +
  @ApiOperation(value = "Perform show locations given db, namespace, metric",
100 +
                nickname = "Show Locations",
101 +
                httpMethod = "GET",
102 +
                response = classOf[String])
103 +
  @ApiImplicitParams(
104 +
    Array(
105 +
      new ApiImplicitParam(name = "db", value = "database", required = true, dataType = "string", paramType = "path"),
106 +
      new ApiImplicitParam(name = "namespace",
107 +
                           value = "namespace",
108 +
                           required = true,
109 +
                           dataType = "string",
110 +
                           paramType = "path")
111 +
    ))
112 +
  @ApiResponses(
113 +
    Array(
114 +
      new ApiResponse(code = 500, message = "Internal server error")
115 +
    ))
116 +
  def locationsApi: Route = {
117 +
    pathPrefix("locations") {
118 +
      pathPrefix(Segment) { db =>
119 +
        pathPrefix(Segment) { namespace =>
120 +
          pathPrefix(Segment) { metric =>
121 +
            (pathEnd & get) {
122 +
              withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
123 +
                onComplete(metadataCoordinator ? GetLocations(db, namespace, metric)) {
124 +
                  case Success(response: LocationsGot) =>
125 +
                    complete(HttpEntity(ContentTypes.`application/json`, write(response)))
126 +
                  case Success(wrongResponse) =>
127 +
                    logger.error(s"received unexpected response $wrongResponse")
128 +
                    complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
129 +
                  case Failure(ex) =>
130 +
                    logger.error(ex, "unexpected error")
131 +
                    complete(HttpResponse(InternalServerError, entity = ex.getMessage))
132 +
                }
133 +
              }
134 +
            }
135 +
          }
136 +
        }
137 +
      }
138 +
    }
139 +
  }
140 +
67 141
  @Api(value = "/dbs", produces = "application/json")
68 142
  @Path("/dbs")
69 143
  @ApiOperation(value = "Perform show dbs command",
@@ -75,18 +149,17 @@
Loading
75 149
      new ApiResponse(code = 500, message = "Internal server error")
76 150
    ))
77 151
  def showDbs: Route =
78 -
    pathPrefix("commands") {
79 -
      extractRequest { request =>
80 -
        request.header
81 -
        path("dbs") {
82 -
          (pathEnd & get) {
83 -
            onComplete(readCoordinator ? GetDbs) {
84 -
              case Success(DbsGot(dbs)) =>
85 -
                complete(HttpEntity(ContentTypes.`application/json`, write(ShowDbsResponse(dbs))))
86 -
              case Success(_)  => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
87 -
              case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
88 -
            }
89 -
          }
152 +
    path("dbs") {
153 +
      (pathEnd & get) {
154 +
        onComplete(readCoordinator ? GetDbs) {
155 +
          case Success(DbsGot(dbs)) =>
156 +
            complete(HttpEntity(ContentTypes.`application/json`, write(ShowDbsResponse(dbs))))
157 +
          case Success(wrongResponse) =>
158 +
            logger.error(s"received unexpected response $wrongResponse")
159 +
            complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
160 +
          case Failure(ex) =>
161 +
            logger.error(ex, "unexpected error")
162 +
            complete(HttpResponse(InternalServerError, entity = ex.getMessage))
90 163
        }
91 164
      }
92 165
    }
@@ -106,17 +179,19 @@
Loading
106 179
      new ApiResponse(code = 500, message = "Internal server error")
107 180
    ))
108 181
  def showNamespaces: Route =
109 -
    pathPrefix("commands") {
110 -
      pathPrefix(Segment) { db =>
111 -
        path("namespaces") {
112 -
          (pathEnd & get) {
113 -
            withDbAuthorization(db, false, authorizationProvider) {
114 -
              onComplete(readCoordinator ? GetNamespaces(db)) {
115 -
                case Success(NamespacesGot(_, namespaces)) =>
116 -
                  complete(HttpEntity(ContentTypes.`application/json`, write(ShowNamespacesResponse(namespaces))))
117 -
                case Success(_)  => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
118 -
                case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
119 -
              }
182 +
    pathPrefix(Segment) { db =>
183 +
      path("namespaces") {
184 +
        (pathEnd & get) {
185 +
          withDbAuthorization(db, false, authorizationProvider) {
186 +
            onComplete(readCoordinator ? GetNamespaces(db)) {
187 +
              case Success(NamespacesGot(_, namespaces)) =>
188 +
                complete(HttpEntity(ContentTypes.`application/json`, write(ShowNamespacesResponse(namespaces))))
189 +
              case Success(wrongResponse) =>
190 +
                logger.error(s"received unexpected response $wrongResponse")
191 +
                complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
192 +
              case Failure(ex) =>
193 +
                logger.error(ex, "unexpected error")
194 +
                complete(HttpResponse(InternalServerError, entity = ex.getMessage))
120 195
            }
121 196
          }
122 197
        }
@@ -143,17 +218,19 @@
Loading
143 218
      new ApiResponse(code = 500, message = "Internal server error")
144 219
    ))
145 220
  def dropNamespace: Route = {
146 -
    pathPrefix("commands") {
147 -
      pathPrefix(Segment) { db =>
148 -
        pathPrefix(Segment) { namespace =>
149 -
          pathEnd {
150 -
            delete {
151 -
              withNamespaceAuthorization(db, namespace, true, authorizationProvider) {
152 -
                onComplete(writeCoordinator ? DeleteNamespace(db, namespace)) {
153 -
                  case Success(NamespaceDeleted(_, _)) => complete("Ok")
154 -
                  case Success(_)                      => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
155 -
                  case Failure(ex)                     => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
156 -
                }
221 +
    pathPrefix(Segment) { db =>
222 +
      pathPrefix(Segment) { namespace =>
223 +
        pathEnd {
224 +
          delete {
225 +
            withNamespaceAuthorization(db, namespace, true, authorizationProvider) {
226 +
              onComplete(writeCoordinator ? DeleteNamespace(db, namespace)) {
227 +
                case Success(NamespaceDeleted(_, _)) => complete("Ok")
228 +
                case Success(wrongResponse) =>
229 +
                  logger.error(s"received unexpected response $wrongResponse")
230 +
                  complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
231 +
                case Failure(ex) =>
232 +
                  logger.error(ex, "unexpected error")
233 +
                  complete(HttpResponse(InternalServerError, entity = ex.getMessage))
157 234
              }
158 235
            }
159 236
          }
@@ -182,19 +259,21 @@
Loading
182 259
      new ApiResponse(code = 500, message = "Internal server error")
183 260
    ))
184 261
  def showMetrics: Route =
185 -
    pathPrefix("commands") {
186 -
      pathPrefix(Segment) { db =>
187 -
        pathPrefix(Segment) { namespace =>
188 -
          path("metrics") {
189 -
            pathEnd {
190 -
              get {
191 -
                withNamespaceAuthorization(db, namespace, false, authorizationProvider) {
192 -
                  onComplete(readCoordinator ? GetMetrics(db, namespace)) {
193 -
                    case Success(MetricsGot(_, _, metrics)) =>
194 -
                      complete(HttpEntity(ContentTypes.`application/json`, write(ShowMetricsResponse(metrics))))
195 -
                    case Success(_)  => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
196 -
                    case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
197 -
                  }
262 +
    pathPrefix(Segment) { db =>
263 +
      pathPrefix(Segment) { namespace =>
264 +
        path("metrics") {
265 +
          pathEnd {
266 +
            get {
267 +
              withNamespaceAuthorization(db, namespace, false, authorizationProvider) {
268 +
                onComplete(readCoordinator ? GetMetrics(db, namespace)) {
269 +
                  case Success(MetricsGot(_, _, metrics)) =>
270 +
                    complete(HttpEntity(ContentTypes.`application/json`, write(ShowMetricsResponse(metrics))))
271 +
                  case Success(wrongResponse) =>
272 +
                    logger.error(s"received unexpected response $wrongResponse")
273 +
                    complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
274 +
                  case Failure(ex) =>
275 +
                    logger.error(ex, "unexpected error")
276 +
                    complete(HttpResponse(InternalServerError, entity = ex.getMessage))
198 277
                }
199 278
              }
200 279
            }
@@ -225,55 +304,57 @@
Loading
225 304
      new ApiResponse(code = 500, message = "Internal server error")
226 305
    ))
227 306
  def describeMetric: Route =
228 -
    pathPrefix("commands") {
229 -
      pathPrefix(Segment) { db =>
230 -
        pathPrefix(Segment) { namespace =>
231 -
          pathPrefix(Segment) { metric =>
232 -
            (pathEnd & get) {
233 -
              withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
234 -
                onComplete(
235 -
                  Future.sequence(
236 -
                    Seq((readCoordinator ? GetSchema(db, namespace, metric)).mapTo[SchemaGot],
237 -
                        (metadataCoordinator ? GetMetricInfo(db, namespace, metric)).mapTo[MetricInfoGot])
238 -
                  )) {
239 -
                  case Success(SchemaGot(_, _, _, Some(schema)) :: MetricInfoGot(_, _, _, metricInfo) :: Nil) =>
240 -
                    complete(
241 -
                      HttpEntity(
242 -
                        ContentTypes.`application/json`,
243 -
                        write(
244 -
                          DescribeMetricResponse(
245 -
                            schema.fieldsMap.map {
246 -
                              case (_, field) =>
247 -
                                Field(name = field.name, `type` = field.indexType.getClass.getSimpleName)
248 -
                            }.toSet,
249 -
                            metricInfo
250 -
                          )
307 +
    pathPrefix(Segment) { db =>
308 +
      pathPrefix(Segment) { namespace =>
309 +
        pathPrefix(Segment) { metric =>
310 +
          (pathEnd & get) {
311 +
            withMetricAuthorization(db, namespace, metric, false, authorizationProvider) {
312 +
              onComplete(
313 +
                Future.sequence(
314 +
                  Seq((readCoordinator ? GetSchema(db, namespace, metric)).mapTo[SchemaGot],
315 +
                      (metadataCoordinator ? GetMetricInfo(db, namespace, metric)).mapTo[MetricInfoGot])
316 +
                )) {
317 +
                case Success(SchemaGot(_, _, _, Some(schema)) :: MetricInfoGot(_, _, _, metricInfo) :: Nil) =>
318 +
                  complete(
319 +
                    HttpEntity(
320 +
                      ContentTypes.`application/json`,
321 +
                      write(
322 +
                        DescribeMetricResponse(
323 +
                          schema.fieldsMap.map {
324 +
                            case (_, field) =>
325 +
                              Field(name = field.name, `type` = field.indexType.getClass.getSimpleName)
326 +
                          }.toSet,
327 +
                          metricInfo
251 328
                        )
252 329
                      )
253 330
                    )
254 -
                  case Success(SchemaGot(_, _, _, schemaOpt) :: MetricInfoGot(_, _, _, Some(metricInfo)) :: Nil) =>
255 -
                    complete(
256 -
                      HttpEntity(
257 -
                        ContentTypes.`application/json`,
258 -
                        write(
259 -
                          DescribeMetricResponse(
260 -
                            schemaOpt
261 -
                              .map(s =>
262 -
                                s.fieldsMap.map {
263 -
                                  case (_, field) =>
264 -
                                    Field(name = field.name, `type` = field.indexType.getClass.getSimpleName)
265 -
                                }.toSet)
266 -
                              .getOrElse(Set.empty),
267 -
                            Some(metricInfo)
268 -
                          )
331 +
                  )
332 +
                case Success(SchemaGot(_, _, _, schemaOpt) :: MetricInfoGot(_, _, _, Some(metricInfo)) :: Nil) =>
333 +
                  complete(
334 +
                    HttpEntity(
335 +
                      ContentTypes.`application/json`,
336 +
                      write(
337 +
                        DescribeMetricResponse(
338 +
                          schemaOpt
339 +
                            .fold(Set.empty[Field])(s =>
340 +
                              s.fieldsMap.foldLeft(Set.empty[Field]) {
341 +
                                case (acc: Set[Field], (_, schemaField)) =>
342 +
                                  acc + Field(name = schemaField.name,
343 +
                                              `type` = schemaField.indexType.getClass.getSimpleName)
344 +
                            }),
345 +
                          Some(metricInfo)
269 346
                        )
270 347
                      )
271 348
                    )
272 -
                  case Success(SchemaGot(_, _, _, None) :: _ :: Nil) =>
273 -
                    complete(HttpResponse(NotFound))
274 -
                  case Failure(ex) => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
275 -
                  case _           => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
276 -
                }
349 +
                  )
350 +
                case Success(SchemaGot(_, _, _, None) :: _ :: Nil) =>
351 +
                  complete(HttpResponse(NotFound))
352 +
                case Success(wrongResponse) =>
353 +
                  logger.error(s"received unexpected response $wrongResponse")
354 +
                  complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
355 +
                case Failure(ex) =>
356 +
                  logger.error(ex, "unexpected error")
357 +
                  complete(HttpResponse(InternalServerError, entity = ex.getMessage))
277 358
              }
278 359
            }
279 360
          }
@@ -302,17 +383,19 @@
Loading
302 383
      new ApiResponse(code = 500, message = "Internal server error")
303 384
    ))
304 385
  def dropMetric: Route =
305 -
    pathPrefix("commands") {
306 -
      pathPrefix(Segment) { db =>
307 -
        pathPrefix(Segment) { namespace =>
308 -
          pathPrefix(Segment) { metric =>
309 -
            delete {
310 -
              withMetricAuthorization(db, namespace, metric, true, authorizationProvider) {
311 -
                onComplete(writeCoordinator ? DropMetric(db, namespace, metric)) {
312 -
                  case Success(MetricDropped(_, _, _)) => complete("Ok")
313 -
                  case Success(_)                      => complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
314 -
                  case Failure(ex)                     => complete(HttpResponse(InternalServerError, entity = ex.getMessage))
315 -
                }
386 +
    pathPrefix(Segment) { db =>
387 +
      pathPrefix(Segment) { namespace =>
388 +
        pathPrefix(Segment) { metric =>
389 +
          delete {
390 +
            withMetricAuthorization(db, namespace, metric, true, authorizationProvider) {
391 +
              onComplete(writeCoordinator ? DropMetric(db, namespace, metric)) {
392 +
                case Success(MetricDropped(_, _, _)) => complete("Ok")
393 +
                case Success(wrongResponse) =>
394 +
                  logger.error(s"received unexpected response $wrongResponse")
395 +
                  complete(HttpResponse(InternalServerError, entity = "Unknown reason"))
396 +
                case Failure(ex) =>
397 +
                  logger.error(ex, "unexpected error")
398 +
                  complete(HttpResponse(InternalServerError, entity = ex.getMessage))
316 399
              }
317 400
            }
318 401
          }
@@ -320,8 +403,9 @@
Loading
320 403
      }
321 404
    }
322 405
323 -
  def commandsApi: Route = {
324 -
    showDbs ~ showNamespaces ~ showMetrics ~ dropNamespace ~ describeMetric ~ dropMetric
325 -
  }
406 +
  def commandsApi: Route =
407 +
    pathPrefix("commands") {
408 +
      topologyApi ~ locationsApi ~ showDbs ~ showNamespaces ~ showMetrics ~ dropNamespace ~ describeMetric ~ dropMetric
409 +
    }
326 410
327 411
}
328 412
imilarity index 77%
329 413
ename from nsdb-http/src/test/scala/io/radicalbit/nsdb/web/CommandApiTest.scala
330 414
ename to nsdb-http/src/test/scala/io/radicalbit/nsdb/web/CommandApiSpec.scala

@@ -33,7 +33,7 @@
Loading
33 33
  */
34 34
object FileUtils {
35 35
36 -
  private val NODE_ID_EXTENSION = ".name"
36 +
  private val NODE_ID_EXTENSION = "name"
37 37
  private val NODE_ID_LENGTH    = 10
38 38
  private val BUFFER_SIZE       = 4096
39 39
  private val buffer            = new Array[Byte](BUFFER_SIZE)

@@ -25,8 +25,6 @@
Loading
25 25
import akka.util.Timeout
26 26
import io.radicalbit.nsdb.cluster.NsdbPerfLogger
27 27
import io.radicalbit.nsdb.cluster.PubSubTopics._
28 -
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.GetLocations
29 -
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events.LocationsGot
30 28
import io.radicalbit.nsdb.cluster.logic.ReadNodesSelection
31 29
import io.radicalbit.nsdb.common.NSDbNumericType
32 30
import io.radicalbit.nsdb.common.configuration.NSDbConfig.HighLevel.precision

@@ -33,10 +33,12 @@
Loading
33 33
    * commands executed among NSDb actors.
34 34
    */
35 35
  object Commands {
36 -
    case object GetDbs                                                  extends ControlMessage with NSDbSerializable
37 -
    case class GetNamespaces(db: String)                                extends ControlMessage with NSDbSerializable
38 -
    case class GetMetrics(db: String, namespace: String)                extends ControlMessage with NSDbSerializable
39 -
    case class GetSchema(db: String, namespace: String, metric: String) extends NSDbSerializable
36 +
    case object GetTopology                                                extends NSDbSerializable
37 +
    case class GetLocations(db: String, namespace: String, metric: String) extends NSDbSerializable
38 +
    case object GetDbs                                                     extends ControlMessage with NSDbSerializable
39 +
    case class GetNamespaces(db: String)                                   extends ControlMessage with NSDbSerializable
40 +
    case class GetMetrics(db: String, namespace: String)                   extends ControlMessage with NSDbSerializable
41 +
    case class GetSchema(db: String, namespace: String, metric: String)    extends NSDbSerializable
40 42
41 43
    case class GetMetricInfo(db: String, namespace: String, metric: String) extends NSDbSerializable
42 44
@@ -106,6 +108,7 @@
Loading
106 108
    case object GetPublishers            extends NSDbSerializable
107 109
108 110
    case class NodeAlive(nodeId: String, nodeAddress: String) extends NSDbSerializable
111 +
109 112
  }
110 113
111 114
  /**
@@ -113,6 +116,9 @@
Loading
113 116
    */
114 117
  object Events {
115 118
119 +
    case class LocationsGot(db: String, namespace: String, metric: String, locations: Seq[Location])
120 +
        extends NSDbSerializable
121 +
116 122
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
117 123
    @JsonSubTypes(
118 124
      Array(
@@ -231,6 +237,9 @@
Loading
231 237
    case class PublisherUnSubscribed(nodeName: String)            extends NSDbSerializable
232 238
233 239
    case class MigrationStarted(inputPath: String) extends NSDbSerializable
240 +
241 +
    case class NSDbNode(address: String, nodeId: String) extends NSDbSerializable
242 +
    case class TopologyGot(nodes: Set[NSDbNode])         extends NSDbSerializable
234 243
  }
235 244
236 245
}

@@ -16,9 +16,6 @@
Loading
16 16
17 17
package io.radicalbit.nsdb.cluster.coordinator
18 18
19 -
import java.time.Duration
20 -
import java.util.concurrent.TimeUnit
21 -
22 19
import akka.actor.{ActorRef, Props, Stash}
23 20
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
24 21
import akka.util.Timeout
@@ -26,10 +23,9 @@
Loading
26 23
import io.radicalbit.nsdb.cluster.PubSubTopics.{COORDINATORS_TOPIC, NODE_GUARDIANS_TOPIC}
27 24
import io.radicalbit.nsdb.cluster.actor.MetricsDataActor.ExecuteDeleteStatementInternalInLocations
28 25
import io.radicalbit.nsdb.cluster.actor.SequentialFutureProcessing
29 -
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{GetLocations, GetWriteLocations}
26 +
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.GetWriteLocations
30 27
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events._
31 28
import io.radicalbit.nsdb.cluster.logic.WriteConfig
32 -
import io.radicalbit.nsdb.util.ErrorManagementUtils._
33 29
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
34 30
import io.radicalbit.nsdb.common.configuration.NSDbConfig
35 31
import io.radicalbit.nsdb.common.protocol.Bit
@@ -39,7 +35,10 @@
Loading
39 35
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
40 36
import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
41 37
import io.radicalbit.nsdb.util.ActorPathLogging
38 +
import io.radicalbit.nsdb.util.ErrorManagementUtils._
42 39
40 +
import java.time.Duration
41 +
import java.util.concurrent.TimeUnit
43 42
import scala.collection.mutable
44 43
import scala.concurrent.Future
45 44
import scala.concurrent.duration.FiniteDuration
@@ -71,8 +70,6 @@
Loading
71 70
    TimeUnit.SECONDS)
72 71
  import context.dispatcher
73 72
74 -
  log.info("WriteCoordinator is ready.")
75 -
76 73
  lazy val shardingInterval: Duration = context.system.settings.config.getDuration("nsdb.sharding.interval")
77 74
78 75
  override lazy val indexStorageStrategy: StorageStrategy =
@@ -375,6 +372,8 @@
Loading
375 372
      log.debug("WriteCoordinator data actor : {}", metricsDataActors.size)
376 373
      log.debug("WriteCoordinator commit log  actor : {}", commitLogCoordinators.size)
377 374
    }
375 +
376 +
    log.info("WriteCoordinator is ready.")
378 377
  }
379 378
380 379
  override def receive: Receive = {

@@ -35,7 +35,7 @@
Loading
35 35
                   val writeCoordinator: ActorRef,
36 36
                   val metadataCoordinator: ActorRef,
37 37
                   val authorizationProvider: NSDbAuthorizationProvider)(override implicit val timeout: Timeout,
38 -
                                                                         implicit val logger: LoggingAdapter,
38 +
                                                                         override implicit val logger: LoggingAdapter,
39 39
                                                                         override implicit val ec: ExecutionContext,
40 40
                                                                         override implicit val formats: Formats)
41 41
    extends CommandApi

@@ -17,7 +17,6 @@
Loading
17 17
package io.radicalbit.nsdb.cluster.coordinator
18 18
19 19
import java.util.concurrent.TimeUnit
20 -
21 20
import akka.actor._
22 21
import akka.cluster.ddata.DurableStore.{LoadAll, LoadData}
23 22
import akka.cluster.ddata._
@@ -307,6 +306,8 @@
Loading
307 306
  }
308 307
309 308
  override def receive: Receive = {
309 +
    case GetTopology =>
310 +
      sender() ! TopologyGot(NSDbClusterSnapshot(context.system).nodes.toSet)
310 311
    case AllMetricInfoWithRetentionGot(metricInfoes) =>
311 312
      implicit val timeContext: TimeContext = TimeContext()
312 313
      log.debug(s"check for retention for {}", metricInfoes)
@@ -483,12 +484,14 @@
Loading
483 484
                            if (nodeMetrics.nodeMetrics.nonEmpty)
484 485
                              writeNodesSelectionLogic
485 486
                                .selectWriteNodes(nodeMetrics.nodeMetrics, replicationFactor)
486 -
                                .map(address => (address, nsdbClusterSnapshot.getId(address)))
487 +
                                .map(address => nsdbClusterSnapshot.getId(address))
487 488
                            else {
488 489
                              Random.shuffle(clusterAliveMembers.toSeq).take(replicationFactor)
489 490
                            }
490 491
491 -
                          val locations = nodes.map { case (_, id) => Location(metric, id, start, end) }
492 +
                          val locations = nodes.map { node =>
493 +
                            Location(metric, node.nodeId, start, end)
494 +
                          }
492 495
                          performAddLocationIntoCache(db, namespace, locations, None)
493 496
                        }
494 497
                      } yield
@@ -618,7 +621,6 @@
Loading
618 621
619 622
  object commands {
620 623
621 -
    case class GetLocations(db: String, namespace: String, metric: String) extends NSDbSerializable
622 624
    case class GetWriteLocations(db: String, namespace: String, metric: String, timestamp: Long)
623 625
        extends NSDbSerializable
624 626
    case class AddLocations(db: String,
@@ -643,9 +645,6 @@
Loading
643 645
644 646
  object events {
645 647
646 -
    case class LocationsGot(db: String, namespace: String, metric: String, locations: Seq[Location])
647 -
        extends NSDbSerializable
648 -
649 648
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
650 649
    @JsonSubTypes(
651 650
      Array(

@@ -25,11 +25,12 @@
Loading
25 25
class LocalityReadNodesSelection(localNode: String) extends ReadNodesSelection {
26 26
27 27
  override def getDistinctLocationsByNode(locationsWithReplicas: Seq[Location]): Map[String, Seq[Location]] = {
28 +
    assert(localNode != null)
28 29
    locationsWithReplicas
29 30
      .groupBy(l => (l.from, l.to))
30 31
      .map {
31 32
        case ((_, _), locations) if locations.size > 1 =>
32 -
          locations.find(_.node == localNode).getOrElse(locations.minBy(_.node))
33 +
          locations.find(_.node == localNode).getOrElse(locations.head)
33 34
        case ((_, _), locations) => locations.minBy(_.node)
34 35
      }
35 36
      .toSeq

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading