Sådan migrerer du fra Elasticsearch 1.7 til 6.8 uden nul nedetid

Min sidste opgave på BigPanda var at opgradere en eksisterende tjeneste, der brugte Elasticsearch version 1.7 til en nyere Elasticsearch-version, 6.8.1.

I dette indlæg vil jeg dele, hvordan vi migrerede fra Elasticsearch 1.6 til 6.8 med barske begrænsninger som nul nedetid, intet datatab og nul bugs. Jeg giver dig også et script, der udfører migrationen for dig.

Dette indlæg indeholder 6 kapitler (og et er valgfrit):

  • Hvad får jeg ud af det? -> Hvad var de nye funktioner, der fik os til at opgradere vores version?
  • Begrænsningerne -> Hvad var vores forretningskrav?
  • Problemløsning -> Hvordan tacklede vi begrænsningerne?
  • Fremad -> Planen.
  • [Valgfrit kapitel] -> Hvordan håndterede vi det berygtede kortlægningseksplosionsproblem?
  • Endelig -> Sådan foretages datamigrering mellem klynger.

Kapitel 1 - Hvad er der for mig?

Hvilke fordele forventede vi at løse ved at opgradere vores datalager?

Der var et par grunde:

  1. Problemer med ydeevne og stabilitet - Vi oplevede et stort antal afbrydelser med lang MTTR, der forårsagede os en masse hovedpine. Dette afspejles i hyppige høje latenstider, høj CPU-brug og flere problemer.
  2. Ikke-eksisterende support i gamle Elasticsearch-versioner - Vi manglede noget operativ viden i Elasticsearch, og da vi søgte efter ekstern konsulent, blev vi opfordret til at migrere fremad for at modtage support.
  3. Dynamiske kortlægninger i vores skema - Vores nuværende skema i Elasticsearch 1.7 brugte en funktion kaldet dynamiske kortlægninger, der fik vores klynge til at eksplodere flere gange. Så vi ville tage fat på dette problem.
  4. Dårlig synlighed på vores eksisterende klynge - Vi ønskede et bedre overblik under emhætten og så, at senere versioner havde gode metrics-eksportværktøjer.

Kapitel 2 - Begrænsningerne

  • NUL downtime migration - Vi har aktive brugere på vores system, og vi havde ikke råd til, at systemet var nede, mens vi migrerede.
  • Gendannelsesplan - Vi havde ikke råd til at "miste" eller "korrupte" data, uanset omkostningerne. Så vi havde brug for at udarbejde en genopretningsplan, hvis vores migration mislykkedes.
  • Nul bugs - Vi kunne ikke ændre eksisterende søgefunktionalitet for slutbrugere.

Kapitel 3 - Problemløsning og tænkning af en plan

Lad os tackle begrænsningerne fra det enkleste til det sværeste:

Nul bugs

For at imødekomme dette krav undersøgte jeg alle de mulige anmodninger, som tjenesten modtager, og hvad dens output var. Derefter tilføjede jeg enhedstest, hvor det var nødvendigt.

Derudover tilføjede jeg flere metrics (til Elasticsearch Indexerog the new Elasticsearch Indexer) for at spore latenstid, kapacitet og ydeevne, hvilket gjorde det muligt for mig at validere, at vi kun forbedrede dem.

Genopretningsplan

Dette betyder, at jeg havde brug for at løse følgende situation: Jeg implementerede den nye kode til produktion, og ting fungerede ikke som forventet. Hvad kan jeg gøre ved det da

Da jeg arbejdede i en tjeneste, der brugte hændelsessourcing, kunne jeg tilføje en anden lytter (diagrammet vedhæftet nedenfor) og begynde at skrive til en ny Elasticsearch-klynge uden at påvirke produktionsstatus

Nul nedetid migrering

Den aktuelle tjeneste er i live-tilstand og kan ikke "deaktiveres" i perioder, der er længere end 5-10 minutter. Tricket for at få det rigtige er dette:

  • Gem en logbog over alle de handlinger, din tjeneste håndterer (vi bruger Kafka i produktion)
  • Start migreringsprocessen offline (og hold styr på offset, før du startede migrationen)
  • Når migrationen slutter, skal du starte den nye tjeneste mod loggen med den registrerede forskydning og indhente forsinkelsen
  • Når forsinkelsen er færdig, skal du ændre din frontend til forespørgsel mod den nye tjeneste, og du er færdig

Kapitel 4 - Planen

Vores nuværende tjeneste bruger følgende arkitektur (baseret på meddelelse, der sendes i Kafka):

  1. Event topicindeholder begivenheder produceret af andre applikationer (for eksempel UserId 3 created)
  2. Command topicindeholder oversættelsen af ​​disse begivenheder til specifikke kommandoer, der bruges af denne applikation (for eksempel Add userId 3:)
  3. Elasticsearch 1.7 - Datalageret af command Topiclæst af Elasticsearch Indexer.

Vi planlagde at tilføje en anden forbruger ( new Elasticsearch Indexer) til den command topic, som læser de samme nøjagtige meddelelser og skriver dem parallelt med Elasticsearch 6.8.

Hvor skal jeg starte?

For at være ærlig betragtede jeg mig selv som en nybegynder Elasticsearch-bruger. For at være sikker på at udføre denne opgave var jeg nødt til at tænke over den bedste måde at komme ind på dette emne på og lære det. Et par ting, der hjalp var:

  1. Dokumentation - Det er en sindssygt nyttig ressource til alt Elasticsearch. Tag dig tid til at læse den og tage noter (gå ikke glip af: Mapping og QueryDsl).
  2. HTTP API - alt under CAT API. Dette var super nyttigt at debugge ting lokalt og se, hvordan Elasticsearch reagerer (gå ikke glip af: klyngesundhed, katteindeks, søg, slet indeks).
  3. Metrics (❤️) - Fra den første dag konfigurerede vi et skinnende nyt instrumentbræt med masser af seje metrics (taget fra elasticsearch-eksportør-for-Prometheus ), der hjalp og skubbede os til at forstå mere om Elasticsearch.

Koden

Vores kodebase brugte et bibliotek kaldet elastic4s og brugte den ældste version, der var tilgængelig i biblioteket - en rigtig god grund til at migrere! Så den første ting at gøre var bare at migrere versioner og se, hvad der brød.

Der er et par taktikker til, hvordan man udfører denne kodemigrering. Den valgte taktik var at forsøge at gendanne eksisterende funktionalitet først i den nye Elasticsearch-version uden at omskrive al koden fra starten. Med andre ord at nå eksisterende funktionalitet, men på en nyere version af Elasticsearch.

Heldigvis for os indeholdt koden allerede næsten fuld testdækning, så vores opgave var meget meget enklere, og det tog omkring 2 ugers udviklingstid.

Det er vigtigt at bemærke, at hvis det ikke var tilfældet, ville vi have været nødt til at investere lidt tid i at udfylde denne dækning. Først da kunne vi migrere, da en af ​​vores begrænsninger ikke var at bryde eksisterende funktionalitet.

Kapitel 5 - Kortlægningseksplosionsproblemet

Lad os beskrive vores brugssag mere detaljeret. Dette er vores model:

class InsertMessageCommand(tags: Map[String,String])

Og for eksempel vil en forekomst af denne meddelelse være:

new InsertMessageCommand(Map("name"->"dor","lastName"->"sever"))

Og i betragtning af denne model var vi nødt til at understøtte følgende forespørgselskrav:

  1. Forespørgsel efter værdi
  2. Forespørgsel efter tagnavn og værdi

Den måde, hvorpå dette blev modelleret i vores Elasticsearch 1.7-skema, brugte et dynamisk skabelonskema (da tag-tasterne er dynamiske og ikke kan modelleres avanceret).

Den dynamiske skabelon forårsagede os flere afbrydelser på grund af kortlægningseksplosionsproblemet, og skemaet så sådan ud:

curl -X PUT "localhost:9200/_template/my_template?pretty" -H 'Content-Type: application/json' -d ' { "index_patterns": [ "your-index-names*" ], "mappings": { "_doc": { "dynamic_templates": [ { "tags": { "mapping": { "type": "text" }, "path_match": "actions.tags.*" } } ] } }, "aliases": {} }' curl -X PUT "localhost:9200/your-index-names-1/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "John", "lname" : "Smith" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/2?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "Dor", "lname" : "Sever" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/3?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "AnotherName", "lname" : "AnotherLastName" } } } ' 
 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.name" : { "query" : "John" } } } } ' # returns 1 match(doc 1) curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.lname" : { "query" : "John" } } } } ' # returns zero matches # search by value curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "query_string" : { "fields": ["actions.tags.*" ], "query" : "Dor" } } } ' 

Indlejrede dokumenter løsning

Vores første instinkt til at løse problemet med kortlægningseksplosionen var at bruge indlejrede dokumenter.

Vi læste den indlejrede datatypevisning i Elastiske dokumenter og definerede følgende skema og forespørgsler:

curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "nested" } } } } } ' curl -X PUT "localhost:9200/my_index/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "key" : "John", "value" : "Smith" }, { "key" : "Alice", "value" : "White" } ] } ' # Query by tag key and value curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.key": "Alice" }}, { "match": { "tags.value": "White" }} ] } } } } } ' # Returns 1 document curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.value": "Smith" }} ] } } } } } ' # Query by tag value # Returns 1 result 

Og denne løsning fungerede. Da vi forsøgte at indsætte rigtige kundedata, så vi dog, at antallet af dokumenter i vores indeks steg med omkring 500 gange.

Vi tænkte på følgende problemer og fortsatte med at finde en bedre løsning:

  1. Mængden af ​​dokumenter, vi havde i vores klynge, var omkring 500 millioner dokumenter. Dette betød, at vi med det nye skema skulle nå to hundrede og halvtreds milliarder dokumenter (det er 250.000.000.000 dokumenter?).
  2. We read this really good blog post — //blog.gojekengineering.com/elasticsearch-the-trouble-with-nested-documents-e97b33b46194 which highlights that nested documents can cause high latency in queries and heap usage problems.
  3. Testing — Since we were converting 1 document in the old cluster to an unknown number of documents in the new cluster, it would be much harder to track if the migration process worked without any data loss. If our conversion was 1:1, we could assert that the count in the old cluster equalled the count in the new cluster.

Avoiding nested documents

The real trick in this was to focus on what supported queries we were running: search by tag value, and search by tag key and value.

The first query does not require nested documents since it works on a single field. For the latter, we did the following trick. We created a field that contains the combination of the key and the value. Whenever a user queries on a key, value match, we translate their request to the corresponding text and query against that field.

Example:

curl -X PUT "localhost:9200/my_index_2?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "object", "properties": { "keyToValue": { "type": "keyword" }, "value": { "type": "keyword" } } } } } } } ' curl -X PUT "localhost:9200/my_index_2/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "keyToValue" : "John:Smith", "value" : "Smith" }, { "keyToValue" : "Alice:White", "value" : "White" } ] } ' # Query by key,value # User queries for key: Alice, and value : White , we then query elastic with this query: curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.keyToValue": "Alice:White" }}] }}} ' # Query by value only curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.value": "White" }}] }}} ' 

Chapter 6 — The migration process

We planned to migrate about 500 million documents with zero downtime. To do that we needed:

  1. A strategy on how to transfer data from the old Elastic to the new Elasticsearch
  2. A strategy on how to close the lag between the start of the migration and the end of it

And our two options in closing the lag:

  1. Our messaging system is Kafka based. We could have just taken the current offset before the migration started, and after the migration ended, start consuming from that specific offset. This solution requires some manual tweaking of offsets and some other stuff, but will work.
  2. Another approach to solving this issue was to start consuming messages from the beginning of the topic in Kafka and make our actions on Elasticsearch idempotent — meaning, if the change was “applied” already, nothing would change in Elastic store.

The requests made by our service against Elastic were already idempotent, so we choose option 2 because it required zero manual work (no need to take specific offsets, and then set them afterward in a new consumer group).

How can we migrate the data?

These were the options we thought of:

  1. If our Kafka contained all messages from the beginning of time, we could just play from the start and the end state would be equal. But since we apply retention to out topics, this was not an option.
  2. Dump messages to disk and then ingest them to Elastic directly – This solution looked kind of weird. Why store them in disk instead of just writing them directly to Elastic?
  3. Transfer messages between old Elastic to new Elastic — This meant, writing some sort of “script” (did anyone say Python? ?) that will connect to the old Elasticsearch cluster, query for items, transform them to the new schema, and index them in the cluster.

We choose the last option. These were the design choices we had in mind:

  1. Let’s not try to think about error handling unless we need to. Let’s try to write something super simple, and if errors occur, let’s try to address them. In the end, we did not need to address this issue since no errors occurred during the migration.
  2. It’s a one-off operation, so whatever works first / KISS.
  3. Metrics — Since the migration processes can take hours to days, we wanted the ability from day 1 to be able to monitor the error count and to track the current progress and copy rate of the script.

We thought long and hard and choose Python as our weapon of choice. The final version of the code is below:

dictor==0.1.2 - to copy and transform our Elasticsearch documentselasticsearch==1.9.0 - to connect to "old" Elasticsearchelasticsearch6==6.4.2 - to connect to the "new" Elasticsearchstatsd==3.3.0 - to report metrics 
from elasticsearch import Elasticsearch from elasticsearch6 import Elasticsearch as Elasticsearch6 import sys from elasticsearch.helpers import scan from elasticsearch6.helpers import parallel_bulk import statsd ES_SOURCE = Elasticsearch(sys.argv[1]) ES_TARGET = Elasticsearch6(sys.argv[2]) INDEX_SOURCE = sys.argv[3] INDEX_TARGET = sys.argv[4] QUERY_MATCH_ALL = {"query": {"match_all": {}}} SCAN_SIZE = 1000 SCAN_REQUEST_TIMEOUT = '3m' REQUEST_TIMEOUT = 180 MAX_CHUNK_BYTES = 15 * 1024 * 1024 RAISE_ON_ERROR = False def transform_item(item, index_target): # implement your logic transformation here transformed_source_doc = item.get("_source") return {"_index": index_target, "_type": "_doc", "_id": item['_id'], "_source": transformed_source_doc} def transformedStream(es_source, match_query, index_source, index_target, transform_logic_func): for item in scan(es_source, query=match_query, index=index_source, size=SCAN_SIZE, timeout=SCAN_REQUEST_TIMEOUT): yield transform_logic_func(item, index_target) def index_source_to_target(es_source, es_target, match_query, index_source, index_target, bulk_size, statsd_client, logger, transform_logic_func): ok_count = 0 fail_count = 0 count_response = es_source.count(index=index_source, body=match_query) count_result = count_response['count'] statsd_client.gauge(stat='elastic_migration_document_total_count,index={0},type=success'.format(index_target), value=count_result) with statsd_client.timer('elastic_migration_time_ms,index={0}'.format(index_target)): actions_stream = transformedStream(es_source, match_query, index_source, index_target, transform_logic_func) for (ok, item) in parallel_bulk(es_target, chunk_size=bulk_size, max_chunk_bytes=MAX_CHUNK_BYTES, actions=actions_stream, request_timeout=REQUEST_TIMEOUT, raise_on_error=RAISE_ON_ERROR): if not ok: logger.error("got error on index {} which is : {}".format(index_target, item)) fail_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=failure'.format(index_target), 1) else: ok_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=success'.format(index_target), 1) return ok_count, fail_count statsd_client = statsd.StatsClient(host='localhost', port=8125) if __name__ == "__main__": index_source_to_target(ES_SOURCE, ES_TARGET, QUERY_MATCH_ALL, INDEX_SOURCE, INDEX_TARGET, BULK_SIZE, statsd_client, transform_item) 

Conclusion

Migrating data in a live production system is a complicated task that requires a lot of attention and careful planning. I recommend taking the time to work through the steps listed above and figure out what works best for your needs.

Som en tommelfingerregel skal du altid prøve at reducere dine krav så meget som muligt. Er der f.eks. Behov for en nul nedetid for nedetid? Har du råd til datatab?

Opgradering af datalagre er normalt et maraton og ikke en sprint, så tag en dyb indånding og prøv at nyde turen.

  • Hele processen nævnt ovenfor tog mig omkring 4 måneders arbejde
  • Alle eksempler på Elasticsearch, der vises i dette indlæg, er testet mod version 6.8.1