Et streamingbibliotek med en supermagt: FS2 og funktionel programmering
Scala har et meget specielt streaming-bibliotek kaldet FS2 (Functional Streams for Scala). Dette bibliotek indeholder alle fordelene ved funktionel programmering (FP). Ved at forstå dets designmål får du eksponering for de centrale ideer, der gør FP så tiltalende.
FS2 har en central type: Stream[Effect,Output]
Du kan få fra denne type, at det er en, Stream
og at det udsender værdier af typen Output
.
Det åbenlyse spørgsmål her er hvad der er Effect
? Hvad er forbindelsen mellem Effect
og Output
? Og hvilke fordele har FS2 i forhold til andre streamingbiblioteker?
Oversigt
Jeg starter med at gennemgå hvilke problemer FS2 løser. Derefter sammenligner jeg List
og Stream
med flere kodeeksempler. Derefter vil jeg fokusere på, hvordan man bruger Stream
med en DB eller en hvilken som helst anden IO. Det er her FS2 skinner, og hvor Effect
typen bruges. Når du først har forstået, hvad der Effect
er, skal fordelene ved funktionel programmering være tydelige for dig.
I slutningen af dette indlæg får du svarene på følgende spørgsmål:
- Hvilke problemer kan jeg løse med FS2?
- Hvad kan jeg gøre med det,
Stream
derList
ikke kan? - Hvordan kan jeg føje data fra en API / fil / DB til
Stream
? - Hvad er denne
Effect
type, og hvordan relaterer den sig til funktionel programmering?
Bemærk: Koden er i Scala og skal kunne forstås selv uden forudgående kendskab til syntaksen.
Hvilke problemer kan jeg løse med FS2?
- Streaming I / O: Indlæser trinvist store datasæt, der ikke passer ind i hukommelsen og fungerer på dem uden at sprænge din bunke.
- Control Flow (ikke dækket): Flytning af data fra en / flere DB'er / filer / API'er til andre på en flot deklarativ måde.
- Samtidighed (ikke dækket): Kør forskellige streams parallelt og få dem til at kommunikere sammen. For eksempel indlæser data fra flere filer og behandler dem samtidigt i modsætning til sekventielt. Du kan lave nogle avancerede ting her. Streams kan kommunikere sammen under behandlingsfasen og ikke kun i slutningen.
List
vs. Stream
List
er den mest kendte og anvendte datastruktur. For at få en fornemmelse for, hvordan det adskiller sig fra en FS2 Stream
, gennemgår vi et par brugssager. Vi vil se, hvordan vi Stream
kan løse problemer, der List
ikke kan.
Dine data er for store og passer ikke i hukommelsen
Lad os sige, at du har en meget stor fil (40 GB) fahrenheit.txt
. Filen har en temperatur på hver linje, og du vil konvertere den til celsius.txt
.
Indlæser en stor fil ved hjælp af List
import scala.io.Source val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList java.lang.OutOfMemoryError: Java heap space java.util.Arrays.copyOfRange(Arrays.java:3664) java.lang.String.(String.java:207) java.io.BufferedReader.readLine(BufferedReader.java:356) java.io.BufferedReader.readLine(BufferedReader.java:389)
List
mislykkes ulykkeligt, fordi filen selvfølgelig er for stor til at passe i hukommelsen. Hvis du er nysgerrig, kan du gå og tjekke den fulde løsning ved hjælp af Stream
her - men gør det senere, læs videre :)
Når List ikke gør ... Stream til undsætning!
Lad os sige, at det lykkedes mig at læse min fil, og jeg vil skrive den tilbage. Jeg vil gerne bevare linjestrukturen. Jeg er nødt til at indsætte en ny linjetegn \n
efter hver temperatur.
Jeg kan bruge intersperse
kombinatoren til at gøre det
import fs2._ Stream(1,2,3,4).intersperse("\n").toList
En anden dejlig er zipWithNext
scala> Stream(1,2,3,4).zipWithNext.toList res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))
Det bundter sammenhængende ting sammen, meget nyttigt, hvis du vil fjerne fortløbende duplikater.
Disse er kun få fra mange meget nyttige, her er den fulde liste.
Stream
Kan åbenbart gøre mange ting, der List
ikke kan, men den bedste funktion kommer i næste afsnit, det handler om, hvordan man bruger Stream
i den virkelige verden med DB'er / filer / API'er ...
Hvordan kan jeg føje data fra en API / fil / DB til Stream
?
Lad os bare sige for nu, at dette er vores program
scala> Stream(1,2,3) res2: fs2.Stream[fs2.Pure,Int] = Stream(..)
Hvad betyder det Pure
? Her er scaladoc fra kildekoden:
/** * Indicates that a stream evaluates no effects. * * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`. */ type Pure[A] <: Nothing
Det betyder ingen effekter, ok…, men hvad er en effekt? og mere specifikt hvad er effekten af vores program Stream(1,2,3)
?
Dette program har bogstaveligt talt ingen effekt på verden. Dens eneste virkning er at få din CPU til at fungere og forbruge noget strøm !! Det påvirker ikke verden omkring dig.
Ved at påvirke verden mener jeg, at den bruger enhver meningsfuld ressource som en fil, en database, eller den producerer noget som en fil, uploader nogle data et eller andet sted, skriver til din terminal og så videre.
Hvordan vender jeg en Pure
stream til noget nyttigt?
Lad os sige, at jeg vil indlæse bruger-id'er fra en DB, jeg får denne funktion, den ringer til DB'en og returnerer userId som en Long
.
import scala.concurrent.Future def loadUserIdByName(userName: String): Future[Long] = ???
Det returnerer a, Future
som indikerer, at dette opkald er asynkront, og værdien vil være tilgængelig på et eller andet tidspunkt i fremtiden. Den indpakker den værdi, der returneres af DB.
Jeg har denne Pure
strøm.
scala> val names = Stream("bob", "alice", "joe") names: fs2.Stream[fs2.Pure,String] = Stream(..)
Hvordan får jeg en Stream
af id'er?
Den naive tilgang ville være at bruge map
funktionen, den skulle køre funktionen for hver værdi i Stream
.
scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]
Jeg fik stadig tilbage a Pure
! Jeg gav Stream
funktionen en funktion, der påvirker verden, og jeg fik stadig en Pure
, ikke sej ... Det ville have været pænt, hvis FS2 automatisk ville have opdaget, at loadUserIdByName
funktionen har en effekt på verden og returneret noget, der IKKE er, Pure
men det gør fungerer ikke sådan. Du skal bruge en speciel kombinator i stedet for map
: du skal bruge evalMap
.
scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^
Ikke mere Pure
! vi fik i Future
stedet, yay! Hvad skete der lige?
Det tog:
loadUserIdByName: Future[Long]
Stream[Pure, String]
Og skiftede strømmen til
Stream[Future, Long]
Det adskilt Future
og isoleret det! Den venstre side, der var Effect
typeparameteren, er nu den konkrete Future
type.
Pænt trick, men hvordan hjælper det mig?
Du har lige været vidne til ægte adskillelse af bekymringer. Du kan fortsætte med at operere på streamen med alle de fine List
lignende kombinatorer, og du behøver ikke bekymre dig om, hvis DB er nede, langsom eller alle de ting, der er relateret til netværket (effekt).
Det hele fungerer, indtil jeg vil bruge toList
til at få værdierne tilbage
scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^
Hvad???!!! Jeg kunne sværge, at jeg brugte toList
før, og det fungerede, hvordan kan det sige, at toList
det ikke er medlem af fs2.Stream[Future,String]
mere? Det er som om denne funktion blev fjernet, det øjeblik jeg begyndte at bruge en effektfuld strøm, det er imponerende! Men hvordan får jeg mine værdier tilbage?
scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]
First we use compile
to tell the Stream
to combine all the effects into one, effectively it folds all the calls to loadUserIdByName
into one big Future
. This is needed by the framework, and it will become apparent why this step is needed soon.
Now toList
should work
scala> userIdsFromDB.compile.toList :18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future] userIdsFromDB.compile.toList ^
What?! the compiler is still complaining. That’s because Future
is not a good Effect
type — it breaks the philosophy of separation of concerns as explained in the next very important section.
IMPORTANT: The ONE thing to take away from this post
A key point here, is that the DB has not been called at this point. Nothing happened really, the full program does not produce anything.
def loadUserIdByName(userName: String): Future[Long] = ??? Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile
Separating program description from evaluation
Yes it might be surprising but the major theme in FP is separating the
- Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”
And the
- Execution of your program: running the actual code and asking it to go to the DB
One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure
stream.
Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.
Why would you want write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.
Now let’s fix our program and take care of this Future
problem.
As we said Future
is a bad Effect
type, it goes against the separation of concerns principle. Indeed, Future
is eager in Scala: the moment you create one it starts to executes on some thread, you don't have control of the execution and thus it breaks. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO
that wraps our bad Future
.
That brings us to the last part, what is this IO
type? and how do I finally get my list of usedIds
back?
scala> import cats.effect.IO import cats.effect.IO scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList res8: cats.effect.IO[List[Long]] = IO$2104439279
It now gives us back a List
but still, we didn't get our IDs back, so one last thing must be missing.

What does IO
really mean?
IO
comes from cats-effect library. First let's finish our program and finally get out the ids back from the DB.
scala> userIds.compile.toList.unsafeRunSync :18: error: not found: value userIds userIds.compile.toList.unsafeRunSync ^
The proof that it’s doing something is the fact that it’s failing.
loadUserIdByName(userName: String): Future[Long] = ???
When ???
is called you will get this exception, it means the function was executed (as opposed to before when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the ids, and it will have an effect on the world (network/files system).
IO[Long]
is a description of how to get a value of type Long
and it most certainly involves doing some I/O i.e going to the network, loading a file,...
It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync
(or other functions prefixed unsafe
). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.
Recap
Let’s take a last look at Stream[Effect,Output]
.
Output
is the type that the stream emits (could be a stream of String
, Long
or whatever type you defined).
Effect
is the way (the recipe) to produce the Output
(i.e go to the DB and give me an id
of type Long
).
It’s important to understand that if these types are separated to make things easier, breaking down a problem in subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.
The link between these 2 types is the following :
In order for the Stream
to emit an element of type
Output
It needs to evaluate a type
Effect
A special type that encodes an effective action as a value of type IO
, this IO
value allows the separation of 2 concerns:
- Description:
IO
is a simple immutable value, it’s a recipe to get a typeA
by doing some kind of IO(network/filesystem/…) - Execution: in order for
IO
to do something, you need to execute/run it usingio.unsafeRunSync
Putting it all together
Stream[IO,Long]
says:
This is a Stream
that emits values of type Long
and in order to do so, it needs to run an effective function that producesIO[Long]
for each value.
That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.
Takeaways
Stream
is a super charged version ofList
Stream(1,2,3)
is of typeStream[Pure, Int]
, the second typeInt
is the type of all values that this stream will emitPure
means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.- Use
evalMap
instead ofmap
when you want to apply a function that has an effect likeloadUserIdByName
to aStream
. Stream[IO, Long]
adskiller bekymringerne for hvad og hvordan ved kun at lade dig arbejde med værdierne og ikke bekymre dig om, hvordan du får dem (indlæsning fra db).- At adskille programbeskrivelse fra evaluering er et centralt aspekt af FP.
- Alle de programmer, du skriver med
Stream
, gør intet, før du bruger detunsafeRunSync
. Før det er din kode effektivt ren. IO[Long]
er en effekttype, der fortæller dig: du fårLong
værdier fra IO (kan være en fil, netværket, konsollen ...). Det er en beskrivelse og ikke en indpakning! RFuture
ikke overholder denne filosofi og er derfor ikke kompatibel med FS2, skal du brugeIO
typen i stedet.
FS2 videoer
- Hands on screencast af Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
- Tale af Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA