Migrating from Celery to Django’s Tasks framework is relatively straightforward. Only a few configuration changes and a few simple substitutions are needed. For more complicated tasks, I recommend consulting the Tasks framework documentation.
Starting assumptions
For simplicity, I am going to assume the project is run with uv. Depending on your setup, you may need to substitute the equivalent pip commands or those of another package manager.
Configuring settings.py
Any CELERY_ prefixed settings can be removed.
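To double check that nothing was missed, a quick scan of the settings file can list any leftover names. This is only a minimal sketch: find_celery_settings is a hypothetical helper of my own, not part of Django or Celery, and it assumes settings are plain top-level assignments.

```python
import re

# Matches top-level assignments such as `CELERY_BROKER_URL = ...`.
CELERY_SETTING = re.compile(r"^(CELERY_\w+)\s*=", re.MULTILINE)


def find_celery_settings(source: str) -> list[str]:
    """Return the names of CELERY_-prefixed settings assigned in `source`."""
    return CELERY_SETTING.findall(source)


# Illustrative settings text; in practice, read your own settings.py.
example_settings = '''
DEBUG = True
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_TASK_ALWAYS_EAGER = False
TASKS = {"default": {"BACKEND": "django_tasks.backends.database.DatabaseBackend"}}
'''

leftovers = find_celery_settings(example_settings)
print(leftovers)
```

Settings built dynamically or imported from another module would not be caught by this scan.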
By default, tasks use django.tasks.backends.immediate.ImmediateBackend, which runs each task immediately when it is enqueued rather than in a background worker, so you will generally want some other kind of backend.
django-tasks is the original prototype and provides several backends that can be used.
For simplicity, the database backend is a good starting choice.
Other backends may be available on PyPI or Django Packages.
# Install django-tasks into our project
uv add django-tasks
Then we need to update our settings.py
INSTALLED_APPS = [
    # ...
    "django_tasks",
    "django_tasks.backends.database",
]
TASKS = {
    "default": {
        "BACKEND": "django_tasks.backends.database.DatabaseBackend",
    }
}
and run our migrations.
uv run manage.py migrate
If you previously used multiple queues, you will need to declare them in your task configuration. Celery did not check queue names, but the Tasks framework does.
TASKS = {
    "default": {
        "BACKEND": "django_tasks.backends.database.DatabaseBackend",
        "QUEUES": ["default", "high", "low", "extra"],
    }
}
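Since undeclared queue names are now rejected, it can help to compare the queue names your code uses against the configuration before deploying. The sketch below is a hypothetical helper (missing_queues is my own name, not a Django API). It assumes that a backend without a QUEUES entry accepts only "default", and that an empty QUEUES list turns validation off.

```python
DEFAULT_QUEUE = "default"


def missing_queues(tasks_setting: dict, used_queues, alias: str = "default") -> list[str]:
    """Return the queue names in `used_queues` that the backend does not declare."""
    configured = set(tasks_setting[alias].get("QUEUES", [DEFAULT_QUEUE]))
    if not configured:
        # Assumption: an empty QUEUES list disables queue name validation.
        return []
    return sorted(set(used_queues) - configured)


TASKS = {
    "default": {
        "BACKEND": "django_tasks.backends.database.DatabaseBackend",
        "QUEUES": ["default", "high", "low"],
    }
}

# Queue names collected by hand from the old Celery calls (illustrative values).
print(missing_queues(TASKS, ["high", "low", "extra", "emails"]))  # ['emails', 'extra']
```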
Updating our tasks
Most of the changes are fairly simple and involve swapping the Celery decorators for Task decorators.
from celery import shared_task
@shared_task
def my_task(a, b):
    print("example", a, b)
will be rewritten as
from django.tasks import task
@task
def my_task(a, b):
    print("example", a, b)
@task(priority=0, queue_name='default', backend='default', takes_context=False)
def task_with_extra_properties(a, b):
    print("customized task", a, b)
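For a project with many task modules, the simple case can be mechanised. The following is a rough sketch rather than a complete codemod: rewrite_simple_tasks is a hypothetical helper that only handles the bare @shared_task form, and it leaves a file untouched if it finds decorators with arguments, which need a manual rewrite.

```python
import re

BARE_DECORATOR = re.compile(r"^([ \t]*)@shared_task[ \t]*$", re.MULTILINE)
CALLED_DECORATOR = re.compile(r"^[ \t]*@shared_task\(.*$", re.MULTILINE)


def rewrite_simple_tasks(source: str) -> tuple[str, list[str]]:
    """Swap bare @shared_task decorators for @task; report the rest for review."""
    needs_review = [line.strip() for line in CALLED_DECORATOR.findall(source)]
    if needs_review:
        # Leave the file untouched so the Celery import still matches its decorators.
        return source, needs_review
    source = source.replace(
        "from celery import shared_task", "from django.tasks import task"
    )
    return BARE_DECORATOR.sub(r"\1@task", source), needs_review


celery_source = '''from celery import shared_task

@shared_task
def my_task(a, b):
    print("example", a, b)
'''

converted, review = rewrite_simple_tasks(celery_source)
print(converted)
```

Anything returned in the review list, such as @shared_task(bind=True), is best converted by hand.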
If you have a ‘bound’ task in Celery, you can change it in this way.
from celery import shared_task
@shared_task(bind=True)
def my_bound_task(self, a, b):
    print("example", self, a, b)
    print("task name", self.name)
# changes to
from django.tasks import TaskContext, task
@task(takes_context=True)
def my_task(context: TaskContext, a, b):
    print("example", context, a, b)
    print("task name", context.task_result.task.name)
Calling tasks
celery_task.delay(a=1, b=2)
# changes to
new_task.enqueue(a=1, b=2)
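If you need to override options such as the queue for a single call, that changes a bit as well: instead of passing them to apply_async, call using() first.
celery_task.apply_async(queue="high", kwargs={"a": 1, "b": 2})
# changes to
new_task.using(queue_name="high").enqueue(a=1, b=2)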
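If you have many apply_async calls, the option names can be translated in one place. This is a sketch under assumptions: celery_options_to_using is a hypothetical helper, it covers only queue, countdown and eta, and it assumes your backend supports deferred execution through the run_after argument of using().

```python
from datetime import datetime, timedelta, timezone


def celery_options_to_using(*, queue=None, countdown=None, eta=None, now=None) -> dict:
    """Translate a few Celery apply_async options into using() keyword arguments."""
    kwargs = {}
    if queue is not None:
        kwargs["queue_name"] = queue
    if countdown is not None:
        # Celery's countdown is in seconds; express it as an aware datetime.
        now = now or datetime.now(timezone.utc)
        kwargs["run_after"] = now + timedelta(seconds=countdown)
    elif eta is not None:
        kwargs["run_after"] = eta
    return kwargs


# A fixed `now` keeps the example reproducible.
fixed_now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
options = celery_options_to_using(queue="high", countdown=60, now=fixed_now)
print(options)
# Intended use: new_task.using(**options).enqueue(a=1, b=2)
```

Other Celery options, priority in particular, do not map one to one and are better reviewed by hand.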
Running
Running the worker becomes somewhat simpler, since it is now a Django management command.
celery -A myproject.celery worker
# changes to
uv run manage.py db_worker
Periodic Tasks
Periodic tasks can be handled with the django-crontask library.
uv add django-crontask
# settings.py
INSTALLED_APPS = [
    # ...
    "crontask",  # enables the management command
]
Tasks can then be given a schedule by stacking the cron decorator on top of @task.
from crontask import CronTrigger, cron, IntervalTrigger
from django.tasks import task
@cron("0 0 * * *")
@task
def my_midnight_task():
    print("this runs at midnight")
@cron(CronTrigger(hour=12, minute=0))
@task
def lunchtime_task():
    print("this runs at noon")
@cron(IntervalTrigger(hours=1))
@task
def hourly_task():
    print("this runs every hour since app startup")
Then we can run the scheduler (similar to Celery’s ‘beat’ scheduler)
uv run manage.py crontask
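If your existing schedule was written with Celery's crontab(...) arguments, the five-field string passed to @cron can be derived from them. The helper below is a hypothetical sketch of mine, not part of either library. It only reorders the fields into the usual cron order of minute, hour, day of month, month and day of week.

```python
def crontab_to_cron(
    minute="*", hour="*", day_of_month="*", month_of_year="*", day_of_week="*"
) -> str:
    """Build a five-field cron expression from Celery-style crontab arguments."""
    fields = (minute, hour, day_of_month, month_of_year, day_of_week)
    return " ".join(str(field) for field in fields)


print(crontab_to_cron(minute=0, hour=0))  # 0 0 * * *
print(crontab_to_cron(minute="*/15"))     # */15 * * * *
```

Numeric day-of-week values can be numbered differently between schedulers, so those entries are worth verifying after the move.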