본문 바로가기

소프트웨어-이야기/데이터 저장소 + 시각화

(ElasticSearch) 엘라스틱서치에 데이터 싱크하기 ( 3 ) - Batching Based on Ranges

이 글은 [elastic] Keeping Elasticsearch in Sync 문서를 정리한 글 입니다 ㅎㅅㅎ


이전글 ((ElasticSearch) 엘라스틱서치에 데이터 싱크하기 ( 2 ))에서는 계속 변경되는 데이터를 엘라스틱서치에 싱크하는 방법에 대해서 설명했다.

이번에는 Immutable한 데이터를 엘라스틱서치에 추가하는 방법에 대해서 설명하고자 한다.


Batching Based on Ranges

Immutable한 데이터의 예시로는 로그가 있다. 로그는 한번 추가되어도, 변경되지 않는 데이터다. 

그래서 이 경우에는 ElasticSearch에 Insert만한다. Update는 필요하지 않다. 


이 경우, 큐를 이용해서 배치에 사용될 데이터를 가져오는건 오버스펙이다. 

단순하게, 데이터를 가져와야하는 범위를 지정해서, 배치로 퍼올리면 된다.


그러나 데이터 범위 조회 기준이 되는 값 (ex. timestamp 필드)에 잘못된 값이 들어가거나, 배치 트랜잭션이 지연되는 경우, 데이터 싱크가 꼬일 수 있기 때문에, 이에 대한 주의가 필요하다. 


이런 식으로 관리하면 된다.

이 때, 타임라인별로 인덱스를 여러개로 쪼개서 저장하는게 좋다. 


왜냐하면, 아래의 2가지 이유 때문이다. 

1. 엘라스틱 서치 검색 성능 향상 

엘라스틱서치는 여러 인덱스에 한번에 쿼리를 날릴 수 있다. 

타임라인이 인덱스명에 반영되어있으면, 필요한 시기에 쌓인 인덱스만 조회할 수 있어서, 스캔 범위가 줄어든다.

2. 인덱스 삭제 관리 편의성

오래된 데이터를 지울 때, 인덱스명으로 인덱스를 찾아서, 통채로 지울 수 있다. 그래서 관리하기 편하다. 

그리고 인덱스 안에 있는 개별 문서들을 지우는 것보다, 인덱스 하나를 통채로 지우는게 더 빠르다.


위의 내용이 반영된 슈도코드는 아래와 같다.

아래는 사용자의 위치가 저장된 로그성 데이터를 엘라스틱서치에 퍼올리는 상황에 대한 예시 코드다. 

set_last_job_range_end에 데이터를 가져온 마지막 시간을 저장해두고, 배치를 돌릴때마다 꺼내쓴다. 데이터는 테이블, 파일, 캐시 등등에 저장하면 된다. 

// Gets executed every ten seconds class LocationImportWorker < PeriodicWorker { function work() { // If a job runs long, we want the next one to block waiting on this one // This is a naive way of handling this situation, ideally both jobs could run concurrently using // more careful state tracking that is beyond the scope of this example lock = acquire_lock("location_import_work"); last_job_range_end = get_last_job_range_end(); // Use the datastore's clock to prevent any bugs from clock skew // Choose 1 second ago, since the current second may yet see more records! this_job_range_end = Datastore.query("now() - '1 second'); locations = UserGeoLocations.where("timestamp > $last_job_ended_at AND timestamp < $this_job_range_end); // In this example the boilerplate for generating a bulk HTTP request has been omitted elasticsearch_client.bulk_index(locations); set_last_job_range_end(this_job_range_end); lock.release(); } }