본문 바로가기

소프트웨어 이야기/프로그래밍 언어와 프레임워크

(Celery) 트랜잭션이 커밋된 이후에 Celery Task 전송하기

트랜잭션 안에서 Celery Task가 호출되는 경우, 트랜잭션이 커밋되기 전에 Task가 실행될 수 있다.

이 때, Task가 커밋되지 않은 데이터를 참조하는 경우, 오류가 발생할 수 있다. 

위와 같은 상황의 예시와 이를 회피하는 방법에 대해서 정리해보고자 한다.

 

문제상황

아래의 코드는 회원가입을 처리하는 가상의 코드이다.

from django.db import transaction

from coupon.util import create_welcome_coupon
from mileage.util import create_welcome_mileage

@transaction.atomic()
def create_user(user_data):
    user = User.objects.create(**user_data)
    send_email_welcome_join.delay(user.pk)
    create_welcome_coupon(user.pk)
    create_welcome_mileage(user.pk)

 

이 때, 트랜잭션이 실행되는 도중에 이메일 발송 태스크가 먼저 실행되는 경우, 이메일 발송 태스크에서는 회원 데이터를 조회할 수 없다. 

 

Celery에서는 회원가입 로직과 다른 DB session에서 데이터를 조회하기 때문이다. 

 

해결방법

django transaction의 on_commit를 활용하면, commit이 성공한 이후에 셀러리 함수가 호출되도록 할 수 있다. 

from django.db import transaction

from coupon.util import create_welcome_coupon
from mileage.util import create_welcome_mileage

@transaction.atomic()
def create_user(user_data):
    user = User.objects.create(**user_data)
    transaction.on_commit(lambda: send_email_welcome_join.delay(user.pk))
    create_welcome_coupon(user.pk)
    create_welcome_mileage(user.pk)

 

Celery Task  함수를 on_commit 함수로 감싸주면 된다.

 

참고

 

Database transactions | Django documentation | Django

Django The web framework for perfectionists with deadlines. Overview Download Documentation News Community Code Issues About ♥ Donate

docs.djangoproject.com

 

Tasks — Celery 4.3.0 documentation

Let’s take a real world example: a blog where comments posted need to be filtered for spam. When the comment is created, the spam filter runs in the background, so the user doesn’t have to wait for it to finish. I have a Django blog application allowing co

docs.celeryproject.org

 

  • 2020.07.13 15:31 댓글주소 수정/삭제 댓글쓰기

    안녕하세요 . 유용한 글 잘 읽었습니다.
    저같은 경우에는 with transaction.atomic() 구문 바깥에서 Celery Task를 전송하는데도 간헐적으로 Task 실행이 안되는 케이스가 종종 발생하여서, celery task 보내는 부분에 글쓴이님이 하신 것처럼 Celery Task 함수를 on_commit 함수로 감싸주니 해결되었습니다.

    이론적으로는 Transaction 바깥에서 호출하면 커밋 이후니까 문제가 없을 것처럼 생각이 되는데, 종종 오류가 발생하여 당혹스럽네요.
    AS-IS 소스는 아래와 같은데, 혹시 경험하셨거나 왜 이런지 아신다면 답변 부탁드리겠습니다. 감사합니다!
    try:
    with transaction.atomic():
    ~~랄라
    except Exception:
    ~~랄라
    send_email_welcome_join.delay(user.pk)

    • 간헐적으로 태스크가 실행되지 않은건 아닌 것 같아요. 실행되긴 했지만, 함수 안에서 오류가 발생했거나 방어로직 때문에 의도한 기능이 동작하지 않은게 아닐지 의심되네요.

      그리고 Exception이 발생한 경우에는 트랜잭션이 커밋되지 않기 때문에, 샐러리 함수에서 참조하는 데이터가 유효하지 않았을 것으로 추정됩니다~

      그래서 on_commit을 추가하셨을 때에만 잘 동작하셨던것같아요.

      이건 오류없이 커밋이 잘 되었을 때에 샐러리 태스크를 실행하겠다는 것을 의미하기 때문에, 예제 코드와는 다르게 동작합니다 ^^

  • 궁금한게 있어 질문드려 봅니다.

    하나의 함수에서 절차적으로 수행한다면
    user에 object의 값이 들어오는 경우에 다음 라인인
    send_email_welcome_join.delay(user.pk)가 실행되는게 아닌가요?
    user 반환이 되지 않은 상태에서 다음 행의 실행이 동작되는 걸까요?
    혹시 테스트케이스나 프린트를 찍어서 확인해보신적이 있는지 너무 궁금해서 문의 드려요.

    이제 celery를 사용하려고 공부를 시작하고 있는데
    파이썬의 트랜잭션 결과가 return되지 않았는데도 다음 행이 수행될 수 있다는 사실이
    너무 충격적으로 놀라워서요....

    • Celery 태스크를 호출한다는 것은 비동기로 함수를 호출한다는 것을 의미합니다. 그래서 메인 함수에서는 Celery 태스크 실행이 완료될 때까지 대기하지 않는거죠.

      이 덕분에 메인 함수의 실행 시간을 단축시킬 수 있어 비동기 프레임워크를 사용하는거라 이해하시면 될 것 같아요.