文章

Celery 实现异步和定时任务

在 Web 应用中,当一个请求涉及耗时的操作,例如生成报告、数据分析或复杂计算,且用户不需要实时返回结果时,可以通过异步化处理将这些操作从主请求流程中剥离,提高应用响应速度和用户体验。

异步化处理的目的:

  • 减少用户等待时间。
  • 提高服务器资源利用率,避免阻塞其他请求。
  • 将操作分离为后台任务,用户可以稍后获取结果。

实现方式:

  • 多线程/多进程:使用 Python 标准库的 threadingmultiprocessing 模块来异步执行任务,适合简单场景,但对复杂任务的扩展性较差。
  • 使用三方库:Celery 是一个流行的任务队列工具,支持任务分发和调度,能够结合消息队列(如 RabbitMQ、Redis)处理复杂的异步任务。Celery 提供了强大的任务追踪功能和自动重试机制,非常适合大型项目。

对比缓存优化:

  • 缓存主要用于减少重复计算和数据查询,提升读取性能。
  • 异步化处理则是针对复杂或耗时任务的拆分,避免阻塞请求,提高整体吞吐量。

使用 Celery 实现异步化

Celery 和 RabbitMQ 介绍

Celery 是一个简单、灵活、可靠的且可以处理大量消息的分布式系统。它是一个专注于实时处理的任务队列,同时也支持任务调度。

Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 或 Redis 来实现消息队列服务。RabbitMQ 对高级消息队列协议(AMQP)有非常好的实现。

因为 RabbitMQ 依赖 Erlang 环境[^Erlang环境],首先要安装 Erlang。

[^Erlang环境]:Erlang 是一种通用的编程语言和运行环境,特别适合开发并发、分布式和容错应用。Erlang 环境为 RabbitMQ 提供了其底层分布式运行时环境。

版本兼容性矩阵,以帮助你选择合适的版本组合:

Django 版本 Celery 版本 Kombu 版本 RabbitMQ 版本
3.x - 4.x 5.x 5.x 3.x
2.x 4.x 4.x 3.x

配置 RabbitMQ

安装 RabbitMQ

安装配置参考 RabbitMQ 官方文档

更新包元数据

yum update -y

安装 Erlang 和 RabbitMQ

yum install -y erlang rabbitmq-server

启动、查看状态、停止 RabbitMQ

systemctl start rabbitmq-server

systemctl status rabbitmq-server

systemctl stop rabbitmq-server

在系统启动时默认启动守护进程

systemctl enable rabbitmq-server

访问 RabbitMQ 管理界面

启用 RabbitMQ 管理插件:

rabbitmq-plugins enable rabbitmq_management

然后,访问插件管理界面:http://localhost:15672/, 默认的用户名和密码是 guest

管理界面如下:

Pasted image 20240615093436.png

如果访问插件管理界面失败,检查 RabbitMQ 配置文件查看网络接口绑定是否正确;以及防火墙是否允许访问。一般需要对 RabbitMQ 服务设置和管理。

rabbitmqctl 命令管理配置

使用 rabbitmqctl 工具在命令行中管理 RabbitMQ 的配置和用户权限。

使用 CLI 创建用户、虚拟主机和分配权限:

# 创建管理用户
rabbitmqctl add_user hellokitty 123456
# 为用户设置管理员标签
rabbitmqctl set_user_tags hellokitty administrator
# 创建虚拟主机
rabbitmqctl add_vhost myvhost
# 指定用户对虚拟主机的配置、写和读权限(. 表示对所有资源)
rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

说明:

  • hellokitty 123456:指定用户名和密码。
  • 设置用户标签,常见的用户标签有:
    • administrator:完全控制权限
    • monitoring:监控相关权限
    • management:访问管理插件的权限
  • 使用 rabbitmqctl set_permissions 命令用于设置用户在指定的虚拟主机上设置权限。该命令语法如下:
rabbitmqctl set_permissions -p <virtual_host> <username> <configure_regex> <write_regex> <read_regex>
  • 其中,<configure_regex><write_regex><read_regex> 是正则表达式,用于设置权限。
    • <configure_regex>:决定用户可以配置哪些资源(如队列、交换器)。
    • <write_regex>:决定用户可以向哪些队列或交换器发布消息(即生产者权限)。
    • <read_regex>:决定用户可以从哪些队列或交换器消费消息(即消费者权限)。

验证配置:

列出用户和虚拟主机:

[root@hadoop01 rabbitmq]# rabbitmqctl list_users
WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- location has moved to /etc/rabbitmq/rabbitmq-env.conf
Listing users ...
guest	[administrator]
hellokitty	[administrator]
...done.

[root@hadoop01 rabbitmq]# rabbitmqctl list_vhosts
WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- location has moved to /etc/rabbitmq/rabbitmq-env.conf
Listing vhosts ...
/
myvhost
...done.

以上操作,也可以使用自动化完成。见附件脚本 [[异步任务和定时任务#^ec4024|setup_rabbitmq.sh]],运行命令 bash setup_rabbitmq.sh

rabbitmq.conf 配置

这是 RabbitMQ 的主要配置文件,用于定义服务器的行为和网络设置。如果 rabbitmq.conf 配置文件不存在,创建 /etc/rabbitmq/rabbitmq.conf

rabbitmq.conf 配置内容如下:

# 配置 RabbitMQ 默认端口(5672)
listeners.tcp.default = 5672

# 配置 Web 管理插件端口(15672)和网络接口
management.listener.port = 15672
management.listener.ip = 0.0.0.0

# 默认用户和虚拟主机
default_user = hellokitty
default_pass = 123456
default_vhost = myvhost

# 日志级别
log.file.level = info

rabbitmq-env.conf 配置

这个文件用于设置 RabbitMQ 环境变量,以便启动 RabbitMQ 服务时使用。RabbitMQ 环境变量配置文件路径为: /etc/rabbitmq-env.conf

rabbitmq-env.conf 配置内容如下:

# 指定 RabbitMQ 节点的名称
NODENAME=rabbit@localhost
# 指定 RabbitMQ 服务监听的 IP 地址
NODE_IP_ADDRESS=0.0.0.0
# 指定 RabbitMQ 服务监听的端口号
NODE_PORT=5672
# 指定 RabbitMQ 的配置文件路径
RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf

重启服务,访问插件管理,界面显示效果:

Pasted image 20240615093219.png

创建 Celery 实例

Celery 实例,可以称之为 Celery 应用程序或 app。它是在 Celery 中执行的所有操作的入口点。例如创建任务和管理工作器,因此,其他模块必须能够导入它。

在 Django 项目中配置 Celery,包括创建独立的 Celery 配置文件 celery.py 以及在 Django 配置文件中进行相关设置。

安装三方库 celery

pip install celery[redis]  # 如果你使用 Redis 作为消息队列
pip install celery[rabbitmq]  # 如果你使用 RabbitMQ 作为消息队列
pip install eventlet  # 如果在 Windows 下执行 Celery

创建 Celery 实例文件

创建异步任务应用 asyncTasks,用于管理相关资源。

在 Django 设置中安装应用:

INSTALLED_APPS = [
    # ... 其他已安装的应用
    'asyncTasks',
]

在 Django 项目中创建 aysncTask/celerys.py 文件,内容如下:

# aysncTask/celerys.py

import os
from celery import Celery
from django.conf import settings

# 设置Django环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'vote.settings')

# 创建第一个Celery实例
app = Celery('tasks')

# 对于复杂的项目,推荐使用专用的配置模块
# app.configfromobject('celeryconfig')
# 从 Django settings.py 中加载配置,在Django配置文件中以 CELERY 开头的配置项被识别和使用。
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务模块
# app.autodiscovertasks()
# 让Celery自动从项目中所有注册的应用里发现任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# 定义调试任务
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

说明:

  • @app.task(bind=True) 的作用:
    • Celery 中的 @app.task 装饰器将一个普通的 Python 函数转换为 Celery 任务。
    • bind=True 参数的作用将任务绑定到任务实例本身,从而使任务方法能够访问实例属性和方法。
  • self.request 是任务实例的请求对象,包含当前任务的执行上下文信息,例如任务名称、ID、传递的参数、重试次数等。
  • !r 是 Python 的 格式化字符串转换标志,用于调用对象的 __repr__ 方法。
  • 相比 !s(或默认调用 __str__ 方法),!r 更适合调试,能提供更正式和准确的对象表示。

在 Django 中配置 Celery

settings.py 文件中添加 Celery 的相关配置:使用的消息代理、结果后端、内容类型、序列化格式以及时区等。

# vote/settings.py

# Celery settings
CELERY_BROKER_URL = 'amqp://hellokitty:123456@192.168.83.130:5672/myvhost'  # RabbitMQ服务地址
CELERY_RESULT_BACKEND = 'rpc://'  # 结果存储到RabbitMQ
CELERY_ACCEPT_CONTENT = ['json']  # 只接受json格式数据
CELERY_TASK_SERIALIZER = 'json'  # 任务序列化和反序列化使用json
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化和反序列化使用json
CELERY_TIMEZONE = 'UTC'  # 使用UTC时间
# 自动加载Django app中的tasks.py任务
CELERY_IMPORTS = ('asyncTasks.tasks',)

说明:

  • CELERY_RESULT_BACKEND 用于指定任务结果后端[^结果后端]的存储位置。不同的后端支持不同的存储方式,以下常见配置选项:
    • 'rpc://':指定使用 RabbitMQ 的 Remote Procedure Call(RPC)机制来存储和检索任务结果。
    • 'db+sqlite:///results.sqlite':支持 SQLite、PostgreSQL、MySQL 等关系型数据库。
    • 'file:///path/to/results':使用文件系统作为结果存储。
    • 'django-db':使用 Django 的数据库作为结果存储。

[^结果后端]:结果后端是 Celery 用来存储任务执行结果的地方。当任务执行完成后,结果会被存储在结果后端中,供应用程序或其他任务后续获取和处理。RabbitMQ 可以作为结果后端,通常用于存储任务执行的结果。

异步任务

任务的异步执行流程:

  1. 定义任务:通过 @app.task 装饰器定义任务。
  2. 消息生产者:在需要执行任务的地方,通过调用 delay 方法将任务添加到 Celery 的任务队列中。
  3. Celery Worker:Celery 的工作进程(worker)从任务队列中获取任务,并执行任务函数。

创建异步任务

创建 Celery 异步任务 add

# 任务定义文件 asyncTasks/tasks.py
from asyncTasks.celerys import app


# 示例任务
@app.task
def send_email(from_user, to_user, cc_user, subject, content):
    # 假设这里是发送邮件的逻辑
    print(f"Sending email from {from_user} to {to_user}, cc {cc_user}, subject {subject}, content {content}")


# 在 Django 的某个视图或其他地方调用任务
# from .tasks import sendemail
#
# sendemail.delay('from@example.com', ['to@example.com'], ['cc@example.com'], 'Subject', 'Email content')


# 定义一个异步任务

@app.task
def add(x, y):
    print(f"Executing task: add({x}, {y})")
    return x + y

应用模块初始化

在 Python 模块的初始化文件__init__.py,导入 Celery 应用实例,用于暴露特定的对象,以便在其他模块中可以使用这些对象。

# asyncTasks/init.py

from asyncTasks.celerys import app as celery_app

__all__ = ('celery_app',)

说明:__all__ 是一个特殊的变量,用于指定模块(文件)哪些内容暴露给外部使用,明确模块中的公共接口。

调用异步任务

创建一个专门的模块调用异步任务。

# asyncTasks/runtasks.py
import time
from asyncTasks.tasks import add


# 调度任务
result = add.delay(16, 15)

for i in range(10):
    if not result.ready():
        print("Task status:", result.status)
        time.sleep(10)

    result_value = result.get()
    print("Task status:", result.status)
    print("Task result:", result_value)
    time.sleep(1)

说明:

  • add.delay() 方法用于将任务异步地添加到 Celery 的任务队列中,并立即返回,不会阻塞当前线程。

启动 Celery Worker

启动 Celery Worker 来处理异步任务。

celery -A asyncTasks worker -P eventlet -l info

说明:

  • -A asyncTasks 指定 Celery 应用所在的模块。
  • worker 启动一个 Celery 工作进程,负责接收和执行任务。
  • -P eventlet: Windows 环境下需要指定 eventlet 进程池,否则,可能在 work 接收到任务时阻塞执行。参考文末[[异步任务和定时任务#^22bd29|问题1]]。
  • -l 指定日志级别。
  • 可以在启动 Celery worker 或 beat 时指定 -n 参数值,为当前工作进程指定一个唯一的名称:
celery -A <module_name> worker -l <log_level> -P <pool_type> -n <worker_name>
  • 也可以根据主机名动态生成工作进程的名称,例如 -n "worker1@%(hostname)s"

提示:Windows 下可以使用 taskkill 终止进程:taskkill /F /IM celery*

验证任务执行情况

运行 Python 程序 run_tasks.py

Python 程序终端打印如下:

D:\PycharmProjects\vote\venv\Scripts\python.exe D:\PycharmProjects\myproject\vote\asyncTasks\run_tasks.py 
Task status: PENDING
Task status: SUCCESS
Task result: 31
Task status: SUCCESS
Task result: 31
...

Celery Woker 日志打印:

(venv) PS D:\PycharmProjects\myproject\vote> celery -A asyncTasks worker -P eventlet -l info
...
[2024-06-19 20:05:15,980: INFO/MainProcess] pidbox: Connected to amqp://hellokitty:**@192.168.83.130:5672/myvhost.
[2024-06-19 20:05:15,983: INFO/MainProcess] celery@DESKTOP-TE8F48P ready.
[2024-06-19 20:05:15,984: INFO/MainProcess] Task asyncTasks.tasks.add[bd21b484-a492-42fe-9831-8f71303c9362] received
[2024-06-19 20:05:15,985: WARNING/MainProcess] Executing task: add(16, 15)
[2024-06-19 20:05:15,997: INFO/MainProcess] Task asyncTasks.tasks.add[bd21b484-a492-42fe-9831-8f71303c9362] succeeded in 0.015000000013969839s: 31       

定时任务

Celery Worker 负责从任务队列中提取异步任务并执行。这是 Celery 的核心功能。主要处理那些耗时操作或需要后台运行的任务。

而 Celery Beat 专注于定时任务的调度[^调度存储]和存储管理。它充当调度器的角色,根据预定义的周期或触发时间,将任务放到队列,供 Celery Worker 执行。

[^调度存储]:调度存储(Beat Schedule Storage)是 Celery Beat 用来存储定时任务调度信息的地方。这些信息包括定时任务的执行时间、参数等。通常可以使用文件系统或者数据库作为调度存储,但不推荐使用 RabbitMQ。对于 Celery Beat 的定时任务调度信息,推荐使用文件系统(如 SQLite 文件)或数据库(如 MySQL、PostgreSQL)来作为持久化存储。

配置 django-celery-beat

django-celery-beat 是一个用于与 Django 集成的 Celery 扩展包,它提供了管理 定时任务调度 的功能。通过它,你可以在 Django 的 Admin 界面中直观地添加、修改和删除定时任务,而无需直接修改代码。

安装:

pip install django-celery-beat

settings.py 文件中,添加配置:

INSTALLED_APPS = [
    # ... 其他已安装的应用
    'asyncTasks',
    'django_celery_beat',
]

创建 Celery 应用实例

定时任务实例 app_schedule

# asyncTasks/celery.py

# celerys.py

import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings

# 设置Django环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'vote.settings')

# 创建第一个Celery实例
app = Celery('tasks')

# 对于复杂的项目,推荐使用专用的配置模块
# app.configfromobject('celeryconfig')
# 从 Django settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务模块
# app.autodiscovertasks()
# 让Celery自动从项目中所有注册的应用里发现异步任务/定时任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')


# 创建定时任务的Celery实例
app_schedule = Celery('app_schedule')

# 从 Django settings.py 中加载配置
app_schedule.config_from_object('django.conf:settings', namespace='CELERY')

app_schedule.autodiscover_tasks()

app_schedule.conf.update(
    # imports=['asyncTasks.tasks'],
    timezone=settings.TIME_ZONE,
    enable_utc=True,
    beat_schedule={  # 使用正确的配置项名称
        'task2': {
            'task': 'asyncTasks.tasks.scheduled_task',
            'schedule': crontab(minute='*/1'),  # 每1分钟执行一次
            'args': ('Argument for task2',)
        },
        'task3': {
            'task': 'asyncTasks.tasks.scheduled_task',
            'schedule': crontab(minute='0', hour='*/2'),  # 每2小时执行一次
            'args': ('Argument for task3',)
        },
        'task4': {
            'task': 'asyncTasks.tasks.scheduled_task',
            'schedule': crontab(minute='0', hour='12', day_of_week='mon'),  # 每周一中午12点执行
            'args': ('Argument for task4',)
        },
    },
)

应用模块初始化

# asyncTasks/init.py

from asyncTasks.celerys import app as celery_app, app_schedule

__all__ = ('celery_app', 'app_schedule',)

定时任务内容

from celery import shared_task
import logging

logger = logging.getLogger('django')


@shared_task
def scheduled_task(arg):
    # 任务实现逻辑
    logger.info(f'Scheduled task with arg: {arg}')
    print(f'Executing scheduled task with arg: {arg}')

说明:

  • @app.task 装饰器用于将一个函数注册为 Celery 任务。它需要绑定到具体的 Celery 实例上,通常用于在 Celery 实例的上下文中定义任务。任务与特定的 Celery 实例绑定,不太适合在多项目或共享任务的场景中使用。
  • @shared_task 装饰器用于将一个函数注册为 Celery 任务,但它不需要绑定到具体的 Celery 实例上。它通常用于 Django 项目中的应用模块,使任务更易于被多个 Celery 实例共享和发现。

运行定时任务

启动 Django 项目

python manage.py runserver

启动 Celery Worker

celery -A asyncTasks worker -P eventlet -l info

启动 Celery Beat

celery -A asyncTasks.celerys.app_schedule beat --loglevel=debug

说明:

  • asyncTasks.celerys.app_schedule:当有多个 Celery 实例时,启动 Celery Beat 需要指定对应的定时任务实例。

检查任务结果

Celery Woker 日志输出:

[2024-06-19 21:29:25,023: INFO/MainProcess] Task asyncTasks.tasks.add[b042c567-9642-4e46-89dc-722d9dc65864] received
[2024-06-19 21:29:25,025: WARNING/MainProcess] Executing task: add(16, 15)
[2024-06-19 21:29:25,036: INFO/MainProcess] Task asyncTasks.tasks.add[b042c567-9642-4e46-89dc-722d9dc65864] succeeded in 0.0s: 31
[2024-06-19 21:30:15,181: INFO/MainProcess] Task asyncTasks.tasks.scheduled_task[6c4e45c4-649d-4591-a9db-33d1f5b5568b] received
INFO Scheduled task with arg: Argument for task2
[2024-06-19 21:30:15,182: INFO/MainProcess] Scheduled task with arg: Argument for task2
[2024-06-19 21:30:15,182: WARNING/MainProcess] Executing scheduled task with arg: Argument for task2
[2024-06-19 21:30:15,195: INFO/MainProcess] Task asyncTasks.tasks.scheduled_task[6c4e45c4-649d-4591-a9db-33d1f5b5568b] succeeded in 0.01600000000325963s:
 None

Celery Beat 日志输出:

[2024-06-19 21:30:15,105: INFO/MainProcess] beat: Starting...
[2024-06-19 21:30:15,162: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task2 asyncTasks.tasks.scheduled_task('Argument for task2') <crontab: */1 * * * * (m/h/dM/MY/d)>
<ScheduleEntry: task3 asyncTasks.tasks.scheduled_task('Argument for task3') <crontab: 0 */2 * * * (m/h/dM/MY/d)>
<ScheduleEntry: task4 asyncTasks.tasks.scheduled_task('Argument for task4') <crontab: 0 12 * * mon (m/h/dM/MY/d)>
[2024-06-19 21:30:15,162: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
...
[2024-06-19 21:30:15,172: INFO/MainProcess] Scheduler: Sending due task task2 (asyncTasks.tasks.scheduled_task)
[2024-06-19 21:30:15,174: DEBUG/MainProcess] using channel_id: 1
[2024-06-19 21:30:15,175: DEBUG/MainProcess] Channel open
[2024-06-19 21:30:15,180: DEBUG/MainProcess] beat: Synchronizing schedule...
[2024-06-19 21:30:15,181: DEBUG/MainProcess] asyncTasks.tasks.scheduled_task sent. id->6c4e45c4-649d-4591-a9db-33d1f5b5568b
[2024-06-19 21:30:15,188: DEBUG/MainProcess] beat: Waking up in 44.80 seconds.
[2024-06-19 21:31:00,000: INFO/MainProcess] Scheduler: Sending due task task2 (asyncTasks.tasks.scheduled_task)
[2024-06-19 21:31:00,002: DEBUG/MainProcess] asyncTasks.tasks.scheduled_task sent. id->5d9c3da2-5f43-42dc-bccc-69be34e417a3

运行任务发现,定时任务的函数中的日志打印逻辑不会在 Python 运行的终端打印内容。- 这是因为在 Celery 的架构中,Celery 的任务是在独立的进程中执行的,任务函数的打印语句不会直接在 Python 运行终端中输出,不同于直接在 Python shell 或脚本中运行的函数。

如果想要查看定时任务内容内部的执行逻辑信息,可以使用日志记录器来捕获和管理。

Celery Flower 监控

Celery Flower 是一个 Web 监控工具,用于查看和管理 Celery 分布式任务队列。它提供了一个直观的界面来监控任务状态、队列和工作节点的实时信息。

安装 Flower

pip install flower

启动 Flower

celery --broker=amqp://hellokitty:123456@192.168.83.130:5672/myvhost flower

访问 Web 页面

访问 http://0.0.0.0:5555,检查任务执行结果如下:

Pasted image 20240619213253.png

附件

脚本 setup_rabbitmq.sh: ^ec4024

#!/bin/bash

# 创建用户
rabbitmqctl add_user hellokitty 123456

# 设置用户标签
rabbitmqctl set_user_tags hellokitty administrator

# 创建虚拟主机
rabbitmqctl add_vhost myvhost

# 分配权限
rabbitmqctl set_permissions -p myvhost hellokitty ".*" ".*" ".*"

# 验证结果
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl list_permissions -p myvhost

参考

Celery 官方文档

RabbitMQ 官方教程

RabbitMQ 管理配置

rabbitmq.conf.示例

问题

问题1. Celery woker 程序在 Windows 环境下接受任务后执行阻塞,且进程难以 ctrl+c 停掉。

分析解决:Windows 下启动 Celery 程序需要指定使用 eventlet 进程池。 ^22bd29

  • 安装 eventlet
pip install eventlet
  • 启动 Celery 服务:
celery -A 应用任务 worker -P eventlet -c 2 -l info

参考:celery 在 windows 接收任务不执行

本文由作者按照 CC BY 4.0 进行授权。