En begyndervejledning til RxJS & Redux Observable

Redux-Observable er en RxJS-baseret middleware til Redux, der giver udviklere mulighed for at arbejde med async-handlinger. Det er et alternativ til redux-thunk og redux-saga.

Denne artikel dækker det grundlæggende i RxJS, hvordan du opsætter Redux-Observables og nogle af dens praktiske brugssager. Men før det er vi nødt til at forstå observatørmønsteret .

Observatørmønster

I observatørmønsteret opretholder et objekt kaldet "Observable" eller "Subject" en samling abonnenter kaldet "Observers." Når motivets tilstand ændres, underretter den alle sine observatører.

I JavaScript ville det enkleste eksempel være begivenhedsemittere og begivenhedshåndterere.

Når du gør det .addEventListener, skubber du en observatør ind i motivets samling af observatører. Når begivenheden sker, underretter emnet alle observatører.

RxJS

I henhold til det officielle websted,

RxJS er JavaScript-implementering af ReactiveX, et bibliotek til komponering af asynkrone og begivenhedsbaserede programmer ved hjælp af observerbare sekvenser.

Enkelt sagt er RxJS en implementering af observatørmønsteret. Det udvider også observatørmønsteret ved at tilbyde operatører, der giver os mulighed for at komponere observerbare og emner på en deklarativ måde.

Observatører, observerbare, operatører og emner er byggestenene i RxJS. Så lad os se mere på hver enkelt nu.

Observatører

Observatører er objekter, der kan abonnere på Observables and Subjects. Efter abonnement kan de modtage underretninger af tre typer - næste, fejl og fuldfør.

Ethvert objekt med følgende struktur kan bruges som observatør.

interface Observer { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }

Når det observerbare skubber næste, fejl- og fuldstændige meddelelser, observatørens .next, .errorog .completemetoder er gældende.

Observerbare

Observable er objekter, der kan udsende data over en periode. Det kan repræsenteres ved hjælp af "marmordiagrammet".

Hvor den vandrette linje repræsenterer tiden, repræsenterer de cirkulære knudepunkter de data, der udsendes af den observerbare, og den lodrette linje indikerer, at den observerbare er gennemført med succes.

Observerbare kan støde på en fejl. Korset repræsenterer den fejl, der udsendes af den observerbare.

Tilstandene "afsluttet" og "fejl" er endelige. Det betyder, Observables kan ikke udsende nogen data, efter at de er fuldført eller stødt på en fejl.

Oprettelse af en observerbar

Observable oprettes ved hjælp af new Observablekonstruktøren, der tager et argument - abonnementsfunktionen. Observable kan også oprettes ved hjælp af nogle operatorer, men vi vil tale om det senere, når vi taler om Operators.

import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { // Subscribe function });

Abonnere på en observerbar

Observable kan abonneres ved hjælp af deres .subscribemetode og passerer en Observer.

observable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed'); });

Udførelse af en observerbar

Abonnefunktionen, som vi sendte til new Observablekonstruktøren, udføres hver gang den observerbare abonnerer.

Abonnementsfunktionen tager et argument - abonnenten. Abonnentens ligner strukturen af en observatør, og det har de samme 3 metoder: .next, .error, og .complete.

Observerbare kan skubbe data til observatøren ved hjælp af .nextmetoden. Hvis den observerbare er gennemført med succes, kan den underrette observatøren ved hjælp af .completemetoden. Hvis den observerbare har stødt på en fejl, kan den skubbe fejlen til observatøren ved hjælp af .errormetoden.

// Create an Observable const observable = new Observable(subscriber => { subscriber.next('first data'); subscriber.next('second data'); setTimeout(() => { subscriber.next('after 1 second - last data'); subscriber.complete(); subscriber.next('data after completion'); //  console.log(x), error: (x) => console.log(x), complete: () => console.log('completed') }); // Outputs: // // first data // second data // third data // after 1 second - last data // completed

Observerbare er Unicast

Observables er unicast , hvilket betyder Observables kan højst have en abonnent. Når en observatør abonnerer på en observerbar, får den en kopi af den observerbare, der har sin egen eksekveringssti, hvilket gør Observables unicast.

Det er som at se en YouTube-video. Alle seere ser det samme videoindhold, men de kan se på forskellige segmenter af videoen.

Eksempel : lad os oprette en observerbar, der udsender 1 til 10 i løbet af 10 sekunder. Abonner derefter med det observerbare en gang med det samme og igen efter 5 sekunder.

// Create an Observable that emits data every second for 10 seconds const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); }); // Subscribe to the Observable observable.subscribe({ next: value => { console.log(`Observer 1: ${value}`); } }); // After 5 seconds subscribe again setTimeout(() => { observable.subscribe({ next: value => { console.log(`Observer 2: ${value}`); } }); }, 5000); /* Output Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 1 Observer 1: 6 Observer 2: 2 Observer 1: 7 Observer 2: 3 Observer 1: 8 Observer 2: 4 Observer 1: 9 Observer 2: 5 Observer 1: 10 Observer 2: 6 Observer 2: 7 Observer 2: 8 Observer 2: 9 Observer 2: 10 */

I output kan du bemærke, at den anden observatør begyndte at udskrive fra 1, selvom den abonnerede efter 5 sekunder. Dette sker, fordi den anden observatør modtog en kopi af den observerbare, hvis abonnementsfunktion blev påberåbt igen. Dette illustrerer Observables unicast-opførsel.

Emner

Et emne er en særlig type observerbar.

Oprettelse af et emne

Et emne oprettes ved hjælp af new Subjectkonstruktøren.

import { Subject } from 'rxjs'; // Create a subject const subject = new Subject();

Abonnere på et emne

Abonnement på et emne svarer til at abonnere på en observerbar: du bruger .subscribemetoden og sender en observatør.

subject.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log("done") });

Udførelse af et emne

I modsætning til observable, et Emne kalder sine egne .next, .errorog .completemetoder til at sende data til observatører.

// Push data to all observers subject.next('first data'); // Push error to all observers subject.error('oops something went wrong'); // Complete subject.complete('done');

Emner er Multicast

Emner er multicast: flere observatører deler det samme emne og dets eksekveringssti. Det betyder, at alle meddelelser udsendes til alle observatører. Det er som at se et live program. Alle seere ser det samme segment af det samme indhold på samme tid.

Eksempel: lad os oprette et emne, der udsender 1 til 10 i løbet af 10 sekunder. Abonner derefter med det observerbare en gang med det samme og igen efter 5 sekunder.

// Create a subject const subject = new Subject(); let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); // Subscribe to the subjects subject.subscribe(data => { console.log(`Observer 1: ${data}`); }); // After 5 seconds subscribe again setTimeout(() => { subject.subscribe(data => { console.log(`Observer 2: ${data}`); }); }, 5000); /* OUTPUT Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 5 Observer 1: 6 Observer 2: 6 Observer 1: 7 Observer 2: 7 Observer 1: 8 Observer 2: 8 Observer 1: 9 Observer 2: 9 Observer 1: 10 Observer 2: 10 */ 

I output kan du bemærke, at den anden observatør startede udskrivning fra 5 i stedet for at starte med 1. Dette sker, fordi den anden observatør deler det samme emne. Da emnet abonnerer efter 5 sekunder, har emnet allerede udsendt 1 til 4. Dette illustrerer et emnets multicast-opførsel.

Emner er både observerbare og observatører

Emner har .next, .errorog .completemetoder. Det betyder, at de følger observatørens struktur. Derfor kan et emne også bruges som observatør og overføres til .subscribefunktionen af ​​observerbare eller andre emner.

Eksempel: lad os oprette et observerbart og et emne. Abonner derefter på det observerbare ved hjælp af emnet som observatør. Til sidst, abonner på emnet. Alle de værdier, der udsendes af den observerbare, skubbes til emnet, og emnet sender de modtagne værdier til alle dets observatører.

// Create an Observable that emits data every second const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 5) { clearInterval(interval); } }, 1000); }); // Create a subject const subject = new Subject(); // Use the Subject as Observer and subscribe to the Observable observable.subscribe(subject); // Subscribe to the subject subject.subscribe({ next: value => console.log(value) }); /* Output 1 2 3 4 5 */

Operatører

Operatører er det, der gør RxJS nyttigt. Operatører er rene funktioner, der returnerer en ny observerbar. De kan kategoriseres i 2 hovedkategorier:

  1. Skabelsesoperatører
  2. Rørbare operatører

Skabelsesoperatører

Oprettelsesoperatører er funktioner, der kan skabe en ny observerbar.

Eksempel: vi kan oprette en observerbar, der udsender hvert element i et array ved hjælp af fromoperatøren.

const observable = from([2, 30, 5, 22, 60, 1]); observable.subscribe({ next: (value) => console.log("Received", value), error: (err) => console.log(err), complete: () => console.log("done") }); /* OUTPUTS Received 2 Received 30 Received 5 Received 22 Received 60 Received 1 done */

Det samme kan være observerbart ved hjælp af marmordiagrammet.

Rørbare operatører

Rørbare operatører er funktioner, der tager en observerbar som input og returnerer en ny observerbar med ændret adfærd.

Eksempel: lad os tage det observerbare, som vi oprettede ved hjælp af fromoperatøren. Nu ved hjælp af denne observerbare kan vi oprette en ny observerbar, der kun udsender tal større end 10 ved hjælp af filteroperatøren.

const greaterThanTen = observable.pipe(filter(x => x > 10)); greaterThanTen.subscribe(console.log, console.log, () => console.log("completed")); // OUTPUT // 11 // 12 // 13 // 14 // 15

Det samme kan repræsenteres ved hjælp af marmordiagrammet.

Der er mange flere nyttige operatører derude. Du kan se den komplette operatørliste sammen med eksempler i den officielle RxJS-dokumentation her.

Det er afgørende at forstå alle de almindeligt anvendte operatører. Her er nogle operatører, som jeg bruger ofte:

  1. mergeMap
  2. switchMap
  3. exhaustMap
  4. map
  5. catchError
  6. startWith
  7. delay
  8. debounce
  9. throttle
  10. interval
  11. from
  12. of

Redux Observables

I henhold til det officielle websted,

RxJS-baseret middleware til Redux. Komponer og annuller asynkroniseringshandlinger for at skabe bivirkninger og mere.

I Redux, når en handling sendes, kører den gennem alle reduceringsfunktionerne, og en ny tilstand returneres.

Redux-observerbar tager alle disse afsendte handlinger og nye stater og skaber to observerbare ud af det - observerbare handlinger og observerbare action$stater state$.

Handlinger, der kan observeres, udsender alle de handlinger, der sendes ved hjælp af store.dispatch(). Stater, der kan observeres, udsender alle de nye statsobjekter, der returneres af rodreducerende.

Epics

I henhold til det officielle websted,

Det er en funktion, der tager en strøm af handlinger og returnerer en strøm af handlinger. Handlinger i, handlinger ud.

Epics are functions that can be used to subscribe to Actions and States Observables. Once subscribed, epics will receive the stream of actions and states as input, and it must return a stream of actions as an output. Actions In - Actions Out.

const someEpic = (action$, state$) => { return action$.pipe( // subscribe to actions observable map(action => { // Receive every action, Actions In return someOtherAction(); // return an action, Actions Out }) ) }

It is important to understand that all the actions received in the Epic have already finished running through the reducers.

Inside an Epic, we can use any RxJS observable patterns, and this is what makes redux-observables useful.

Example: we can use the .filter operator to create a new intermediate observable. Similarly, we can create any number of intermediate observables, but the final output of the final observable must be an action, otherwise an exception will be raised by redux-observable.

const sampleEpic = (action$, state$) => { return action$.pipe( filter(action => action.payload.age >= 18), // can create intermediate observables and streams map(value => above18(value)) // where above18 is an action creator ); }

Every action emitted by the Epics are immediately dispatched using the store.dispatch().

Setup

First, let's install the dependencies.

npm install --save rxjs redux-observable

Create a separate folder named epics to keep all the epics. Create a new file index.js inside the epics folder and combine all the epics using the combineEpics function to create the root epic. Then export the root epic.

import { combineEpics } from 'redux-observable'; import { epic1 } from './epic1'; import { epic2 } from './epic2'; const epic1 = (action$, state$) => { ... } const epic2 = (action$, state$) => { ... } export default combineEpics(epic1, epic2);

Create an epic middleware using the createEpicMiddleware function and pass it to the createStore Redux function.

import { createEpicMiddleware } from 'redux-observable'; import { createStore, applyMiddleware } from 'redux'; import rootEpic from './rootEpics'; const epicMiddleware = createEpicMiddlware(); const store = createStore( rootReducer, applyMiddleware(epicMiddlware) );

Finally, pass the root epic to epic middleware's .run method.

epicMiddleware.run(rootEpic);

Some Practical Usecases

RxJS has a big learning curve, and the redux-observable setup worsens the already painful Redux setup process. All that makes Redux observable look like an overkill. But here are some practical use cases that can change your mind.

Throughout this section, I will be comparing redux-observables with redux-thunk to show how redux-observables can be helpful in complex use-cases. I don't hate redux-thunk, I love it, and I use it every day!

1. Make API Calls

Usecase: Make an API call to fetch comments of a post. Show loaders when the API call is in progress and also handle API errors.

A redux-thunk implementation will look like this,

function getComments(postId){ return (dispatch) => { dispatch(getCommentsInProgress()); axios.get(`/v1/api/posts/${postId}/comments`).then(response => { dispatch(getCommentsSuccess(response.data.comments)); }).catch(() => { dispatch(getCommentsFailed()); }); } }

and this is absolutely correct. But the action creator is bloated.

We can write an Epic to implement the same using redux-observables.

const getCommentsEpic = (action$, state$) => action$.pipe( ofType('GET_COMMENTS'), mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe( map(response => getCommentsSuccess(response.data.comments)), catchError(() => getCommentsFailed()), startWith(getCommentsInProgress()) ) );

Now it allows us to have a clean and simple action creator like this,

function getComments(postId) { return { type: 'GET_COMMENTS', payload: { postId } } }

2. Request Debouncing

Usecase: Provide autocompletion for a text field by calling an API whenever the value of the text field changes. API call should be made 1 second after the user has stopped typing.

A redux-thunk implementation will look like this,

let timeout; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { axios.get(`/suggestions?q=${value}`) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }

It requires a global variable timeout. When we start using global variables, our action creators are not longer pure functions. It also becomes difficult to unit test the action creators that use a global variable.

We can implement the same with redux-observable using the .debounce operator.

const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), debounce(1000), mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );

Now, our action creators can be cleaned up, and more importantly, they can be pure functions again.

function valueChanged(value) { return { type: 'VALUE_CHANGED', payload: { value } } }

3. Request Cancellation

Usecase: Continuing the previous use-case, assume that the user didn't type anything for 1 second, and we made our 1st API call to fetch the suggestions.

Let's say the API itself takes an average of 2-3 seconds to return the result. Now, if the user types something while the 1st API call is in progress, after 1 second, we will make our 2nd API. We can end up having two API calls at the same time, and it can create a race condition.

To avoid this, we need to cancel the 1st API call before making the 2nd API call.

A redux-thunk implementation will look like this,

let timeout; var cancelToken = axios.cancelToken; let apiCall; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { // Cancel the existing API apiCall && apiCall.cancel('Operation cancelled'); // Generate a new token apiCall = cancelToken.source(); axios.get(`/suggestions?q=${value}`, { cancelToken: apiCall.token }) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }

Now, it requires another global variable to store the Axios's cancel token. More global variables = more impure functions!

To implement the same using redux-observable, all we need to do is replace .mergeMap with .switchMap.

const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), throttle(1000), switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );

Since it doesn't require any changes to our action creators, they can continue to be pure functions.

Tilsvarende er der mange brugssager, hvor Redux-Observables faktisk skinner! For eksempel afstemning af en API, visning af snackbarer, administration af WebSocket-forbindelser osv.

Til afslutning

Hvis du udvikler et Redux-program, der involverer så komplekse brugssager, anbefales det stærkt at bruge Redux-Observables. Når alt kommer til alt er fordelene ved at bruge den direkte proportional med kompleksiteten i din applikation, og det fremgår af de ovennævnte praktiske brugssager.

Jeg tror stærkt på, at brug af det rigtige sæt biblioteker vil hjælpe os med at udvikle meget renere og vedligeholdelige applikationer, og på lang sigt vil fordelene ved at bruge dem opveje ulemperne.