Et streamingbibliotek med en supermagt: FS2 og funktionel programmering

Scala har et meget specielt streaming-bibliotek kaldet FS2 (Functional Streams for Scala). Dette bibliotek indeholder alle fordelene ved funktionel programmering (FP). Ved at forstå dets designmål får du eksponering for de centrale ideer, der gør FP så tiltalende.

FS2 har en central type: Stream[Effect,Output]

Du kan få fra denne type, at det er en, Streamog at det udsender værdier af typen Output.

Det åbenlyse spørgsmål her er hvad der er Effect? Hvad er forbindelsen mellem Effectog Output? Og hvilke fordele har FS2 i forhold til andre streamingbiblioteker?

Oversigt

Jeg starter med at gennemgå hvilke problemer FS2 løser. Derefter sammenligner jeg Listog Streammed flere kodeeksempler. Derefter vil jeg fokusere på, hvordan man bruger Streammed en DB eller en hvilken som helst anden IO. Det er her FS2 skinner, og hvor Effecttypen bruges. Når du først har forstået, hvad der Effecter, skal fordelene ved funktionel programmering være tydelige for dig.

I slutningen af ​​dette indlæg får du svarene på følgende spørgsmål:

  • Hvilke problemer kan jeg løse med FS2?
  • Hvad kan jeg gøre med det, Streamder Listikke kan?
  • Hvordan kan jeg føje data fra en API / fil / DB til Stream?
  • Hvad er denne Effecttype, og hvordan relaterer den sig til funktionel programmering?

Bemærk: Koden er i Scala og skal kunne forstås selv uden forudgående kendskab til syntaksen.

Hvilke problemer kan jeg løse med FS2?

  1. Streaming I / O: Indlæser trinvist store datasæt, der ikke passer ind i hukommelsen og fungerer på dem uden at sprænge din bunke.
  2. Control Flow (ikke dækket): Flytning af data fra en / flere DB'er / filer / API'er til andre på en flot deklarativ måde.
  3. Samtidighed (ikke dækket): Kør forskellige streams parallelt og få dem til at kommunikere sammen. For eksempel indlæser data fra flere filer og behandler dem samtidigt i modsætning til sekventielt. Du kan lave nogle avancerede ting her. Streams kan kommunikere sammen under behandlingsfasen og ikke kun i slutningen.

List vs. Stream

Lister den mest kendte og anvendte datastruktur. For at få en fornemmelse for, hvordan det adskiller sig fra en FS2 Stream, gennemgår vi et par brugssager. Vi vil se, hvordan vi Streamkan løse problemer, der Listikke kan.

Dine data er for store og passer ikke i hukommelsen

Lad os sige, at du har en meget stor fil (40 GB) fahrenheit.txt. Filen har en temperatur på hver linje, og du vil konvertere den til celsius.txt.

Indlæser en stor fil ved hjælp af List

import scala.io.Source val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList java.lang.OutOfMemoryError: Java heap space java.util.Arrays.copyOfRange(Arrays.java:3664) java.lang.String.(String.java:207) java.io.BufferedReader.readLine(BufferedReader.java:356) java.io.BufferedReader.readLine(BufferedReader.java:389)

Listmislykkes ulykkeligt, fordi filen selvfølgelig er for stor til at passe i hukommelsen. Hvis du er nysgerrig, kan du gå og tjekke den fulde løsning ved hjælp af Streamher - men gør det senere, læs videre :)

Når List ikke gør ... Stream til undsætning!

Lad os sige, at det lykkedes mig at læse min fil, og jeg vil skrive den tilbage. Jeg vil gerne bevare linjestrukturen. Jeg er nødt til at indsætte en ny linjetegn \nefter hver temperatur.

Jeg kan bruge interspersekombinatoren til at gøre det

import fs2._ Stream(1,2,3,4).intersperse("\n").toList

En anden dejlig er zipWithNext

scala> Stream(1,2,3,4).zipWithNext.toList res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

Det bundter sammenhængende ting sammen, meget nyttigt, hvis du vil fjerne fortløbende duplikater.

Disse er kun få fra mange meget nyttige, her er den fulde liste.

StreamKan åbenbart gøre mange ting, der Listikke kan, men den bedste funktion kommer i næste afsnit, det handler om, hvordan man bruger Streami den virkelige verden med DB'er / filer / API'er ...

Hvordan kan jeg føje data fra en API / fil / DB til Stream?

Lad os bare sige for nu, at dette er vores program

scala> Stream(1,2,3) res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

Hvad betyder det Pure? Her er scaladoc fra kildekoden:

/** * Indicates that a stream evaluates no effects. * * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`. */ type Pure[A] <: Nothing

Det betyder ingen effekter, ok…, men hvad er en effekt? og mere specifikt hvad er effekten af ​​vores program Stream(1,2,3)?

Dette program har bogstaveligt talt ingen effekt på verden. Dens eneste virkning er at få din CPU til at fungere og forbruge noget strøm !! Det påvirker ikke verden omkring dig.

Ved at påvirke verden mener jeg, at den bruger enhver meningsfuld ressource som en fil, en database, eller den producerer noget som en fil, uploader nogle data et eller andet sted, skriver til din terminal og så videre.

Hvordan vender jeg en Purestream til noget nyttigt?

Lad os sige, at jeg vil indlæse bruger-id'er fra en DB, jeg får denne funktion, den ringer til DB'en og returnerer userId som en Long.

import scala.concurrent.Future def loadUserIdByName(userName: String): Future[Long] = ???

Det returnerer a, Futuresom indikerer, at dette opkald er asynkront, og værdien vil være tilgængelig på et eller andet tidspunkt i fremtiden. Den indpakker den værdi, der returneres af DB.

Jeg har denne Purestrøm.

scala> val names = Stream("bob", "alice", "joe") names: fs2.Stream[fs2.Pure,String] = Stream(..)

Hvordan får jeg en Streamaf id'er?

Den naive tilgang ville være at bruge mapfunktionen, den skulle køre funktionen for hver værdi i Stream.

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

Jeg fik stadig tilbage a Pure! Jeg gav Streamfunktionen en funktion, der påvirker verden, og jeg fik stadig en Pure, ikke sej ... Det ville have været pænt, hvis FS2 automatisk ville have opdaget, at loadUserIdByNamefunktionen har en effekt på verden og returneret noget, der IKKE er, Puremen det gør fungerer ikke sådan. Du skal bruge en speciel kombinator i stedet for map: du skal bruge evalMap.

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Ikke mere Pure! vi fik i Futurestedet, yay! Hvad skete der lige?

Det tog:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

Og skiftede strømmen til

  • Stream[Future, Long]

Det adskilt Futureog isoleret det! Den venstre side, der var Effecttypeparameteren, er nu den konkrete Futuretype.

Pænt trick, men hvordan hjælper det mig?

Du har lige været vidne til ægte adskillelse af bekymringer. Du kan fortsætte med at operere på streamen med alle de fine Listlignende kombinatorer, og du behøver ikke bekymre dig om, hvis DB er nede, langsom eller alle de ting, der er relateret til netværket (effekt).

Det hele fungerer, indtil jeg vil bruge toListtil at få værdierne tilbage

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Hvad???!!! Jeg kunne sværge, at jeg brugte toListfør, og det fungerede, hvordan kan det sige, at toListdet ikke er medlem af fs2.Stream[Future,String]mere? Det er som om denne funktion blev fjernet, det øjeblik jeg begyndte at bruge en effektfuld strøm, det er imponerende! Men hvordan får jeg mine værdier tilbage?

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

First we use compile to tell the Stream to combine all the effects into one, effectively it folds all the calls to loadUserIdByName into one big Future. This is needed by the framework, and it will become apparent why this step is needed soon.

Now toList should work

scala> userIdsFromDB.compile.toList :18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future] userIdsFromDB.compile.toList ^

What?! the compiler is still complaining. That’s because Future is not a good Effect type — it breaks the philosophy of separation of concerns as explained in the next very important section.

IMPORTANT: The ONE thing to take away from this post

A key point here, is that the DB has not been called at this point. Nothing happened really, the full program does not produce anything.

def loadUserIdByName(userName: String): Future[Long] = ??? Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

Separating program description from evaluation

Yes it might be surprising but the major theme in FP is separating the

  • Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”

And the

  • Execution of your program: running the actual code and asking it to go to the DB

One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.

Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.

Why would you want write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.

Now let’s fix our program and take care of this Future problem.

As we said Future is a bad Effect type, it goes against the separation of concerns principle. Indeed, Future is eager in Scala: the moment you create one it starts to executes on some thread, you don't have control of the execution and thus it breaks. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO that wraps our bad Future.

That brings us to the last part, what is this IO type? and how do I finally get my list of usedIds back?

scala> import cats.effect.IO import cats.effect.IO scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList res8: cats.effect.IO[List[Long]] = IO$2104439279

It now gives us back a List but still, we didn't get our IDs back, so one last thing must be missing.

What does IO really mean?

IO comes from cats-effect library. First let's finish our program and finally get out the ids back from the DB.

scala> userIds.compile.toList.unsafeRunSync :18: error: not found: value userIds userIds.compile.toList.unsafeRunSync ^

The proof that it’s doing something is the fact that it’s failing.

loadUserIdByName(userName: String): Future[Long] = ???

When ??? is called you will get this exception, it means the function was executed (as opposed to before when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the ids, and it will have an effect on the world (network/files system).

IO[Long] is a description of how to get a value of type Long and it most certainly involves doing some I/O i.e going to the network, loading a file,...

It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync (or other functions prefixed unsafe). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.

Recap

Let’s take a last look at Stream[Effect,Output].

Output is the type that the stream emits (could be a stream of String, Long or whatever type you defined).

Effect is the way (the recipe) to produce the Output (i.e go to the DB and give me an id of type Long).

It’s important to understand that if these types are separated to make things easier, breaking down a problem in subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.

The link between these 2 types is the following :

In order for the Stream to emit an element of type

  • Output

It needs to evaluate a type

  • Effect

A special type that encodes an effective action as a value of type IO, this IO value allows the separation of 2 concerns:

  • Description:IO is a simple immutable value, it’s a recipe to get a type A by doing some kind of IO(network/filesystem/…)
  • Execution: in order forIO to do something, you need to execute/run it using io.unsafeRunSync

Putting it all together

Stream[IO,Long] says:

This is a Stream that emits values of type Long and in order to do so, it needs to run an effective function that producesIO[Long] for each value.

That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.

Takeaways

  • Stream is a super charged version of List
  • Stream(1,2,3) is of type Stream[Pure, Int] , the second type Int is the type of all values that this stream will emit
  • Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
  • Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream.
  • Stream[IO, Long] adskiller bekymringerne for hvad og hvordan ved kun at lade dig arbejde med værdierne og ikke bekymre dig om, hvordan du får dem (indlæsning fra db).
  • At adskille programbeskrivelse fra evaluering er et centralt aspekt af FP.
  • Alle de programmer, du skriver med Stream, gør intet, før du bruger det unsafeRunSync. Før det er din kode effektivt ren.
  • IO[Long]er en effekttype, der fortæller dig: du får Longværdier fra IO (kan være en fil, netværket, konsollen ...). Det er en beskrivelse og ikke en indpakning! R
  • Futureikke overholder denne filosofi og er derfor ikke kompatibel med FS2, skal du bruge IOtypen i stedet.

FS2 videoer

  • Hands on screencast af Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
  • Tale af Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA