wherby / doradilla

@@ -4,6 +4,7 @@
Loading
4 4
import akka.event.slf4j.Logger
5 5
import doracore.base.BaseActor
6 6
import doracore.base.query.QueryTrait.{ChildInfo, QueryChild}
7 +
import doracore.core.driver.DriverActor.FSMDecrease
7 8
import doracore.core.fsm.FsmActor._
8 9
import doracore.core.msg.Job._
9 10
import doracore.core.msg.JobControlMsg.ResetFsm
@@ -136,6 +137,12 @@
Loading
136 137
    case Event(queryChild: QueryChild, _) => val childInfo = ChildInfo(context.self.path.toString, getChildren(), System.currentTimeMillis() / 1000)
137 138
      queryChild.actorRef ! childInfo
138 139
      stay()
140 +
    case Event(fsmDecrease: FSMDecrease, _) =>{
141 +
      log.info(s" Receive decrease msg : $fsmDecrease from : $sender(). This FSMActor will be killed.")
142 +
      driverActor = null
143 +
      self ! PoisonPill
144 +
      stay()
145 +
    }
139 146
    case Event(QueryState(), data) =>
140 147
      sender() ! data
141 148
      log.info(s"QueryState: $data")

@@ -4,7 +4,7 @@
Loading
4 4
import doracore.base.BaseActor
5 5
import akka.event.LoggingReceive
6 6
import com.datastax.driver.core.utils.UUIDs
7 -
import doracore.core.driver.DriverActor.{FetchQueue, ProxyActorMsg}
7 +
import doracore.core.driver.DriverActor._
8 8
import doracore.core.fsm.FsmActor
9 9
import doracore.core.fsm.FsmActor.{FetchJob, RegistToDriver, SetDriver}
10 10
import doracore.core.msg.Job.{JobMeta, JobRequest}
@@ -18,14 +18,19 @@
Loading
18 18
  * Created by whereby[Tao Zhou](187225577@qq.com) on 2019/3/30
19 19
  */
20 20
class DriverActor(queue: Option[ActorRef] = None, setDefaultFsmActor: Option[Boolean] = Some(true)) extends BaseActor {
21 +
  var fsmToBeDecrease = 0
21 22
  val queueActor = queue match {
22 23
    case Some(queue) => queue
23 24
    case _ =>
24 -
      context.actorOf(QueueActor.queueActorProps, CNaming.timebasedName( "queueActor"))
25 +
      context.actorOf(QueueActor.queueActorProps, CNaming.timebasedName("queueActor"))
25 26
  }
26 27
27 -
  if(setDefaultFsmActor == Some(true)){
28 -
    val fsmActor: ActorRef = context.actorOf(DriverActor.fsmProps, CNaming.timebasedName( "fsmActor"))
28 +
  if (setDefaultFsmActor == Some(true)) {
29 +
    createOneFSMActor()
30 +
  }
31 +
32 +
  private def createOneFSMActor() = {
33 +
    val fsmActor: ActorRef = context.actorOf(DriverActor.fsmProps, CNaming.timebasedName("fsmActor"))
29 34
    fsmActor ! SetDriver(self)
30 35
  }
31 36
@@ -36,16 +41,21 @@
Loading
36 41
  def handleRequest(jobRequestOrg: JobRequest) = {
37 42
    val jobRequest = jobRequestOrg.jobMetaOpt match {
38 43
      case Some(_) => jobRequestOrg
39 -
      case _=> jobRequestOrg.copy(jobMetaOpt = Some(JobMeta(UUIDs.timeBased().toString)))
44 +
      case _ => jobRequestOrg.copy(jobMetaOpt = Some(JobMeta(UUIDs.timeBased().toString)))
40 45
    }
41 -
    val proxyActor = createProxy(CNaming.timebasedName( jobRequest.taskMsg.operation ))
46 +
    val proxyActor = createProxy(CNaming.timebasedName(jobRequest.taskMsg.operation))
42 47
    log.info(s"{${jobRequest.jobMetaOpt}} is handled by proxy $proxyActor")
43 48
    proxyActor ! jobRequest
44 49
    sender() ! ProxyActorMsg(proxyActor)
45 50
  }
46 51
47 52
  def hundleFetchJob() = {
48 -
    queueActor ! FetchTask(1,sender())
53 +
    if(fsmToBeDecrease > 0){
54 +
      fsmToBeDecrease = fsmToBeDecrease -1
55 +
      sender() ! FSMDecrease(1)
56 +
    }else{
57 +
      queueActor ! FetchTask(1, sender())
58 +
    }
49 59
  }
50 60
51 61
  def hundleRequestListResponse(requestListResponse: RequestListResponse) = {
@@ -54,21 +64,32 @@
Loading
54 64
    }
55 65
  }
56 66
57 -
  def handleRegister(registToDriver: RegistToDriver) ={
67 +
  def handleRegister(registToDriver: RegistToDriver) = {
58 68
    registToDriver.actorRef ! SetDriver(self)
59 69
  }
60 70
61 -
  def handleFetchQueue()={
71 +
  def handleFetchQueue() = {
62 72
    sender() ! queueActor
63 73
  }
64 74
75 +
  def handleFSMControl(fsmControl: FSMControl) = {
76 +
    fsmControl match {
77 +
      case FSMIncrease(num) if (num > 0  && num < 1000)=> for (_ <- 1 to num) {
78 +
        log.info("Increase FSMActor.")
79 +
        createOneFSMActor()
80 +
      }
81 +
      case FSMDecrease(num) => fsmToBeDecrease = fsmToBeDecrease + num
82 +
    }
83 +
  }
84 +
65 85
66 86
  override def receive: Receive = LoggingReceive {
67 87
    case jobRequest: JobRequest => handleRequest(jobRequest)
68 88
    case fetchJob: FetchJob => hundleFetchJob()
69 -
    case requestListResponse: RequestListResponse =>hundleRequestListResponse(requestListResponse)
89 +
    case requestListResponse: RequestListResponse => hundleRequestListResponse(requestListResponse)
70 90
    case registToDriver: RegistToDriver => handleRegister(registToDriver)
71 91
    case _: FetchQueue => handleFetchQueue()
92 +
    case fsmControl: FSMControl => handleFSMControl(fsmControl)
72 93
  }
73 94
}
74 95
@@ -78,7 +99,7 @@
Loading
78 99
  }
79 100
80 101
  def driverActorPropsWithoutFSM(queue: Option[ActorRef] = None) = {
81 -
    Props(new DriverActor(queue,None))
102 +
    Props(new DriverActor(queue, None))
82 103
  }
83 104
84 105
  def fsmProps: Props = Props(new FsmActor)
@@ -87,4 +108,10 @@
Loading
87 108
88 109
  case class FetchQueue()
89 110
111 +
  sealed trait FSMControl
112 +
113 +
  case class FSMIncrease(increase: Int) extends FSMControl
114 +
115 +
  case class FSMDecrease(decrease: Int) extends FSMControl
116 +
90 117
}

@@ -2,10 +2,13 @@
Loading
2 2
3 3
import akka.util.Timeout
4 4
import doracore.api.JobApi
5 +
import doracore.core.driver.DriverActor.{FSMDecrease, FSMIncrease}
5 6
import doracore.core.msg.Job.{JobMsg, JobRequest, JobResult}
6 7
import doracore.tool.receive.ReceiveActor
7 8
import doracore.util.CNaming
8 9
import doracore.vars.ConstVars
10 +
import javax.print.attribute.standard.JobName
11 +
9 12
import scala.concurrent.{ExecutionContext, Future}
10 13
11 14
/**
@@ -35,4 +38,13 @@
Loading
35 38
    val processJobRequest = JobRequest(processJob, receiveActor, jobApi.processTranActor, priority)
36 39
    getProcessCommandFutureResult(processJobRequest, jobApi.defaultDriver, receiveActor,timeout)
37 40
  }
41 +
42 +
  def changeFSMForNamedJob(jobName: String, num:Int)={
43 +
    val jobApi = getNamedJobApi(jobName)
44 +
    if(num >0){
45 +
      jobApi.defaultDriver ! FSMIncrease(num)
46 +
    }else{
47 +
      jobApi.defaultDriver ! FSMDecrease(Math.abs(num))
48 +
    }
49 +
  }
38 50
}
Files Coverage
doradilla-core/src/main/scala 100.00%
Project Totals (45 files) 100.00%
351.1
default=
TRAVIS_OS_NAME=linux
350.1
default=
TRAVIS_OS_NAME=linux

No yaml found.

Create your codecov.yml to customize your Codecov experience

Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading