从零开始的Linux运维屌丝之路,资源免费分享平台   运维人员首选:简单、易用、高效、安全、稳定、社区活跃的开源软件

Celery 分布式任务队列快速入门

发布:蔺要红04-04分类: Python

最简单的终端模式
"""
最简单的使用 终端模式
"""
from celery import Celery
import time
import subprocess

app = Celery('tasks',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/0')


@app.task
def add(x, y):
    print("running...", x, y)
    # print(name)
    time.sleep(1)
    return x + y


@app.task
def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return cmd_obj.stdout.read().decode('utf-8')  # 结果必须是 json 可格式化的数据


"""
最简单的使用
celery -A tasks worker -l info   启动一个worker用来执行任务 用来执行任务

# 进入 study 目录
from celery.result import AsyncResult
import tasks 
tasks.add(1,2)  # 测试终端执行
tasks.run_cmd.delay('df -h')    # 执行命令
tasks.add.delay(4, 4)           # 命令行手动执行一个任务
ret = add.delay(4, 4)
ret.get()                       #获取执行结果
ret.get(timeout=1)              # 获取执行结果加上超时时间
ret.ready()                     # 获取执行状态
ret.result  # 获取结果
ret.get(propagate=False)        # 获取结果(如果程序出错不会抛出异常,获取异常结果)
ret.traceback # 调试

#  AsyncResult  可以通过 task_id获取任务结果
ret = tasks.add.delay(2,2)
task_id =  ret.task_id  # 可以把task_id存起来 用作后续查询执行结果
>>> task_id
'929a4792-7ec0-4033-ab2c-1e95d4d4a70f'
>>> result = AsyncResult(id=task_id)
>>> result.get()


"""

定时任务
"""
celery定时任务
启动任务调度器 celery beat
celery -A periodic_task beat

启动 worker 执行器
celery -A periodic_task worker -l info

"""
from celery import Celery

# from celery.schedules import crontab
# from kombu import Exchange, Queue


app = Celery('tasks',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/0'
             )


@app.on_after_configure.connect
def periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('test-10'), name='test every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('test-30'), name='test every 30')
    # sender.add_periodic_task(10.0, test.s('test-30'), expires=10, name='test every 30')

    # sender.add_periodic_task(
    #     crontab(hour='*', minute='*', day_of_week='sun'),
    #     test.s('Happy Mondays!'),
    # )


@app.task
def test(a):
    print(a)
    return a
 
项目使用
# 目录结构

project/
    celery.py
    tasks.py
 
# celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('tasks',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/0',
             include=['project.tasks'])

# Optional configuration, see the application user guide.
# 结果过期时间
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
 
# tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app
import subprocess


@app.task
def add(x, y):
    return x + y + y


@app.task
def mul(x, y):
    return x * y


@app.task
def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return cmd_obj.stdout.read().decode('utf-8')  # 结果必须是 json 可格式化的数据


""" multi 毛 提
celery -A project worker -l info        # 前台测试运行worker
在 project的 同级路径下  启动 一个 work进程  后台进行
sudo celery multi start w1 -A project -l info   # 启动一个命令终端
sudo celery multi start w2 -A project -l info   # 启动一个命令终端
sudo celery multi stop w1  # 停止一个
 
# 进入终端执行一个任务 
>>> tasks.mul.delay(2,3)
<AsyncResult: 9ac09ad6-43d2-4aee-a715-d4e653b1d704>
>>> from celery.result import AsyncResult
>>> result = AsyncResult(id='9ac09ad6-43d2-4aee-a715-d4e653b1d704')
>>> result.get()


"""
 
温馨提示如有转载或引用以上内容之必要,敬请将本文链接作为出处标注,如有侵权我会在24小时之内删除!

欢迎使用手机扫描访问本站