Lærte erfaringer under behandling af Wikipedia med Apache Spark

Apache Spark er en open source fejltolerant klyngecomputer-ramme, der også understøtter SQL-analyse, maskinindlæring og grafbehandling.

Det fungerer ved at opdele dine data i partitioner og derefter behandle disse partitioner parallelt på alle knudepunkterne i klyngen. Hvis en hvilken som helst node går ned, tildeler den denne opgave til en anden node og giver dermed fejltolerance.

At være 100 gange hurtigere end Hadoop har gjort det enormt populært til Big Data-behandling. Spark er skrevet i Scala og kører på JVM, men den gode nyhed er, at det også giver API'er til Python og R samt C #. Det er veldokumenteret med eksempler, som du bør tjekke ud.

Når du er klar til at prøve det, vil denne artikel guide dig fra download og opsætning til performance tuning. Min lille gnistklynge udførte 100 millioner strengekampe over alle artiklerne i Wikipedia - på mindre end to timer.

Det er når du kommer forbi vejledningerne og laver noget seriøst arbejde, at du indser alle besværet med den tekniske stak, du bruger. At lære gennem fejl er den bedste måde at lære på. Men nogle gange har du bare kort tid og ønsker at du vidste alle mulige ting, der kunne gå galt.

Her beskriver jeg nogle af de problemer, jeg stod overfor, da jeg startede med Spark, og hvordan du kan undgå dem.

Sådan kommer du i gang

Download Spark-binærprogrammet, der kommer med pakkede Hadoop-afhængigheder

Hvis du planlægger at downloade Spark, vil du bemærke, at der findes forskellige binære filer til den samme version. Spark annoncerer, at det ikke har brug for Hadoop, så du kan downloade den bruger-forudsatte-hadoop-version, som er mindre i størrelse. Gør ikke det .

Selvom Spark ikke bruger Hadoop's MapReduce-ramme, har den afhængighed af andre Hadoop-biblioteker som HDFS og YARN. Den uden hadoop-version er til når du allerede har Hadoop-biblioteker leveret andetsteds.

Brug den enkeltstående klyngetilstand, ikke Mesos eller YARN

Når du tester de indbyggede eksempler på localklyngen og sørger for, at alt er installeret og fungerer korrekt, skal du fortsætte med at konfigurere din klynge.

Spark giver dig tre muligheder: Mesos, GARN og standalone.

De to første er ressourceallokatorer, der styrer dine replikanoder. Spark skal anmode dem om at tildele sine egne forekomster. Som nybegynder må du ikke øge din kompleksitet ved at gå den vej.

Den enkeltstående klynge er den nemmeste at opsætte. Det kommer med fornuftige standardindstillinger, som at bruge alle dine kerner til eksekutorer. Det er en del af selve Spark-distributionen og har et sbin/start-all.shscript, der kan hente den primære såvel som alle dine replikaer, der er angivet i conf/slavesved hjælp af ssh.

Mesos / YARN er separate programmer, der bruges, når din klynge ikke kun er en gnistklynge. De kommer heller ikke med fornuftige standardindstillinger: eksekutører bruger ikke alle kerner på replikerne, medmindre det er udtrykkeligt angivet.

Du har også muligheden for en høj tilgængelighedstilstand ved hjælp af Zookeeper, som holder en liste over sikkerhedskopieringsprimaries, hvis eventuelle primære fejl mislykkes. Hvis du er nybegynder, håndterer du sandsynligvis ikke en klynge med tusind knudepunkter, hvor risikoen for knudepunktfejl er betydelig. Du er mere tilbøjelige til at oprette en klynge på en administreret skyplatform som Amazons eller Googles, som allerede tager sig af nodefejl.

Du har ikke brug for høj tilgængelighed med skyinfrastruktur eller en lille klynge

Jeg fik min klynge oprettet i et fjendtligt miljø, hvor menneskelige faktorer var ansvarlige for strømsvigt og noder, der gik ud af nettet. (Dybest set er mit computerlaboratorium, hvor flittige studerende slukker for maskinen, og skødesløse studerende trækker LAN-kabler ud). Jeg kunne stadig trække mig uden høj tilgængelighed ved omhyggeligt valg af den primære knude. Du behøver ikke bekymre dig om det.

Tjek den Java-version, du bruger til at køre Spark

Et meget vigtigt aspekt er den Java-version, du bruger til at køre Spark. Normalt fungerer en senere version af Java med noget kompileret til ældre udgivelser.

Men med Project Jigsaw introducerede modularitet strengere isolering og grænser i Java 9, som bryder visse ting, der bruger refleksion. På Spark 2.3.0, der kørte på Java 9, fik jeg ulovlig refleksionsadgang. Java 8 havde ingen problemer.

Dette vil helt sikkert ændre sig i den nærmeste fremtid, men husk det indtil da.

Angiv den primære URL nøjagtigt som den er. Opløs ikke domænenavne til IP-adresser eller omvendt

Den uafhængige klynge er meget følsom over for webadresser, der bruges til at løse primære og replikanoder. Antag at du starter den primære knude som nedenfor:

> sbin/start-master.sh 

og din primære er oppe kl localhost:8080

Som standard vælges din pcs værtsnavn som den primære URL-adresse. x360løser det, localhostmen at starte en replika som nedenfor fungerer ikke.

# does not work > sbin/start-slave.sh spark://localhost:7077 
# works > sbin/start-slave.sh spark://x360:7077

Dette fungerer, og vores replika er blevet føjet til klyngen:

Vores replika har en IP-adresse i underdomænet 172.17.xx, som faktisk er det underdomæne, der er oprettet af Docker på min maskine.

Primæren kan kommunikere med denne replika, fordi begge er på samme maskine. Men repliken kan ikke kommunikere med andre replikaer på netværket eller en primær på en anden maskine, fordi dens IP-adresse ikke kan dirigeres.

Som i det primære tilfælde ovenfor vil en replika på en maskine uden primær optage maskinens værtsnavn. Når du har identiske maskiner, ender de alle med det samme værtsnavn som deres adresse. Dette skaber et totalt rod, og ingen kan kommunikere med den anden.

Så ovenstående kommandoer ændres til:

# start master> sbin/start-master.sh -h $myIP # start slave > sbin/start-slave.sh -h $myIP spark://:7077 # submit a job > SPARK_LOCAL_IP=$myIP bin/spark-submit ...

hvor myIPer maskinens IP-adresse, der kan dirigeres mellem klyngenoder. Det er mere sandsynligt, at alle noder er på det samme netværk, så du kan skrive et script, der indstilles myIPpå hver maskine.

# assume all nodes in the 10.1.26.x subdomain [email protected]:~$ myIP=`hostname -I | tr " " "\n" | grep 10.1.26. | head`

Flow af koden

Indtil videre har vi oprettet vores klynge og set, at den er funktionel. Nu er det tid til at kode. Spark er ret veldokumenteret og leveres med mange eksempler, så det er meget let at komme i gang med kodning. Hvad der er mindre indlysende er, hvordan det hele fungerer, hvilket resulterer i nogle meget svære at fejle fejl under løbetiden. Antag at du kodede noget som dette:

class SomeClass { static SparkSession spark; static LongAccumulator numSentences; 
 public static void main(String[] args) { spark = SparkSession.builder() .appName("Sparkl") .getOrCreate(); (1) numSentences = spark.sparkContext() .longAccumulator("sentences"); (2) spark.read() .textFile(args[0]) .foreach(SomeClass::countSentences); (3) } static void countSentences(String s) { numSentences.add(1); } (4) }

1 opret en gnistsession

2 Opret en lang tæller for at holde styr på jobfremskridt

3 krydser en fil linje for linje, der kalder countSentences for hver linje

4 tilføj 1 til akkumulatoren for hver sætning

Ovenstående kode fungerer på en localklynge, men mislykkes med en nul pointer-undtagelse, når den køres på en multinode-klynge. Både sparksåvel som numSentencesvil være nul på replika-maskinen.

For at løse dette problem skal du indkapsle alle initialiserede tilstande i ikke-statiske felter på et objekt. Brug maintil at oprette objektet og udsætte yderligere behandling til det.

Hvad du har brug for at forstå er, at koden, du skriver, køres af driverknudepunktet nøjagtigt som det er, men hvad replikanoderne udfører, er et serieljob, som gnisten giver dem. Dine klasser indlæses af JVM på repliken.

Statiske initialiserere kører som forventet, men fungerer som mainikke, så statiske værdier initialiseret i driveren kan ikke ses i repliken. Jeg er ikke sikker på, hvordan det hele fungerer, og udleder kun af erfaring, så tag min forklaring med et saltkorn. Så din kode ser nu ud:

class SomeClass { SparkSession spark; (1) LongAccumulator numSentences; String[] args; SomeClass(String[] args) { this.args = args; } public static void main(String[] args){ new SomeClass(args).process(); (2) } void process() { spark = SparkSession.builder().appName("Sparkl").getOrCreate(); numSentences = spark.sparkContext().longAccumulator("sentences"); spark.read().textFile(args[0]).foreach(this::countSentences); (3) } void countSentences(String s) { numSentences.add(1); }}

1 Gør felter ikke statiske

2 Opret forekomst af klassen og udfør derefter gnistjob

3 henvisning til thisi forgrunden lambda bringer objektet i lukningen af ​​tilgængelige objekter og bliver således seriel og sendt til alle replikaer.

De af jer, der programmerer i Scala, bruger muligvis Scala-objekter, der er singleton-klasser, og derfor måske aldrig kommer på tværs af dette problem. Ikke desto mindre er det noget, du bør vide.

Indsend app og afhængigheder

Der er mere ved kodning ovenfor, men før det skal du indsende din ansøgning til klyngen. Medmindre din app er ekstremt triviel, er chancerne for, at du bruger eksterne biblioteker.

Når du sender din app jar, skal du også fortælle Spark de afhængige biblioteker, du bruger, så det gør dem tilgængelige på alle noder. Det er ret ligetil. Syntaksen er:

bin/spark-submit --packages groupId:artifactId:version,...

I have had no issues with this scheme. It works flawlessly. I generally develop on my laptop and then submit jobs from a node on the cluster. So I need to transfer the app and its dependencies to whatever node I ssh into.

Spark looks for dependencies in the local maven repo, then the central repo and any repos you specify using --repositories option. It is a little cumbersome to sync all that on the driver and then type out all those dependencies on the command line. So I prefer all dependencies packaged in a single jar, called an uber jar.

Use Maven shade plugin to generate an uber jar with all dependencies so job submitting becomes easier

Just include the following lines in your pom.xml

   org.apache.maven.plugins maven-shade-plugin  shade      

When you build and package your project, the default distribution jar will have all dependencies included.

As you submit jobs, the application jars get accumulated in the work directory and fill up over time.

Set spark.worker.cleanup.enabled to true in conf/spark-defaults.conf

This option is false by default and is applicable to the stand-alone mode.

Input and Output files

This was the most confusing part that was difficult to diagnose.

Spark supports reading/writing of various sources such as hdfs, ftp, jdbc or local files on the system when the protocol is file:// or missing. My first attempt was to read from a file on my driver. I assumed that the driver would read the file, turn it into partitions, and then distribute those across the cluster. Turns out it doesn’t work that way.

When you read a file from the local filesystem, ensure that the file is present on all the worker nodes at exactly the same location. Spark does not implicitly distribute files from the driver to the workers.

So I had to copy the file to every worker at the same location. The location of the file was passed as an argument to my app. Since the file was located in the parent folder, I specified its path as ../wikiArticles.txt. This did not work on the worker nodes.

Always pass absolute file paths for reading

It could be a mistake from my side, but I know that the filepath made it as is into the textFile function and it caused “file not found” errors.

Spark supports common compression schemes, so most gzipped or bzipped text files will be uncompressed before use. It might seem that compressed files will be more efficient, but do not fall for that trap.

Don’t read from compressed text files, especially gzip. Uncompressed files are faster to process.

Gzip cannot be uncompressed in parallel like bzip2, so nodes spend the bulk of their time uncompressing large files.

It is a hassle to make the input files available on all workers. You can instead use Spark’s file broadcast mechanism. When submitting a job, specify a comma separated list of input files with the --files option. Accessing these files requires SparkFiles.get(filename). I could not find enough documentation on this feature.

To read a file broadcasted with the --files option, use SparkFiles.get( h>) as the pathname in read functions.

So a file submitted as --files /opt/data/wikiAbstracts.txt would be accesed as SparkFiles.get("WikiAbstracts.txt"). This returns a string which you can use in any read function that expects a path. Again, remember to specify absolute paths.

Since my input file was 5GB gzipped, and my network was quite slow at 12MB/s, I tried to use Spark’s file broadcast feature. But the decompression itself was taking so long that I manually copied the file to every worker. If your network is fast enough, you can use uncompressed files. Or alternatively, use HDFS or FTP server.

Writing files also follows the semantics of reading. I was saving my DataFrame to a csv file on the local system. Again I had the assumption that the results would be sent back to the driver node. Didn’t work for me.

When a DataFrame is saved to local file path, each worker saves its computed partitions to its own disk. No data is sent back to the driver

I was only getting a fraction of the results I was expecting. Initially I had misdiagnosed this problem as an error in my code. Later I found out that each worker was storing its computed results on its own disk.

Partitions

The number of partitions you make affects the performance. By default, Spark will make as many partitions as there are cores in the cluster. This is not always optimal.

Keep an eye on how many workers are actively processing tasks. If too few, increase the number of partitions.

If you read from a gzipped file, Spark creates just one partition which will be processed by only one worker. That is also one reason why gzipped files are slow to process. I have observed slower performance with small number of large partitions as compared to a large number of small partitions.

It’s better to explicitly set the number of partitions while reading data.

You may not have to do this when reading from HDFS, as Hadoop files are already partitioned.

Wikipedia and DBpedia

There are no gotchas here, but I thought it would be good to make you aware of alternatives. The entire Wikipedia xml dump is 14GB compressed and 65 GB uncompressed. Most of the time you only want the plain text of the article, but the dump is in MediaWiki markup so it needs some preprocessing. There are many tools available for this in various languages. Although I haven’t used them personally, I am pretty sure it must be a time consuming task. But there are alternatives.

If all you want is the Wikipedia article plaintext, mostly for NLP, then download the dataset made available by DBpedia.

I used the full article dump (NIF Context) available at DBpedia (direct download from here). This dataset gets rid of unwanted stuff like tables, infoboxes, and references. The compressed download is 4.3GB in the turtle format. You can covert it to tsv like so

Similar datasets are available for other properties like page links, anchor texts, and so on. Do check out DBpedia.

A word about databases

I never quite understood why there is a plethora of databases, all so similar, and on top of that people buy database licenses. Until this project I hadn’t seriously used any. I ever only used MySQL and Apache Derby.

For my project I used a SPARQL triple store database, Apache Jena TDB, accessed over a REST API served by Jena Fuseki. This database would give me RDF urls, labels, and predicates for all the resources mentioned in the supplied article. Every node would make a database call and only then would proceed with further processing.

My workload had become IO bound, as I could see near 0% CPU utilization on worker nodes. Each partition of the data would result in two SPARQL queries. In the worst case scenario, one of the two queries was taking 500–1000 seconds to process. Thankfully, the TDB database relies on Linux’s memory mapping. I could map the whole DB into RAM and significantly improve performance.

If you are IO bound and your database can fit into RAM, run it in memory.

I found a tool called vmtouch which would show what percentage of the database directory had been mapped into memory. This tool also allows you to explicitly map any files/directories into the RAM and optionally lock it so it wont get paged out.

My 16GB database could easily fit into my 32 GB RAM server. This boosted query performance by orders of magnitude to 1–2 seconds per query. Using a rudimentary form of database load balancing based on partition number, I could cut down my execution time to half by using 2 SPARQL servers instead of one.

Conclusion

I truly enjoyed distributed computing on Spark. Without it I could not have completed my project. It was quite easy to take my existing app and have it run on Spark. I definitely would recommend anyone to give it a try.

Originally published at siddheshrane.github.io.