본문 바로가기

소프트웨어-이야기/데이터 저장소 + 시각화

(ElasticSearch) 엘라스틱서치에 데이터 싱크하기 ( 2 ) - Using Queues to Manage Batches

이 글은 [elastic] Keeping Elasticsearch in Sync 문서를 정리한 글 입니다 ㅎㅅㅎ


이전 글에서는 엘라스틱서치에 저장된 문서를 업데이트할 때, Batch로 한번에 업데이트하는게 효과적이라는 내용을 살펴봤었다. - (ElasticSearch) 엘라스틱서치에 데이터 싱크하기 ( 1 ) 

이번 글에서는 Batch로 한번에 업데이트할 문서 정보들을 원본 데이터에서 가져오는 방법에 대해서 정리해보려고 한다. 


Using Queues to Manage Batches

배치로 데이터들을 일괄 업데이트하려면, 수정해야하는 데이터들만 골라서 가져와야한다.  

이 때 큐를 사용하면 효율적으로 관리할 수 있다. 

큐 대기열을 관리할 저장소로는 SQL 테이블, Redis Sorted Set 등을 사용할 수 있다.  


참고 문서에 작성된 pseudocode를 읽어보니, 대략 이런 구조다. 

워커는 큐에서 얻은 포스트 아이디로 DB를 조회해서, 이 데이터를 엘라스틱 서치에 저장한다. 


Queue / Worker 구조로 변경했을 때, 3가지 장점이 있다.

1. 엘라스틱서치 복제와 비즈니스 로직의 의존성이 줄어든다. 

   그래서 코드의 가독성, 확장성이 높아진다. 

2. 엘라스틱서치 Update 요청수를 조절할 수 있다. 

   Queue의 갯수와 Worker가 도는 주기를 조정하면서, 엘라스틱 서치 Update 요청수를 조정할 수 있다. 

3. 엘라스틱서치 장애의 영향도가 줄어든다.

   - 큐를 사용하지 않는다면, 

      * 엘라스틱서치 장애는 포스트 조회 이벤트의 장애로 이어진다. 

      * Update Request가 늘어나는 경우, 엘라스틱서치에 부하를 발생시킨다. 


참고한 pseudocode는 다음과 같다. ORM Model에서 Update가 발생할 때마다, after filter가 큐에 데이터를 쏴주는 구조다.  

큐에 싱크될 포스트가 포함되어있는 경우에는 큐에 데이터를 추가해주지 않는다. 이렇게해야 같은 포스트를 중복으로 수정할 일이 없다. 

// Our ORM Model, linked to a table in our database class PostModel < ORMModel { function after_update(function (instance) { elasticsearch_queue.add(instance); }) } // Our queue implementation, it formats the queue entry for an instance // of the ORM model. The queue server could be anything from an SQL table to an AMQP queue class ElasticsearchQueue < Queue { function add(instance) { queue_server.insert_unless_exists(record_type: instance.class, record_id: instance.id); } } // Our worker, scheduled to run every N minutes. // It will read 1000 entries off the queue, issue a single batch request // to elasticsearch, and repeat if more items are still on the queue class QueueWorker < Worker { function work() { errorCount = 0; // Transactionally reserve 1000 entries at a time, but don't delete them from the queue yet // The entries will be only be fully deleted once the bulk operation has successfully completed while ((queue_entries = Queue.reserve(limit: 1000)) && records.length > 0) { try { // In this example we will build up the JSON-like body for the // elasticsearch queue API request. It consists of newline separated // JSON documents comprising action metadata, and document values bulk_body = ""; record_class = Class.named(queue_entry.record_type); i = 0; queue_entries.each(function (queue_entry) { i++; record = record_class.find(id: queue_entry.record_id); // Note, a production ready version of this code would double-check that the document still exists // and would create a bulk delete request if the record was no longer present action_metadata = {index: { _index: record_class.elasticsearch_index, _type: record_class.elasticsearch_type, _id: record.id}} bulk_body += "\n" unless i == 1; bulk_body += encode_json(action_metadata); bulk_body += "\n"; bulk_body += encode_json(record); }); bulk_body += "\n"; // The bulk API requires termination by a newline http_client.post("http://host.for.elasticsearch:9200/_bulk", body: bulk_body); // Now that we're sure processing has succeeded we can fully delete the queue entries queue_entries.delete } catch (StandardError ex) { queue_entries.unreserve; // Simply let the while loop retry up to 5 times. errorCount += 1 if (errorCount >= 5) { throw(CannotReplicateElasticsearchError); } else { Logger.warn("Error processing elasticsearchqueue, will retry. Attempt: $errorCount", ex) } } } } }