celery
功能描述
它是一个简单、灵活、可靠的用于处理大量消息的分布式系统。
功能主要有三个:执行异步任务,执行延迟任务,执行定时任务。
举个例子,你现在有两个项目、一个项目用于爬取数据,一个项目用于分析数据,如何在数据爬取后将任务交给另一个项目进行分析呢?这种场景下就可以使用
celery
进行处理。
官网
英文文档
一个噩耗消息:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.Celery是一个资金较少的项目,因此我们不支持Microsoft Windows。请不要提出与该平台有关的任何问题。
尽管官方提示不支持
windows
,但是你仍然可以进行使用,这可能需要一些其他模块的辅助。
celery
是单独的服务,并不依赖于其他框架,就像
Django
一样你只要安装了它就可以通过自身命令启动服务。
架构介绍
celery
架构由三部分组成,分别是消息中间件
message br56coker
,任务执行单元
worker
与任务执行结果存储
task result store
,如下图所示:
celery
是一个独立运行的服务,内置
socket
,如果想使用它你需要做这几件事情:
- 安装celery环境框架,配置broker与backend,启动celery服务
- 添加任务到borker,worker就会自动的在后台执行任务
- 任务执行完成后,通过backend获取结果
基本使用
安装使用
安装模块,我装的旧版,新版
5.x
的有些摸不着头脑:
pip3 install celery==4.4.7
新建一个
python
包,任意名字。
project├── celery_task # celery包│ ├── __init__.py # 包文件│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py│ └── tasks.py # 所有任务函数├── add_task.py # 添加任务└── get_result.py # 获取结果
在
celery.py
中配置
borker
与
backend
:
from celery import Celerybroker = \"redis://127.0.0.1:6379/1\" # broker任务队列backend = \"redis://127.0.0.1:6379/2\" # 结构存储,执行完的结果存在这# 如果有密码:\"redis//:password@127.0.0.1:6379/2\"app = Celery(__name__, # 取名,随便取broker=broker,backend=backend,include=[\"celery_tasks.task\", # 第一个任务,必须是包名.文件名])
任务书写
在
tasks.py
中开始书写任务:
from .celery import app@app.task # 必须添加该装饰器def add(x,y):return x+y@app.taskdef sub(x,y):return x-y@app.taskdef multi(x,y):return x*y
任务执行
在
add_task.py
中开始执行任务,三个任务分别指定三种不同的执行状态:
# 导入定义好的任务from celery_task import tasks# 添加异步任务,返回结果。任务号t1_id = tasks.add.delay(10,20)# 配置延迟、定时任务的时区为本地,如果延迟任务不生效,则取消本地时区的设置(windows下失效)from celery_task.celery import app# 时区# app.conf.timezone = \'Asia/Shanghai\'# 是否使用UTC# app.conf.enable_utc = False# 添加延迟任务,返回结果。任务号from datetime import datetime,timedeltatime = datetime.utcnow() + timedelta(seconds=10) # 十秒后执行t2_id = tasks.sub.apply_async(args=(100,50),eta=time)# 添加定时任务,需要启动定时任务beat服务from celery.schedules import crontab # 如果要定义其他的周期日期,导入这个app.conf.beat_schedule = {\'multi-task\': {\'task\': \'celery_task.tasks.multi\',\'schedule\': timedelta(seconds=3),# \'schedule\': crontab(hour=8, day_of_week=1), # 每周一早八点\'args\': (20, 10),}}
获取结果
在
get_result.py
中书写获取结果的代码:
from celery_task.celery import appfrom celery.result import AsyncResultid = \'a9ffd16c-dbe0-44d2-9317-b198b432273c\' # 任务号if __name__ == \'__main__\':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print(\'任务失败\')elif async.status == \'PENDING\':print(\'任务等待中被执行\')elif async.status == \'RETRY\':print(\'任务异常后正在重试\')elif async.status == \'STARTED\':print(\'任务已经开始被执行\')
启动服务
接下来启动服务,首先切换到该包的上级目录中:
# cd project# Linuxcelery worker -A 模块名 -l info# Windows需要先安564装eventlet模块pip install eventletcelery worker -A 包名 -l info -P eventlet# 如果是定时任务,还需要启动beat服务celery beat -A 包名 -l info
Django使用
基本使用
如果在
Django
中要使用
celery
,则需要将
celery
项目建立在
Django
项目的根目录下:
- DjangoProject01- celery_project- __init__.py- celery.py- django_app_name_task.py- app01- djangoproject01
同时,在任务中还需要导入
Django
环境,一般书写在
celery.py
文件中即可:
import osimport djangofrom celery import Celery# 由于celery是独立的项目,所以必须导入django环境os.environ.setdefault(\"DJANGO_SETTINGS_MODULE\", \"DjangoProject.settings\")django.setup()broker = \'redis://127.0.0.1:6379/1\' # broker任务队列backend = \'redis://127.0.0.1:6379/2\' # 结构存储,执行完的结果存在这app=Celery(__name__,broker=broker,backend=backend,include=[\'celery_project.app01_task\',])app.conf.timezone = \"Asia/Shanghai\"app.conf.enable_utc = Falsefrom datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = {\'add-task\': {\'task\': \'celery_project.app01_task.task01\',\'schedule\': timad8edelta(hours=4),}}