2021/07/20 Celery介紹及使用場景
[TOC]
基本使用
一、架構圖

# 說明
1. Celery的模塊,可以分為beat, worker, broker, backend(可選) 四個部份
2. 首先,會先定義好worker需要執行的函數或腳本
3. 異步請求來源,可以手動調用或是配罝beat定時調用,將請求發到broker(RabbitMQ)
4. worker收到消息後,執行函數,最後將結果保存於backend(Redis)
二、單機使用
# tasks.py
#!/usr/bin/env python
import socket
from celery import Celery
app = Celery('demo',
             broker='amqp://admin:admin@192.168.1.15/',
             backend='redis://192.168.1.15')
@app.task()
def hi():
    hostname = socket.gethostname()
    msg = f'Hi, I am {hostname}'
    return msg
@app.on_after_configure.connect
def schedule(sender, **kwargs):
    sender.add_periodic_task(3, hi.s(), name='say hi')
# 啟動
>>> celery -A tasks worker -l INFO
 -------------- celery@svr012.viger.click v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1160.11.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2021-07-26 05:57:17
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         demo:0x7f42dc217310
- ** ---------- .> transport:   amqp://admin:**@192.168.1.15:5672//
- ** ---------- .> results:     redis://192.168.1.15/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery               
[tasks]
  . tasks.hi
[2021-07-26 05:57:17,382: INFO/MainProcess] Connected to amqp://admin:**@192.168.1.15:5672//
[2021-07-26 05:57:17,396: INFO/MainProcess] mingle: searching for neighbors
[2021-07-26 05:57:18,434: INFO/MainProcess] mingle: all alone
[2021-07-26 05:57:18,473: INFO/MainProcess] celery@svr012.viger.click ready.
# 直接調用
>>> from tasks import hi
>>> res = hi.delay()
>>> res.get()
'Hi, I am svr012.viger.click'
# 定時任務
>>> celery -A tasks beat -l INFO
celery beat v5.1.2 (sun-harmonics) is starting.
__    -    ... __   -        _
LocalTime -> 2021-07-26 06:22:16
Configuration ->
    . broker -> amqp://admin:**@192.168.1.15:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2021-07-26 06:22:16,906: INFO/MainProcess] beat: Starting...
[2021-07-26 06:22:19,935: INFO/MainProcess] Scheduler: Sending due task say hi (tasks.hi)
[2021-07-26 06:22:22,909: INFO/MainProcess] Scheduler: Sending due task say hi (tasks.hi)
三、分佈式使用
# 跨服務器執行
>>> from celery import Celery
>>> app = Celery('demo',
...              broker='amqp://admin:admin@192.168.1.15/',
...              backend='redis://192.168.1.15')
>>> res = app.send_task('tasks.hi')
>>> res.get()
'Hi, I am svr012.viger.click'
# 定時任務
#!/usr/bin/env python
from celery import Celery
app = Celery('demo',
             broker='amqp://admin:admin@192.168.1.15/',
             backend='redis://192.168.1.15')
@app.on_after_configure.connect
def schedule(sender, **kwargs):
    sender.add_periodic_task(3, app.signature('tasks.hi'), name='say hi')
>> celery -A tasks beat -l INFO
搭配Django
一、架構圖

# 說明
1. 撰寫Django views實現具體功能
2. Worker創建request類調用views
3. HTTP請求可以增加異步調用的功能
4. 定時任務異常或是線上有狀況時,可以直接調用接口來執行任務
二、目錄設計
# 部份簡略
.
├── main                   # Django 主目錄文件
│   ├── celery.py          # Celery Beat定時任務
│   ├── config.yml
│   ├── __init__.py
│   ├── settings.py        # Django 配罝文件
│   ├── urls.py
│   ├── uwsgi.ini
│   └── wsgi.py
├── apps                   # Django 應用
│   └── common
│       ├── apps.py
│       ├── tasks
│       │   └── demo.py    # Celery Worker功能
│       ├── urls.py
│       └── views
│           └── demo.py    # Django 處理http請求的views
├── libs                   # 公共模塊
│   ├── base
│   ├── celery
│   │   └── scheduler.py
│   └── tool.py
├── logs
├── manage.py
├── celery.sh              # Celery 啟動腳本
└── uwsgi.sh               # uWSGI  啟動腳本
三、撰寫接口
# apps/common/views/demo.py
from main.settings import HOSTNAME
from libs.base.viewsets import BaseViewSet
from rest_framework.decorators import action
class DemoViewSet(BaseViewSet):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    @action(methods=['GET'], detail=False)
    def hostname(self, request, *args, **kwargs):
        return self.success_info('', HOSTNAME)
四、撰寫worker任務
# apps/common/tasks/demo.py
from main.celery import app
@app.task(expires=60)
def hostname():
    from apps.common.views.demo import DemoViewSet
    # 生成Request类及request实例
    property = {
        'method': 'GET',
        'query_params': {},
        'data': {},
        'user': None,
    }
    request = type('Request', (object,), property)
    view = DemoViewSet()
    response = view.hostname(request)
    return response.data.get('result')
五、設罝beat定時任務
# main/celery.py
from libs.celery.scheduler import app
from apps.common.tasks import demo
# 定时任务
@app.on_after_configure.connect
def schedule(sender, **kwargs):
    add = sender.add_periodic_task
    # 獲取hostname
    add(3, demo.hostname.s(), name='get hostname')
六、實現異步請求+結果查詢
# apps/common/views/demo.py
import time
from rest_framework.decorators import action
from main.settings import logger, HOSTNAME
from libs.base.viewsets import BaseViewSet
from libs.tool import Param
class DemoViewSet(BaseViewSet):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    @action(methods=['GET'], detail=False)
    def hostname(self, request, *args, **kwargs):
        # 參數校验
        param = Param(request.query_params)
        try:
            param.parse('sync', default=False, type=bool)
            param.parse('task_id', default='', type=str)
        except Exception:
            msg = f'参数 {param._miss_key} 缺失'
            logger.error(msg)
            return self.validation_error(msg)
        # 結果查詢
        if param.task_id:
            from main.celery import app
            from celery.result import AsyncResult
            res = AsyncResult(app=app, id=param.task_id)
            if not res.ready():
                msg = 'the task is not finished yet'
                return self.warning_error(msg)
            return self.success_info('', res.get())
        # 異步請求
        if param.sync:
            from apps.common.tasks.demo import hostname
            res = hostname.delay()
            return self.success_info('', res)
        time.sleep(10)
        return self.success_info('', HOSTNAME)
集群布署
一、五種場景

1.定時任務,一個beat發送,一個Worker執行
場景:監控告警,只能一個baet發送檢測請求,一個worker執行發送告警郵件,其它beat、worker不能執行
2.定時任務,每個beat發送,每個Worker執行
場景:服務器上日志文件切割,每台都必需執行
3.異步請求,一個請求,一個Worker執行
場景:一般使用,只做一次數據刷新
4.異步請求,一個請求,每個Worker執行
場景:文件或圖片上傳,每台服務器都需要在本地保存
5.跨平台的異步任務
場景:CMDB新增app,通知其它平台更新
二、RabbitMQ的機制
1. Direct

# 說明
1. Exchange 對接單一個 Queue
2. 生產者將消息發送給Exchange,Exchange轉發給Queue,再由消費者接收消息
3. 一筆消息,只會由一個消費者消費
2. Fanout

# 說明
1. Exchange 對接多個 Queue
2. 生產者將消息發送給Exchange,Exchange將消息發給所有綁定的Queue,再由消費者接收消息
3. 一個Queue,只能由一個消費者來消費
三、Exchange、Queue的規劃
# main/settings.py
CELERY_TASK_QUEUES = [
    # 做為最基本的使用,例如 celery_api的 Queue,實現一個請求,一個執行
    Queue(f'{PROJECT}'),
    # 基於服務器創建 Queue,例如 celery_api.ph190svr016044,實現定時任務在每台服務器執行(日志切割)
    Queue(f'{PROJECT}.{HOSTNAME}'),
    # 基於服務器創建 Queue,並綁定 fanout exchange,實現一個請求,多台服務器執行
    Queue(f'{PROJECT}.{HOSTNAME}', exchange=Exchange(f'{PROJECT}.fanout', type='fanout')),
    # 基於項目創建 Queue,綁定其它項目的fanout exchange,實現跨平台的消息通知
    Queue(f'{PROJECT}.sync', exchange=Exchange('cmdb.sync.fanout', type='fanout'), routing_key='cmdb'),
]
四、應用
1. 限制同個 Queue,只能執行一次
# libs/celery/scheduler.py
app = Celery(PROJECT)
# 省略 ...
class QueueLockScheduler(PersistentScheduler):
    default_queue = PROJECT
    initialize = False
# 省略 ...
app.conf.beat_scheduler = QueueLockScheduler
# 說明:
1. 繼承源碼 PersistentScheduler 創建 QueueLockScheduler類
2. beat成功發送消息後,會將下一次應該執行時間戳(next_time)記錄於lock Queue
3. 每次beat在發送消息前,會讀取lock Queue訊息,比較當前時間是不是的大於next_time,若不是則跳過不發送
2. Beat 指定 Queue 發送消息
# libs/celery/scheduler.py
@app.task(queue=f'{PROJECT}.{HOSTNAME}', expires=60)
def logrotate(target):
    tool.logrotate(target, time_format='%Y%m%d')
    return 'Success'
3. 針對 fanout exchange 發消息
# 異步任務,同步圖片
from main.celery import app
b64code = base64.b64encode(img).decode('utf8')
kwargs = {
    'name': 'apps.common.tasks.files.add_file',
    'kwargs': {
        'b64code': b64code,
        'path': ''
    },
    'exchange': Exchange(f'{PROJECT}.fanout', type='fanout'),
    'routing_key': PROJECT,
}
kwargs['kwargs']['path'] = app_static
app.send_task(**kwargs)