How to create a simple application with Akka Cluster

If you read my previous story about Scalachain, you probably noticed that it is far from being a distributed system. It lacks all the features to properly work with other nodes. Add to that the fact that a blockchain composed of a single node is useless. For this reason, I decided it is time to work on the issue.

Since Scalachain is powered by Akka, why not take the chance to play with Akka Cluster? I created a simple project to tinker a bit with Akka Cluster, and in this story I'm going to share my learnings. We are going to create a cluster of three nodes, using Cluster Aware Routers to balance the load among them. Everything will run in a Docker container, and we will use docker-compose for an easy deployment.

Ok, let's roll!

Quick introduction to Akka Cluster

Akka Cluster provides great support for the creation of distributed applications. The best use case is when you have a node that you want to replicate N times in a distributed environment. This means that all the N nodes are peers running the same code. Akka Cluster gives you out-of-the-box discovery of members in the same cluster. Using Cluster Aware Routers it is possible to balance the messages between actors in different nodes. It is also possible to choose the balancing policy, making load balancing a piece of cake!

Actually you can choose between two kinds of routers:

Group Router - The actors to send the messages to - called routees - are specified using their actor path. The routers share the routees created in the cluster. We will use a Group Router in this example.

Pool Router - The routees are created and deployed by the router, so they are its children in the actor hierarchy. Routees are not shared between routers. This is ideal for a primary-replica scenario, where each router is the primary and its routees the replicas.

This is just the tip of the iceberg, so I invite you to read the official documentation for more insights.

A cluster for mathematical computations

Let's imagine a use case scenario. Suppose we are designing a system to execute mathematical computations on request. The system is deployed online, so it needs a REST API to receive the computation requests. An internal processor handles these requests, executing the computation and returning the result.

Right now the processor can only compute the Fibonacci number. We decide to use a cluster of nodes to distribute the load among the nodes and improve performance. Akka Cluster will handle the cluster dynamics and the load balancing between nodes. Ok, sounds good!

Actor hierarchy

First things first: we need to define our actor hierarchy. The system can be divided in three functional parts: the business logic, the cluster management, and the node itself. There is also the server, but it is not an actor, and we will work on it later.

Business logic

The application should do mathematical computations. We can define a simple Processor actor to manage all the computational tasks. Every computation that we support can be implemented in a specific actor, that will be a child of the Processor one. In this way the application is modular and easier to extend and maintain. Right now the only child of the Processor will be the ProcessorFibonacci actor. I suppose you can guess what its task is. This should be enough to start.

Cluster management

To manage the cluster we need a ClusterManager. Sounds simple, right? This actor handles everything related to the cluster, like returning its members when asked. It would be useful to log what happens inside the cluster, so we define a ClusterListener actor. This is a child of the ClusterManager, and it subscribes to cluster events, logging them.

Node

The Node actor is the root of our hierarchy. It is the entry point of our system, the one communicating with the API. The Processor and the ClusterManager are its children, along with the ProcessorRouter actor. This is the load balancer of the system, distributing the load among the Processors. We will configure it as a Cluster Aware Router, so every ProcessorRouter can send messages to the Processors on every node.

Actor implementation

Time to implement our actors! First we implement the actors related to the business logic of the system. Then we move on to the actors for the cluster management, and to the root actor (Node) at the end.

ProcessorFibonacci

This actor executes the computation of the Fibonacci number. It receives a Compute message containing the number to compute and the reference of the actor to respond to. The reference is important, since there can be different requesting actors. Remember that we are working in a distributed environment!

Once the Compute message is received, the fibonacci function computes the result. We wrap it in a ProcessorResponse object to provide information about the node that executed the computation. This will be useful later to see the round-robin policy in action.

The result is then sent to the actor we need to respond to. Easy-peasy.

object ProcessorFibonacci {
  sealed trait ProcessorFibonacciMessage
  case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage

  def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId))

  def fibonacci(x: Int): BigInt = {
    @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match {
      case 0 => prev
      case 1 => next
      case _ => fibHelper(x - 1, next, next + prev)
    }
    fibHelper(x)
  }
}

class ProcessorFibonacci(nodeId: String) extends Actor {
  import ProcessorFibonacci._

  override def receive: Receive = {
    case Compute(value, replyTo) => {
      replyTo ! ProcessorResponse(nodeId, fibonacci(value))
    }
  }
}
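The ProcessorResponse message is not defined in the snippets of this story. A minimal sketch of what it could look like, with field names inferred from how it is constructed above (the actual definition in the project may differ):

// Assumed definition: the id of the node that performed the computation,
// plus the computed Fibonacci value.
case class ProcessorResponse(nodeId: String, value: BigInt)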

Processor

The Processor actor manages the specific sub-processors, like the Fibonacci one. It should instantiate the sub-processors and forward the requests to them. Right now we only have one sub-processor, so the Processor receives one kind of message: ComputeFibonacci. This message contains the Fibonacci number to compute. Once received, the number to compute is sent to the ProcessorFibonacci actor, along with the reference of the sender().

object Processor {
  sealed trait ProcessorMessage
  case class ComputeFibonacci(n: Int) extends ProcessorMessage

  def props(nodeId: String) = Props(new Processor(nodeId))
}

class Processor(nodeId: String) extends Actor {
  import Processor._

  val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci")

  override def receive: Receive = {
    case ComputeFibonacci(value) => {
      val replyTo = sender()
      fibonacciProcessor ! Compute(value, replyTo)
    }
  }
}

ClusterListener

We would like to log useful information about what happens in the cluster. This can help us debug the system if we need to. This is the purpose of the ClusterListener actor. Before starting, it subscribes to the event messages of the cluster. The actor reacts to messages like MemberUp, UnreachableMember, or MemberRemoved, logging the corresponding event. When the ClusterListener is stopped, it unsubscribes from the cluster events.

object ClusterListener {
  def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster))
}

class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging {

  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Node {} - Member is Up: {}", nodeId, member.address)
    case UnreachableMember(member) =>
      log.info("Node {} - Member detected as unreachable: {}", nodeId, member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}

ClusterManager

The actor responsible for the management of the cluster is the ClusterManager. It creates the ClusterListener actor, and provides the list of cluster members upon request. It could be extended to add more functionalities, but right now this is enough.

object ClusterManager {
  sealed trait ClusterMessage
  case object GetMembers extends ClusterMessage

  def props(nodeId: String) = Props(new ClusterManager(nodeId))
}

class ClusterManager(nodeId: String) extends Actor with ActorLogging {

  val cluster: Cluster = Cluster(context.system)
  val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener")

  override def receive: Receive = {
    case GetMembers => {
      sender() ! cluster.state.members
        .filter(_.status == MemberStatus.up)
        .map(_.address.toString)
        .toList
    }
  }
}

ProcessorRouter

The load balancing among processors is handled by the ProcessorRouter. It is created by the Node actor, but this time all the required information is provided in the configuration of the system.

class Node(nodeId: String) extends Actor {
  //...
  val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
  //...
}

Let’s analyse the relevant part in the application.conf file.

akka {
  actor {
    ...
    deployment {
      /node/processorRouter {
        router = round-robin-group
        routees.paths = ["/user/node/processor"]
        cluster {
          enabled = on
          allow-local-routees = on
        }
      }
    }
  }
  ...
}

The first thing is to specify the path to the router actor, that is /node/processorRouter. Inside that property we can configure the behaviour of the router:

  • router: this is the policy for the load balancing of messages. I chose the round-robin-group, but there are many others.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee on its own node.

Using this configuration we can create a router to load balance the work among our processors.
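As a side note, the same router could also be defined programmatically instead of via configuration. A minimal sketch based on the Akka 2.5 cluster routing API — this is not the approach used in this project, just the code-based equivalent:

import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.RoundRobinGroup

// A round-robin group router that looks up routees at the given path
// on the nodes of the cluster, including the local one.
val processorRouter: ActorRef = context.actorOf(
  ClusterRouterGroup(
    RoundRobinGroup(Nil),
    ClusterRouterGroupSettings(
      totalInstances = 100,
      routeesPaths = List("/user/node/processor"),
      allowLocalRoutees = true,
      useRoles = Set.empty[String])
  ).props(),
  "processorRouter")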

Node

The root of our actor hierarchy is the Node. It creates the child actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node {
  sealed trait NodeMessage

  case class GetFibonacci(n: Int) extends NodeMessage
  case object GetClusterMembers extends NodeMessage

  def props(nodeId: String) = Props(new Node(nodeId))
}

class Node(nodeId: String) extends Actor {

  val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor")
  val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
  val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager")

  override def receive: Receive = {
    case GetClusterMembers => clusterManager forward GetMembers
    case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value)
  }
}

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes {

  implicit val system: ActorSystem = ActorSystem("cluster-playground")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val config: Config = ConfigFactory.load()
  val address = config.getString("http.ip")
  val port = config.getInt("http.port")
  val nodeId = config.getString("clustering.ip")

  val node: ActorRef = system.actorOf(Node.props(nodeId), "node")

  lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes

  Http().bindAndHandle(routes, address, port)
  println(s"Node $nodeId is listening at http://$address:$port")

  Await.result(system.whenTerminated, Duration.Inf)
}

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.
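The skeleton of the trait itself is not shown in this story, so here is a minimal sketch of what it could look like. The Timeout value is an arbitrary choice of mine, and the JSON marshalling is an assumption, not code from the project:

import scala.concurrent.duration._
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout

// Hypothetical skeleton: the three routes shown in the following
// snippets live inside this trait. The ask pattern (?) used there
// needs an implicit Timeout in scope; a JSON marshaller for
// ProcessorResponse (e.g. via spray-json) is assumed to be mixed in.
trait NodeRoutes {

  implicit def system: ActorSystem

  def node: ActorRef

  implicit lazy val timeout: Timeout = Timeout(5.seconds)

  // lazy val healthRoute: Route = ...   (defined below)
  // lazy val statusRoutes: Route = ...  (defined below)
  // lazy val processRoutes: Route = ... (defined below)
}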

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running.

lazy val healthRoute: Route = pathPrefix("health") {
  concat(
    pathEnd {
      concat(
        get {
          complete(StatusCodes.OK)
        }
      )
    }
  )
}

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") {
  concat(
    pathPrefix("members") {
      concat(
        pathEnd {
          concat(
            get {
              val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]]
              onSuccess(membersFuture) { members =>
                complete(StatusCodes.OK, members)
              }
            }
          )
        }
      )
    }
  )
}

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") {
  concat(
    pathPrefix("fibonacci") {
      concat(
        path(IntNumber) { n =>
          pathEnd {
            concat(
              get {
                val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse]
                onSuccess(processFuture) { response =>
                  complete(StatusCodes.OK, response)
                }
              }
            )
          }
        }
      )
    }
  )
}

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.
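For example, a call to /process/fibonacci/10 might return something like {"nodeId": "node1", "value": 55}; the exact field names depend on the JSON format defined for ProcessorResponse.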

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering {
  ip = "127.0.0.1"
  ip = ${?CLUSTER_IP}
  port = 2552
  port = ${?CLUSTER_PORT}
  seed-ip = "127.0.0.1"
  seed-ip = ${?CLUSTER_SEED_IP}
  seed-port = 2552
  seed-port = ${?CLUSTER_SEED_PORT}
  cluster.name = "cluster-playground"
}

Here we are simply defining the ip and port of the nodes and of the seed, as well as the cluster name. We set a default value, then we override it if the corresponding environment variable (like CLUSTER_IP) is specified. The configuration of the cluster is the following.

akka {
  actor {
    provider = "cluster"
    ...
    /* router configuration */
    ...
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = ${clustering.ip}
      port = ${clustering.port}
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
    ]
    auto-down-unreachable-after = 10s
  }
}
...
/* server vars */
...
/* cluster vars */

Akka Cluster is built on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster by setting provider = "cluster". Then we bind clustering.ip and clustering.port to the hostname and port of the netty transport.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}. Right now we have one seed node, but we may add more later.
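For instance, a second seed could be listed like this; the clustering.seed-ip-2 and clustering.seed-port-2 variables are hypothetical and would need to be defined alongside the others:

seed-nodes = [
  "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port},
  "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip-2}":"${clustering.seed-port-2}
]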

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. It allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties

enablePlugins(JavaAppPackaging)
enablePlugins(DockerPlugin)
enablePlugins(AshScriptPlugin)

mainClass in Compile := Some("com.elleflorio.cluster.playground.Server")
dockerBaseImage := "java:8-jre-alpine"
version in Docker := "latest"
dockerExposedPorts := Seq(8000)
dockerRepository := Some("elleflorio")

Once we have set up the plugin, we can create the Docker image by running the command sbt docker:publishLocal. Run the command and taste the magic…
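Given the dockerRepository and version settings above, the image should end up tagged as elleflorio/akka-cluster-playground:latest, which is the image the docker-compose file below refers to.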

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5'

networks:
  cluster-network:

services:
  seed:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '2552:2552'
      - '8000:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: seed
      CLUSTER_SEED_IP: seed
  node1:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '8001:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: node1
      CLUSTER_PORT: 1600
      CLUSTER_SEED_IP: seed
      CLUSTER_SEED_PORT: 2552
  node2:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '8002:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: node2
      CLUSTER_PORT: 1600
      CLUSTER_SEED_IP: seed
      CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at ports :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That's good: we are proud of our work and can get out for a beer to celebrate!
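For example, repeatedly requesting http://localhost:8000/process/fibonacci/10 should return ProcessorResponses whose nodeId cycles through seed, node1, and node2.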

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what it can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like!

See you!