什么是分布式任务队列

分布式任务队列可以分发任务到多线程,多进程,多机器上执行,例如:

  • 例如数据库中有100万条 tag 数据记录,按照时间排序,要统计每小时重复的 tag 数量,
    可以按照每天24小时,分解为24个任务,推入分布式任务队列,由24个 work 进程并行计算
  • 大量的图片资源需要压缩,可以通过分布式任务队列在集群的多台机器上分发调度任务,提高效率

Celery 特点

  • 分布式并发能力,可以运行在单台机器,多台机器机,或者跨数据中心的服务器上
  • 高可用性,在连接出错的时候 worker 和 client 会自动重试,broker 支持高可用(RabbitMP)
  • 速度快,官方公布,单一 celery 进程,每分钟可以处理百万级别的任务
  • 低延迟,使用 RabbitMQ 毫秒级别的延迟
  • 可扩展,Celery 中几乎每一个组件都可以水平扩展,定制性强

架构组成

  • 客户端,提交任务到消息队列
  • 消息队列(消息中间件 broker), 存储/分发任务给工作进程
  • 任务执行单元(worker 进程),执行分派的任务,并返回结果
  • 结果存储,保存在 RabbitMQ,Redis 等存储中

客户端

有多种语言客户端,也有与框架集成方案

消息队列

  • RabbitMQ 消息中间件,不同应用程序之间通过消息来集成
  • Redis 缓存,队列,发生故障时可能会丢失数据(低)

序列化方式

  • pickle, json, yaml, msgpack
  • zlib, bzip2 compression

安装

以下例子基于 Python3

安装 RabbitMQ, Redis

# Centos
yum install -y rabbitmq-server
yum install -y redis-server
# debian, ubuntu
apt-get install rabbitmq-server
apt-get install redis-server

安装 Celery

用标准的 Python 包管理工具 pip 安装

pip install celery

应用例子:简单计算

# tasks.py
from celery import Celery

# 参数一是模块名,broker 是消息中间件,backend 保存结果和消息状态
# app = Celery('tasks', broker='redis://localhost')
app = Celery('tasks', broker='amqp://localhost')

@app.task
def add(x, y):
    return x + y

运行 worker 服务

# 默认启动了24个进程
celery -A tasks worker --loglevel=info

任务脚本

# run_task.py
from tasks import add
print(add(100, 28))

# 异步执行
result = add.delay(20, 40)
print(result.ready())
print(result.get())

运行任务

python run_task.py

应用例子:定时任务,1分钟后执行计算任务

from tasks import add
from datetime import datetime, timedelta
import time


minute = datetime.utcnow() + timedelta(minutes=1)
result = add.apply_async((14, 18), eta=minute)

while not result.ready():
    print('not calc')
        time.sleep(15)

        print(result.get())

Celery 监控和管理

基本的命令行管理命令

# 查看 celery 状态
celery status

# **危险**, 从所有的任务队列删除消息
celery -A tasks purge

# 查看激活的任务
celery -A tasks inspect active

# 查看定时任务
celery -A tasks inspect scheduled

# 查看注册的任务
celery -A tasks inspect registered

# 查看任务统计
celery -A tasks inspect stats

web 实时图形监控,Flower

Flower 有丰富的监控展示

  • 任务的进度和历史
  • 显示任务的详情
  • 图形和统计
  • 查看 worker
  • 关闭 worker
  • 扩展worker等
# 安装
pip install flower

# 启动
celery -A proj flower --port=5555

监控图


right