Celery on ElastiCache


I have spent nearly the entire day trying to get Celery working with ElastiCache as a Redis backend.

ElastiCache is set up to deploy Redis clusters, but Celery doesn't support Redis in cluster mode.

There is one open-source project that attempts to solve this problem, but it is not maintained (and doesn’t work with current versions of Celery).

If you find yourself in the same scenario, I recommend switching to SQS as the broker and S3 as the result backend.

Unless you have very performance-critical tasks, this is a good solution. It also cut my infrastructure cost by about 26%.
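
The broker switch itself is a one-liner (a minimal sketch; with a bare sqs:// URL, kombu falls back to boto3's usual credential chain, e.g. environment variables or an instance role):

broker_url = "sqs://"  # credentials resolved by boto3, not embedded in the URL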

For S3, you need to set a bunch of config options:

import os

# Celery's built-in S3 result backend settings
s3_bucket = os.getenv("CELERY_S3_BUCKET", "")
s3_access_key_id = os.getenv("CELERY_S3_ACCESS_KEY_ID", "")
s3_secret_access_key = os.getenv("CELERY_S3_SECRET_ACCESS_KEY", "")
s3_base_path = "/celery_results"
s3_region = os.getenv("AWS_REGION", "eu-central-1")
s3_endpoint_url = f"https://{s3_bucket}.s3.{s3_region}.amazonaws.com"

And feed the endpoint URL in as the backend URL.
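
For reference, one way to wire it all together (a sketch; the app name is a placeholder, and the s3:// scheme on result_backend is my assumption for selecting Celery's S3 backend, with the s3_* options above doing the actual configuration):

from celery import Celery

celery_app = Celery("tasks", broker="sqs://")
celery_app.conf.update(
    result_backend="s3://",  # assumption: the scheme selects the S3 result backend
    s3_bucket=s3_bucket,
    s3_access_key_id=s3_access_key_id,
    s3_secret_access_key=s3_secret_access_key,
    s3_base_path=s3_base_path,
    s3_region=s3_region,
    s3_endpoint_url=s3_endpoint_url,
)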

For SQS, it’s a little trickier.

I had originally set up Redis with multiple queues, because in Redis they are just different keys.

But in SQS, you would need to create a separate queue for each task type.

Fortunately, routing keys are here to help.

Routing keys are used to route invocations to the right task within the same queue.

My settings:

broker_transport_options = {
    # With predefined_queues, kombu uses only the queues listed here and
    # never tries to list, create, or delete queues in SQS.
    'predefined_queues': {
        'celery': {  # Keep single queue
            'url': SQS_URL,  # SQS_URL and the AWS_* values are defined elsewhere (e.g. from the environment)
            'access_key_id': AWS_ACCESS_KEY_ID,
            'secret_access_key': AWS_SECRET_ACCESS_KEY,
        }
    },
    'polling_interval': 1,
    'wait_time_seconds': 20,  # long polling
    'visibility_timeout': 3600,  # seconds before an unacked message becomes visible again
    'region': AWS_REGION,
}

task_queues = {
    'celery': {
        'exchange': 'default',
        'exchange_type': 'direct',
        'routing_key': '#',  # Match all routing keys
    }
}

task_routes = {
    'detect': {  # Simplified from 'detect.*'
        'queue': 'celery',
        'routing_key': 'detect',
    }
}

Now you can send the tasks with the routing key:

celery_app.send_task(
    name="detect",
    kwargs=task_kwargs,
    queue='celery',
    routing_key='detect',  # Add routing key
    retry=True,
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.5,
    },
)
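
For the routing to land anywhere, the worker also needs a task registered under that name. A hypothetical example (the real signature is whatever your detect task takes):

@celery_app.task(name="detect")
def detect(**task_kwargs):
    # ... run the actual detection work here ...
    return {"status": "done"}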

There might be a cleaner way to do this, but this works.

The documentation is a bit sparse, so I hope this helps someone.