Celery 实现异步和定时任务
在 Web 应用中,当一个请求涉及耗时的操作,例如生成报告、数据分析或复杂计算,且用户不需要实时返回结果时,可以通过异步化处理将这些操作从主请求流程中剥离,提高应用响应速度和用户体验。
异步化处理的目的:
- 减少用户等待时间。
- 提高服务器资源利用率,避免阻塞其他请求。
- 将操作分离为后台任务,用户可以稍后获取结果。
实现方式:
- 多线程/多进程:使用 Python 标准库的
threading
或multiprocessing
模块来异步执行任务,适合简单场景,但对复杂任务的扩展性较差。 - 使用三方库:
Celery
是一个流行的任务队列工具,支持任务分发和调度,能够结合消息队列(如 RabbitMQ、Redis)处理复杂的异步任务。Celery
提供了强大的任务追踪功能和自动重试机制,非常适合大型项目。
对比缓存优化:
- 缓存主要用于减少重复计算和数据查询,提升读取性能。
- 异步化处理则是针对复杂或耗时任务的拆分,避免阻塞请求,提高整体吞吐量。
使用 Celery 实现异步化
Celery 和 RabbitMQ 介绍
Celery 是一个简单、灵活、可靠的且可以处理大量消息的分布式系统。它是一个专注于实时处理的任务队列,同时也支持任务调度。
Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 或 Redis 来实现消息队列服务。RabbitMQ 对高级消息队列协议(AMQP)有非常好的实现。
因为 RabbitMQ 依赖 Erlang 环境[^Erlang环境],首先要安装 Erlang。
[^Erlang环境]:Erlang 是一种通用的编程语言和运行环境,特别适合开发并发、分布式和容错应用。Erlang 环境为 RabbitMQ 提供了其底层分布式运行时环境。
版本兼容性矩阵,以帮助你选择合适的版本组合:
Django 版本 | Celery 版本 | Kombu 版本 | RabbitMQ 版本 |
---|---|---|---|
3.x - 4.x | 5.x | 5.x | 3.x |
2.x | 4.x | 4.x | 3.x |
配置 RabbitMQ
安装 RabbitMQ
安装配置参考 RabbitMQ 官方文档
更新包元数据
yum update -y
安装 Erlang 和 RabbitMQ
yum install -y erlang rabbitmq-server
启动、查看状态、停止 RabbitMQ
systemctl start rabbitmq-server
systemctl status rabbitmq-server
systemctl stop rabbitmq-server
在系统启动时默认启动守护进程
systemctl enable rabbitmq-server
访问 RabbitMQ 管理界面
启用 RabbitMQ 管理插件:
rabbitmq-plugins enable rabbitmq_management
然后,访问插件管理界面:http://localhost:15672/, 默认的用户名和密码是 guest
。
管理界面如下:
如果访问插件管理界面失败,检查 RabbitMQ 配置文件查看网络接口绑定是否正确;以及防火墙是否允许访问。一般需要对 RabbitMQ 服务设置和管理。
rabbitmqctl 命令管理配置
使用 rabbitmqctl
工具在命令行中管理 RabbitMQ 的配置和用户权限。
使用 CLI 创建用户、虚拟主机和分配权限:
# 创建管理用户
rabbitmqctl add_user hellokitty 123456
# 为用户设置管理员标签
rabbitmqctl set_user_tags hellokitty administrator
# 创建虚拟主机
rabbitmqctl add_vhost myvhost
# 指定用户对虚拟主机的配置、写和读权限(. 表示对所有资源)
rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
说明:
hellokitty 123456
:指定用户名和密码。- 设置用户标签,常见的用户标签有:
administrator
:完全控制权限monitoring
:监控相关权限management
:访问管理插件的权限
- 使用
rabbitmqctl set_permissions
命令用于设置用户在指定的虚拟主机上设置权限。该命令语法如下:
rabbitmqctl set_permissions -p <virtual_host> <username> <configure_regex> <write_regex> <read_regex>
- 其中,
<configure_regex>
、<write_regex>
和<read_regex>
是正则表达式,用于设置权限。<configure_regex>
:决定用户可以配置哪些资源(如队列、交换器)。<write_regex>
:决定用户可以向哪些队列或交换器发布消息(即生产者权限)。<read_regex>
:决定用户可以从哪些队列或交换器消费消息(即消费者权限)。
验证配置:
列出用户和虚拟主机:
[root@hadoop01 rabbitmq]# rabbitmqctl list_users
WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- location has moved to /etc/rabbitmq/rabbitmq-env.conf
Listing users ...
guest [administrator]
hellokitty [administrator]
...done.
[root@hadoop01 rabbitmq]# rabbitmqctl list_vhosts
WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- location has moved to /etc/rabbitmq/rabbitmq-env.conf
Listing vhosts ...
/
myvhost
...done.
以上操作,也可以使用自动化完成。见附件脚本 [[异步任务和定时任务#^ec4024|setup_rabbitmq.sh]],运行命令 bash setup_rabbitmq.sh
rabbitmq.conf 配置
这是 RabbitMQ 的主要配置文件,用于定义服务器的行为和网络设置。如果 rabbitmq.conf
配置文件不存在,创建 /etc/rabbitmq/rabbitmq.conf
。
rabbitmq.conf
配置内容如下:
# 配置 RabbitMQ 默认端口(5672)
listeners.tcp.default = 5672
# 配置 Web 管理插件端口(15672)和网络接口
management.listener.port = 15672
management.listener.ip = 0.0.0.0
# 默认用户和虚拟主机
default_user = hellokitty
default_pass = 123456
default_vhost = myvhost
# 日志级别
log.file.level = info
rabbitmq-env.conf 配置
这个文件用于设置 RabbitMQ 环境变量,以便启动 RabbitMQ 服务时使用。RabbitMQ 环境变量配置文件路径为: /etc/rabbitmq-env.conf
。
rabbitmq-env.conf
配置内容如下:
# 指定 RabbitMQ 节点的名称
NODENAME=rabbit@localhost
# 指定 RabbitMQ 服务监听的 IP 地址
NODE_IP_ADDRESS=0.0.0.0
# 指定 RabbitMQ 服务监听的端口号
NODE_PORT=5672
# 指定 RabbitMQ 的配置文件路径
RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf
重启服务,访问插件管理,界面显示效果:
创建 Celery 实例
Celery 实例,可以称之为 Celery 应用程序或 app。它是在 Celery 中执行的所有操作的入口点。例如创建任务和管理工作器,因此,其他模块必须能够导入它。
在 Django 项目中配置 Celery,包括创建独立的 Celery 配置文件 celery.py
以及在 Django 配置文件中进行相关设置。
安装三方库 celery
pip install celery[redis] # 如果你使用 Redis 作为消息队列
pip install celery[rabbitmq] # 如果你使用 RabbitMQ 作为消息队列
pip install eventlet # 如果在 Windows 下执行 Celery
创建 Celery 实例文件
创建异步任务应用 asyncTasks
,用于管理相关资源。
在 Django 设置中安装应用:
INSTALLED_APPS = [
# ... 其他已安装的应用
'asyncTasks',
]
在 Django 项目中创建 aysncTask/celerys.py
文件,内容如下:
# aysncTask/celerys.py
import os
from celery import Celery
from django.conf import settings
# 设置Django环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'vote.settings')
# 创建第一个Celery实例
app = Celery('tasks')
# 对于复杂的项目,推荐使用专用的配置模块
# app.configfromobject('celeryconfig')
# 从 Django settings.py 中加载配置,在Django配置文件中以 CELERY 开头的配置项被识别和使用。
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务模块
# app.autodiscovertasks()
# 让Celery自动从项目中所有注册的应用里发现任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 定义调试任务
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
说明:
@app.task(bind=True)
的作用:- Celery 中的
@app.task
装饰器将一个普通的 Python 函数转换为 Celery 任务。 bind=True
参数的作用将任务绑定到任务实例本身,从而使任务方法能够访问实例属性和方法。
- Celery 中的
self.request
是任务实例的请求对象,包含当前任务的执行上下文信息,例如任务名称、ID、传递的参数、重试次数等。!r
是 Python 的 格式化字符串转换标志,用于调用对象的__repr__
方法。- 相比
!s
(或默认调用__str__
方法),!r
更适合调试,能提供更正式和准确的对象表示。
在 Django 中配置 Celery
在 settings.py
文件中添加 Celery 的相关配置:使用的消息代理、结果后端、内容类型、序列化格式以及时区等。
# vote/settings.py
# Celery settings
CELERY_BROKER_URL = 'amqp://hellokitty:123456@192.168.83.130:5672/myvhost' # RabbitMQ服务地址
CELERY_RESULT_BACKEND = 'rpc://' # 结果存储到RabbitMQ
CELERY_ACCEPT_CONTENT = ['json'] # 只接受json格式数据
CELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使用json
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化和反序列化使用json
CELERY_TIMEZONE = 'UTC' # 使用UTC时间
# 自动加载Django app中的tasks.py任务
CELERY_IMPORTS = ('asyncTasks.tasks',)
说明:
CELERY_RESULT_BACKEND
用于指定任务结果后端[^结果后端]的存储位置。不同的后端支持不同的存储方式,以下常见配置选项:'rpc://'
:指定使用 RabbitMQ 的 Remote Procedure Call(RPC)机制来存储和检索任务结果。'db+sqlite:///results.sqlite'
:支持 SQLite、PostgreSQL、MySQL 等关系型数据库。'file:///path/to/results'
:使用文件系统作为结果存储。'django-db'
:使用 Django 的数据库作为结果存储。
[^结果后端]:结果后端是 Celery 用来存储任务执行结果的地方。当任务执行完成后,结果会被存储在结果后端中,供应用程序或其他任务后续获取和处理。RabbitMQ 可以作为结果后端,通常用于存储任务执行的结果。
异步任务
任务的异步执行流程:
- 定义任务:通过
@app.task
装饰器定义任务。 - 消息生产者:在需要执行任务的地方,通过调用
delay
方法将任务添加到 Celery 的任务队列中。 - Celery Worker:Celery 的工作进程(
worker
)从任务队列中获取任务,并执行任务函数。
创建异步任务
创建 Celery 异步任务 add
:
# 任务定义文件 asyncTasks/tasks.py
from asyncTasks.celerys import app
# 示例任务
@app.task
def send_email(from_user, to_user, cc_user, subject, content):
# 假设这里是发送邮件的逻辑
print(f"Sending email from {from_user} to {to_user}, cc {cc_user}, subject {subject}, content {content}")
# 在 Django 的某个视图或其他地方调用任务
# from .tasks import sendemail
#
# sendemail.delay('from@example.com', ['to@example.com'], ['cc@example.com'], 'Subject', 'Email content')
# 定义一个异步任务
@app.task
def add(x, y):
print(f"Executing task: add({x}, {y})")
return x + y
应用模块初始化
在 Python 模块的初始化文件__init__.py
,导入 Celery 应用实例,用于暴露特定的对象,以便在其他模块中可以使用这些对象。
# asyncTasks/init.py
from asyncTasks.celerys import app as celery_app
__all__ = ('celery_app',)
说明:__all__
是一个特殊的变量,用于指定模块(文件)哪些内容暴露给外部使用,明确模块中的公共接口。
调用异步任务
创建一个专门的模块调用异步任务。
# asyncTasks/runtasks.py
import time
from asyncTasks.tasks import add
# 调度任务
result = add.delay(16, 15)
for i in range(10):
if not result.ready():
print("Task status:", result.status)
time.sleep(10)
result_value = result.get()
print("Task status:", result.status)
print("Task result:", result_value)
time.sleep(1)
说明:
add.delay()
方法用于将任务异步地添加到 Celery 的任务队列中,并立即返回,不会阻塞当前线程。
启动 Celery Worker
启动 Celery Worker 来处理异步任务。
celery -A asyncTasks worker -P eventlet -l info
说明:
-A asyncTasks
指定 Celery 应用所在的模块。worker
启动一个 Celery 工作进程,负责接收和执行任务。-P eventlet
: Windows 环境下需要指定eventlet
进程池,否则,可能在work
接收到任务时阻塞执行。参考文末[[异步任务和定时任务#^22bd29|问题1]]。-l
指定日志级别。- 可以在启动 Celery worker 或 beat 时指定
-n
参数值,为当前工作进程指定一个唯一的名称:
celery -A <module_name> worker -l <log_level> -P <pool_type> -n <worker_name>
- 也可以根据主机名动态生成工作进程的名称,例如
-n "worker1@%(hostname)s"
提示:Windows 下可以使用
taskkill
终止进程:taskkill /F /IM celery*
验证任务执行情况
运行 Python 程序 run_tasks.py
。
Python 程序终端打印如下:
D:\PycharmProjects\vote\venv\Scripts\python.exe D:\PycharmProjects\myproject\vote\asyncTasks\run_tasks.py
Task status: PENDING
Task status: SUCCESS
Task result: 31
Task status: SUCCESS
Task result: 31
...
Celery Woker 日志打印:
(venv) PS D:\PycharmProjects\myproject\vote> celery -A asyncTasks worker -P eventlet -l info
...
[2024-06-19 20:05:15,980: INFO/MainProcess] pidbox: Connected to amqp://hellokitty:**@192.168.83.130:5672/myvhost.
[2024-06-19 20:05:15,983: INFO/MainProcess] celery@DESKTOP-TE8F48P ready.
[2024-06-19 20:05:15,984: INFO/MainProcess] Task asyncTasks.tasks.add[bd21b484-a492-42fe-9831-8f71303c9362] received
[2024-06-19 20:05:15,985: WARNING/MainProcess] Executing task: add(16, 15)
[2024-06-19 20:05:15,997: INFO/MainProcess] Task asyncTasks.tasks.add[bd21b484-a492-42fe-9831-8f71303c9362] succeeded in 0.015000000013969839s: 31
定时任务
Celery Worker 负责从任务队列中提取异步任务并执行。这是 Celery 的核心功能。主要处理那些耗时操作或需要后台运行的任务。
而 Celery Beat 专注于定时任务的调度[^调度存储]和存储管理。它充当调度器的角色,根据预定义的周期或触发时间,将任务放到队列,供 Celery Worker 执行。
[^调度存储]:调度存储(Beat Schedule Storage)是 Celery Beat 用来存储定时任务调度信息的地方。这些信息包括定时任务的执行时间、参数等。通常可以使用文件系统或者数据库作为调度存储,但不推荐使用 RabbitMQ。对于 Celery Beat 的定时任务调度信息,推荐使用文件系统(如 SQLite 文件)或数据库(如 MySQL、PostgreSQL)来作为持久化存储。
配置 django-celery-beat
django-celery-beat
是一个用于与 Django 集成的 Celery 扩展包,它提供了管理 定时任务调度 的功能。通过它,你可以在 Django 的 Admin 界面中直观地添加、修改和删除定时任务,而无需直接修改代码。
安装:
pip install django-celery-beat
在 settings.py
文件中,添加配置:
INSTALLED_APPS = [
# ... 其他已安装的应用
'asyncTasks',
'django_celery_beat',
]
创建 Celery 应用实例
定时任务实例 app_schedule
:
# asyncTasks/celery.py
# celerys.py
import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings
# 设置Django环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'vote.settings')
# 创建第一个Celery实例
app = Celery('tasks')
# 对于复杂的项目,推荐使用专用的配置模块
# app.configfromobject('celeryconfig')
# 从 Django settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务模块
# app.autodiscovertasks()
# 让Celery自动从项目中所有注册的应用里发现异步任务/定时任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
# 创建定时任务的Celery实例
app_schedule = Celery('app_schedule')
# 从 Django settings.py 中加载配置
app_schedule.config_from_object('django.conf:settings', namespace='CELERY')
app_schedule.autodiscover_tasks()
app_schedule.conf.update(
# imports=['asyncTasks.tasks'],
timezone=settings.TIME_ZONE,
enable_utc=True,
beat_schedule={ # 使用正确的配置项名称
'task2': {
'task': 'asyncTasks.tasks.scheduled_task',
'schedule': crontab(minute='*/1'), # 每1分钟执行一次
'args': ('Argument for task2',)
},
'task3': {
'task': 'asyncTasks.tasks.scheduled_task',
'schedule': crontab(minute='0', hour='*/2'), # 每2小时执行一次
'args': ('Argument for task3',)
},
'task4': {
'task': 'asyncTasks.tasks.scheduled_task',
'schedule': crontab(minute='0', hour='12', day_of_week='mon'), # 每周一中午12点执行
'args': ('Argument for task4',)
},
},
)
应用模块初始化
# asyncTasks/init.py
from asyncTasks.celerys import app as celery_app, app_schedule
__all__ = ('celery_app', 'app_schedule',)
定时任务内容
from celery import shared_task
import logging
logger = logging.getLogger('django')
@shared_task
def scheduled_task(arg):
# 任务实现逻辑
logger.info(f'Scheduled task with arg: {arg}')
print(f'Executing scheduled task with arg: {arg}')
说明:
@app.task
装饰器用于将一个函数注册为 Celery 任务。它需要绑定到具体的 Celery 实例上,通常用于在 Celery 实例的上下文中定义任务。任务与特定的 Celery 实例绑定,不太适合在多项目或共享任务的场景中使用。@shared_task
装饰器用于将一个函数注册为 Celery 任务,但它不需要绑定到具体的 Celery 实例上。它通常用于 Django 项目中的应用模块,使任务更易于被多个 Celery 实例共享和发现。
运行定时任务
启动 Django 项目
python manage.py runserver
启动 Celery Worker
celery -A asyncTasks worker -P eventlet -l info
启动 Celery Beat
celery -A asyncTasks.celerys.app_schedule beat --loglevel=debug
说明:
asyncTasks.celerys.app_schedule
:当有多个 Celery 实例时,启动 Celery Beat 需要指定对应的定时任务实例。
检查任务结果
Celery Woker 日志输出:
[2024-06-19 21:29:25,023: INFO/MainProcess] Task asyncTasks.tasks.add[b042c567-9642-4e46-89dc-722d9dc65864] received
[2024-06-19 21:29:25,025: WARNING/MainProcess] Executing task: add(16, 15)
[2024-06-19 21:29:25,036: INFO/MainProcess] Task asyncTasks.tasks.add[b042c567-9642-4e46-89dc-722d9dc65864] succeeded in 0.0s: 31
[2024-06-19 21:30:15,181: INFO/MainProcess] Task asyncTasks.tasks.scheduled_task[6c4e45c4-649d-4591-a9db-33d1f5b5568b] received
INFO Scheduled task with arg: Argument for task2
[2024-06-19 21:30:15,182: INFO/MainProcess] Scheduled task with arg: Argument for task2
[2024-06-19 21:30:15,182: WARNING/MainProcess] Executing scheduled task with arg: Argument for task2
[2024-06-19 21:30:15,195: INFO/MainProcess] Task asyncTasks.tasks.scheduled_task[6c4e45c4-649d-4591-a9db-33d1f5b5568b] succeeded in 0.01600000000325963s:
None
Celery Beat 日志输出:
[2024-06-19 21:30:15,105: INFO/MainProcess] beat: Starting...
[2024-06-19 21:30:15,162: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task2 asyncTasks.tasks.scheduled_task('Argument for task2') <crontab: */1 * * * * (m/h/dM/MY/d)>
<ScheduleEntry: task3 asyncTasks.tasks.scheduled_task('Argument for task3') <crontab: 0 */2 * * * (m/h/dM/MY/d)>
<ScheduleEntry: task4 asyncTasks.tasks.scheduled_task('Argument for task4') <crontab: 0 12 * * mon (m/h/dM/MY/d)>
[2024-06-19 21:30:15,162: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
...
[2024-06-19 21:30:15,172: INFO/MainProcess] Scheduler: Sending due task task2 (asyncTasks.tasks.scheduled_task)
[2024-06-19 21:30:15,174: DEBUG/MainProcess] using channel_id: 1
[2024-06-19 21:30:15,175: DEBUG/MainProcess] Channel open
[2024-06-19 21:30:15,180: DEBUG/MainProcess] beat: Synchronizing schedule...
[2024-06-19 21:30:15,181: DEBUG/MainProcess] asyncTasks.tasks.scheduled_task sent. id->6c4e45c4-649d-4591-a9db-33d1f5b5568b
[2024-06-19 21:30:15,188: DEBUG/MainProcess] beat: Waking up in 44.80 seconds.
[2024-06-19 21:31:00,000: INFO/MainProcess] Scheduler: Sending due task task2 (asyncTasks.tasks.scheduled_task)
[2024-06-19 21:31:00,002: DEBUG/MainProcess] asyncTasks.tasks.scheduled_task sent. id->5d9c3da2-5f43-42dc-bccc-69be34e417a3
运行任务发现,定时任务的函数中的日志打印逻辑不会在 Python 运行的终端打印内容。- 这是因为在 Celery 的架构中,Celery 的任务是在独立的进程中执行的,任务函数的打印语句不会直接在 Python 运行终端中输出,不同于直接在 Python shell 或脚本中运行的函数。
如果想要查看定时任务内容内部的执行逻辑信息,可以使用日志记录器来捕获和管理。
Celery Flower 监控
Celery Flower 是一个 Web 监控工具,用于查看和管理 Celery 分布式任务队列。它提供了一个直观的界面来监控任务状态、队列和工作节点的实时信息。
安装 Flower
pip install flower
启动 Flower
celery --broker=amqp://hellokitty:123456@192.168.83.130:5672/myvhost flower
访问 Web 页面
访问 http://0.0.0.0:5555
,检查任务执行结果如下:
附件
脚本 setup_rabbitmq.sh
: ^ec4024
#!/bin/bash
# 创建用户
rabbitmqctl add_user hellokitty 123456
# 设置用户标签
rabbitmqctl set_user_tags hellokitty administrator
# 创建虚拟主机
rabbitmqctl add_vhost myvhost
# 分配权限
rabbitmqctl set_permissions -p myvhost hellokitty ".*" ".*" ".*"
# 验证结果
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl list_permissions -p myvhost
参考
问题
问题1. Celery woker 程序在 Windows 环境下接受任务后执行阻塞,且进程难以 ctrl+c
停掉。
分析解决:Windows 下启动 Celery 程序需要指定使用 eventlet 进程池。 ^22bd29
- 安装
eventlet
:
pip install eventlet
- 启动 Celery 服务:
celery -A 应用任务 worker -P eventlet -c 2 -l info