celery执行异步任务
在Django中使用celery可以实现异步执行任务的功能。以下是在Django中使用celery的步骤:
第一步:配置celery.py文件
在Django项目的根目录dx_movie/dx_movie下新建celery.py文件,并将以下配置信息复制进去:
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dx_movie.settings')
app = Celery('dx_movie')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
第二步:导入celery.py文件
在dx_movie/dx_movie/__init__.py全局配置的初始化文件中,导入celery.py, 代码如下:
import pymysql
from .celery import app as celery_app
pymysql.install_as_MySQLdb()
__all__ = ('celery_app',)
第三步:配置settings.py文件
在Django项目的settings.py中添加以下配置信息:
# settings.py
# 配置Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
第四步:编写任务
在trade应用下,创建一个名为tasks.py的文件,编写任务函数:
from datetime import timedelta
from django.utils import timezone
from celery import shared_task
from .models import Order
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
第五步: 配置路由
配置一个路由,用于显示task接口。
urlpatterns = [
# 新增代码
path('api/tasks/', trade_views.TaskAPIView.as_view())
]
第六步: 创建视图
在trade/views.py下创建TaskAPIView, 代码如下:
from .tasks import add, mul, xsum
class TaskAPIView(APIView):
def get(self, request):
result1 = add.delay(3, 4)
print(f'add : {result1}')
result2 = mul.delay(4,5)
print(f'mul: {result2}')
result3 = xsum.delay([1,2,3])
print(f'xsum: {result3}')
return Response('执行task')
注意:task.delay()表示异步执行任务,task()表示同步执行任务。
第七步:启动redis 和 celery
先启动redis, 查看redis小结。
执行以下命令启动celery worker:
celery -A dx_movie worker -l info
第八步:监测celery任务
可以安装并启动Flower监测celery任务:
celery -A dx_movie flower
测试
访问tasks接口,示例如下:

接下来,就查看flower, 示例如下:

每次访问tasks接口,flower都会检测到新增任务。
以上就是在Django中使用celery执行异步任务的步骤。通过使用celery,我们可以将一些耗时的任务放到后台执行,提高系统的响应速度。
请根据你的项目实际情况进行相应的配置和调用任务函数。
【大熊课堂精品课程】
Python零基础入门动画课: https://www.bilibili.com/cheese/play/ss7988
Django+Vue:全栈开发: https://www.bilibili.com/cheese/play/ss8134
PyQT6开发桌面软件: https://www.bilibili.com/cheese/play/ss12314
Python办公自动化: https://www.bilibili.com/cheese/play/ss14990
Cursor AI编程+MCP:零基础实战项目课: https://www.bilibili.com/cheese/play/ss105194189
Pandas数据分析实战: https://www.bilibili.com/cheese/play/ss734522035
AI大模型+Python小白应用实战: https://www.bilibili.com/cheese/play/ss3844