Dyb ned i Spark internals og arkitektur

Apache Spark er en open source-distribueret klyngecomputerramme til almindeligt formål. En gnistapplikation er en JVM-proces, der kører en brugerkode, der bruger gnisten som et tredjepartsbibliotek.

Som en del af denne blog vil jeg vise, hvordan Spark fungerer på garnarkitektur med et eksempel og de forskellige underliggende baggrundsprocesser, der er involveret, såsom:

  • Gnistkontekst
  • Yarn Resource Manager, Application Master & lancering af eksekutorer (containere).
  • Opsætning af miljøvariabler, jobressourcer.
  • CoarseGrainedExecutorBackend & Netty-baseret RPC.
  • SparkListeners.
  • Udførelse af et job (Logisk plan, Fysisk plan).
  • Spark-WebUI.

Gnistkontekst

Gnistkontekst er det første indgangsniveau og hjertet i enhver gnistapplikation. Spark-shell er intet andet end en Scala-baseret REPL med gnistbinarier, der skaber et objekt sc kaldet gnistkontekst.

Vi kan starte gnistskallen som vist nedenfor:

spark-shell --master yarn \ --conf spark.ui.port=12345 \ --num-executors 3 \ --executor-cores 2 \ --executor-memory 500M

Som en del af gnistskallen har vi nævnt antallet af eksekutorer. De angiver antallet af arbejdsknudepunkter, der skal bruges, og antallet af kerner for hver af disse arbejdsknudepunkter til at udføre opgaver parallelt.

Eller du kan starte gnistskal ved hjælp af standardkonfigurationen.

spark-shell --master yarn

Konfigurationerne er til stede som en del af spark-env.sh

Vores driverprogram udføres på Gateway-noden, hvilket kun er en gnistskal. Det vil skabe en gnistkontekst og starte en applikation.

Du kan få adgang til gnistkontekstobjektet ved hjælp af sc.

Når gnistkonteksten er oprettet, venter den på ressourcerne. Når ressourcerne er tilgængelige, opretter Spark-kontekst interne tjenester og opretter en forbindelse til et Spark-eksekveringsmiljø.

Yarn Resource Manager, Application Master & lancering af eksekutorer (containere).

Når gnistkonteksten er oprettet, vil den tjekke med Cluster Manager og starte Application Master, dvs. starte en container og registrere signalhåndterere .

Når Application Master er startet, opretter den en forbindelse til driveren.

Derefter udløser ApplicationMasterEndPoint et proxy-program til at oprette forbindelse til ressourcehåndteringen.

Nu udfører garnbeholderen nedenstående operationer som vist i diagrammet.

ii) YarnRMClient registrerer sig hos Application Master.

iii) YarnAllocator: Vil anmode om 3 eksekutorcontainere, hver med 2 kerner og 884 MB hukommelse inklusive 384 MB overhead

iv) AM starter reportertråden

Nu modtager Yarn Allocator tokens fra Driver for at starte Executor-noder og starte containerne.

Opsætning af miljøvariabler, jobressourcer og lancering af containere.

Hver gang en container lanceres, gør den følgende 3 ting i hver af disse.

  • Opsætning af env-variabler

Spark Runtime Environment (SparkEnv) er runtime-miljøet med Sparks tjenester, der bruges til at interagere med hinanden for at etablere en distribueret computerplatform til en Spark-applikation.

  • Opsætning af jobressourcer
  • Lancering af container

YARN-eksekveringsstartkontekst tildeler hver eksekutor et eksekverings-id til at identificere den tilsvarende eksekutor (via Spark WebUI) og starter en CoarseGrainedExecutorBackend.

CoarseGrainedExecutorBackend & Netty-baseret RPC.

Efter at have opnået ressourcer fra Resource Manager, ser vi eksekutoren starte op

CoarseGrainedExecutorBackend er en ExecutorBackend, der styrer livscyklussen for en enkelt eksekutor. Det sender eksekutorens status til chaufføren.

Når ExecutorRunnable startes, registrerer CoarseGrainedExecutorBackend Executor RPC-slutpunktet og signalbehandlerne til at kommunikere med driveren (dvs. med CoarseGrainedScheduler RPC-slutpunktet) og informere om, at det er klar til at starte opgaver.

Netty-baseret RPC - Den bruges til at kommunikere mellem arbejdsknudepunkter, gnistkontekst, eksekutorer.

NettyRPCEndPoint bruges til at spore resultatstatus for arbejdsknudepunktet.

RpcEndpointAddress er den logiske adresse for et slutpunkt, der er registreret i et RPC-miljø med RpcAddress og navn.

Det er i formatet som vist nedenfor:

Dette er første øjeblik, når CoarseGrainedExecutorBackend initierer kommunikation med den driver, der er tilgængelig på driverUrl gennem RpcEnv.

SparkListeners

SparkListener (Scheduler-lytter) er en klasse, der lytter til eksekveringshændelser fra Sparks DAGScheduler og logger alle hændelsesoplysninger for et program, såsom eksekvereren, detaljer om driverallokering sammen med job, faser og opgaver og andre miljøegenskabsændringer.

SparkContext starter LiveListenerBus, der er inde i driveren. Det registrerer JobProgressListener med LiveListenerBus, som indsamler alle data for at vise statistikken i gnist-brugergrænsefladen.

Som standard er kun lytteren til WebUI aktiveret, men hvis vi vil tilføje andre lyttere, kan vi bruge spark.extraListeners.

Spark leveres med to lyttere, der viser de fleste af aktiviteterne

i) StatsReportListener

ii) EventLoggingListener

EventLoggingListener:Hvis du vil analysere ydeevnen for dine applikationer yderligere ud over, hvad der er tilgængeligt som en del af Spark-historienserveren, kan du behandle hændelseslogdataene. Spark Event Log registrerer info om behandlede job / faser / opgaver. Det kan aktiveres som vist nedenfor ...

Hændelseslogfilen kan læses som vist nedenfor

  • Spark-driveren logger ind på jobbelastning / perf-metrics i kataloget spark.evenLog.dir som JSON-filer.
  • Der er én fil pr. Applikation, filnavne indeholder applikations-id (derfor inklusive et tidsstempel) application_1540458187951_38909.

Det viser hændelsestypen og antallet af poster for hver.

Lad os nu tilføje StatsReportListener til gnisten.extraListenersog kontrollere status for jobbet.

Aktivér INFO-logningsniveau for org.apache.spark.scheduler.StatsReportListener-logger for at se Spark-begivenheder.

For at aktivere lytteren registrerer du den i SparkContext. Det kan gøres på to måder.

i) Brug af SparkContext.addSparkListener (lytter: SparkListener) -metoden i din Spark-applikation.

Klik på linket for at implementere brugerdefinerede lyttere - CustomListener

ii) Brug af kommandolinjemuligheden conf

Lad os læse en prøvefil og udføre en optælling for at se StatsReportListener.

Udførelse af et job (Logisk plan, Fysisk plan).

I Spark er RDD ( elastisk distribueret datasæt ) det første niveau i abstraktionslaget. Det er en samling af elementer fordelt på tværs af knudepunkterne i klyngen, der kan betjenes parallelt. RDD'er kan oprettes på 2 måder.

i) Foretage en eksisterende samling i dit driverprogram

ii) Henvisning til et datasæt i et eksternt lagersystem

RDD'er oprettes enten ved hjælp af en fil i Hadoop-filsystemet eller en eksisterende Scala-samling i driverprogrammet og transformerer den.

Lad os tage et prøveuddrag som vist nedenfor

Udførelsen af ​​ovenstående uddrag finder sted i 2 faser.

6.1 Logisk plan: I denne fase oprettes en RDD ved hjælp af et sæt transformationer.Det holder styr på disse transformationer i driverprogrammet ved at opbygge en computerkæde (en række RDD) som en graf af transformationer for at producere en RDD kaldet en Linjegraf .

Transformationer kan yderligere opdeles i 2 typer

  • Smal transformation: En pipeline af operationer, der kan udføres som et trin og ikke kræver, at dataene blandes på tværs af partitionerne - for eksempel kort, filter osv.

Nu læses dataene ind i driveren ved hjælp af udsendelsesvariablen.

  • Bred transformation: Her kræver hver operation, at dataene blandes, fremover for hver brede transformation oprettes et nyt trin - for eksempel reducereByKey osv.

Vi kan se linjegrafen ved hjælp af toDebugString

6.2 Fysisk plan:I denne fase, når vi udløser en handling på RDD, ser DAG Scheduler på RDD-afstamning og kommer med den bedste udførelsesplan med faser og opgaver sammen med TaskSchedulerImpl og udfører jobbet i et sæt opgaver parallelt.

Når vi først har udført en handling, udløser SparkContext et job og registrerer RDD indtil den første fase (dvs. inden nogen brede transformationer) som en del af DAGScheduler.

Inden det går videre til næste trin (Wide transformations), vil det kontrollere, om der er partitionsdata, der skal blandes, og om det har manglende overordnede operationelle resultater, som det afhænger af, hvis et sådant trin mangler, så gentager det udfører den del af operationen ved at bruge DAG (Directed Acyclic Graph), som gør den til fejltolerant.

I tilfælde af manglende opgaver tildeler det opgaver til eksekutører.

Hver opgave tildeles eksekutøren CoarseGrainedExecutorBackend.

Det får blokoplysninger fra Namenode.

nu udfører den beregningen og returnerer resultatet.

Derefter ser DAGScheduler efter de nyligt kørbare etaper og udløser den næste etape (reduceByKey) -operation.

ShuffleBlockFetcherIterator får blokke til at blive blandet.

Nu er reduceringsoperationen opdelt i 2 opgaver og udført.

Efter afslutningen af ​​hver opgave returnerer eksekutøren resultatet tilbage til driveren.

Når jobbet er afsluttet, vises resultatet.

Spark-WebUI

Spark-UI hjælper med at forstå strømmen af ​​kodeudførelse og den tid, det tager at fuldføre et bestemt job. Visualiseringen hjælper med at finde ud af eventuelle underliggende problemer, der finder sted under udførelsen og optimere gnistapplikationen yderligere.

Vi vil se Spark-UI-visualiseringen som en del af det foregående trin 6.

Når jobbet er afsluttet, kan du se jobdetaljerne, f.eks. Antallet af faser, antallet af opgaver, der blev planlagt under jobjobsførelsen af ​​et job.

Når du klikker på de afsluttede job, kan vi se DAG-visualiseringen, dvs. de forskellige brede og smalle transformationer som en del af det.

Du kan se udførelsestiden taget af hvert trin.

Når du klikker på et bestemt trin som en del af jobbet, viser det de komplette detaljer om, hvor datablokkene befinder sig, datastørrelse, den anvendte eksekutor, den anvendte hukommelse og den tid, det tager at gennemføre en bestemt opgave. Det viser også antallet af blandinger, der finder sted.

Desuden kan vi klikke på fanen Eksekutører for at se den anvendte eksekutor og driver.

Nu hvor vi har set, hvordan Spark fungerer internt, kan du bestemme strømmen af ​​udførelse ved at bruge Spark UI, logfiler og finjustere Spark EventListeners for at bestemme den optimale løsning ved indsendelse af et Spark-job.

Bemærk : Kommandoerne, der blev udført relateret til dette indlæg, tilføjes som en del af min GIT-konto.

På samme måde kan du også læse mere her:

  • Sqoop-arkitektur i dybden med kode.
  • HDFS-arkitektur i dybden med kode .
  • Hive Arkitektur i dybden med kode .

Hvis du også vil, kan du oprette forbindelse til mig på LinkedIn - Jayvardhan Reddy.

Hvis du nød at læse det, kan du klikke på klappen og lade andre vide om det. Hvis du vil have mig til at tilføje noget andet, er du velkommen til at efterlade et svar?