"""
最简单的使用 终端模式
"""
from celery import Celery
import time
import subprocess
app = Celery('tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0')
@app.task
def add(x, y):
print("running...", x, y)
# print(name)
time.sleep(1)
return x + y
@app.task
def run_cmd(cmd):
cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return cmd_obj.stdout.read().decode('utf-8') # 结果必须是 json 可格式化的数据
"""
最简单的使用
celery -A tasks worker -l info 启动一个worker用来执行任务 用来执行任务
# 进入 study 目录
from celery.result import AsyncResult
import tasks
tasks.add(1,2) # 测试终端执行
tasks.run_cmd.delay('df -h') # 执行命令
tasks.add.delay(4, 4) # 命令行手动执行一个任务
ret = add.delay(4, 4)
ret.get() #获取执行结果
ret.get(timeout=1) # 获取执行结果加上超时时间
ret.ready() # 获取执行状态
ret.result # 获取结果
ret.get(propagate=False) # 获取结果(如果程序出错不会抛出异常,获取异常结果)
ret.traceback # 调试
# AsyncResult 可以通过 task_id获取任务结果
ret = tasks.add.delay(2,2)
task_id = ret.task_id # 可以把task_id存起来 用作后续查询执行结果
>>> task_id
'929a4792-7ec0-4033-ab2c-1e95d4d4a70f'
>>> result = AsyncResult(id=task_id)
>>> result.get()
"""
定时任务
"""
celery定时任务
启动任务调度器 celery beat
celery -A periodic_task beat
启动 worker 执行器
celery -A periodic_task worker -l info
"""
from celery import Celery
# from celery.schedules import crontab
# from kombu import Exchange, Queue
app = Celery('tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0'
)
@app.on_after_configure.connect
def periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('test-10'), name='test every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('test-30'), name='test every 30')
# sender.add_periodic_task(10.0, test.s('test-30'), expires=10, name='test every 30')
# sender.add_periodic_task(
# crontab(hour='*', minute='*', day_of_week='sun'),
# test.s('Happy Mondays!'),
# )
@app.task
def test(a):
print(a)
return a
项目使用
# 目录结构
project/
celery.py
tasks.py
# celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0',
include=['project.tasks'])
# Optional configuration, see the application user guide.
# 结果过期时间
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
# tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import subprocess
@app.task
def add(x, y):
return x + y + y
@app.task
def mul(x, y):
return x * y
@app.task
def run_cmd(cmd):
cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return cmd_obj.stdout.read().decode('utf-8') # 结果必须是 json 可格式化的数据
""" multi 毛 提
celery -A project worker -l info # 前台测试运行worker
在 project的 同级路径下 启动 一个 work进程 后台进行
sudo celery multi start w1 -A project -l info # 启动一个命令终端
sudo celery multi start w2 -A project -l info # 启动一个命令终端
sudo celery multi stop w1 # 停止一个
# 进入终端执行一个任务
>>> tasks.mul.delay(2,3)
<AsyncResult: 9ac09ad6-43d2-4aee-a715-d4e653b1d704>
>>> from celery.result import AsyncResult
>>> result = AsyncResult(id='9ac09ad6-43d2-4aee-a715-d4e653b1d704')
>>> result.get()
"""