Celery

Celery的安装

  • 安装
1
2
3
pip install celery==4.4.1
pip install celery-with-redis==3.0
pip install django-celery-results==1.2.1 #django-celery-results库基于 Django ORM实现了结果存储后端
  • 配置
1
2
3
4
5
6
7
8
9
10
11
12
INSTALLED_APPS = [
....
'celery',
'django_celery_results',
]
# celery任务配置
BROKER_URL = 'redis://localhost:6379/5'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使⽤json
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化为json
CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每个worker执⾏了多少任务就重启
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 后端存储任务超过⼀天,则⾃动清理数据,单位为秒
  • 创建celery实例

在settings.py同级目录下新建celery.py文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from __future__ import absolute_import
from celery import Celery
from django.conf import settings
import os

# 设置系统的环境配置⽤的是Django的
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "工程名称.settings")

# 实例化celery
app = Celery('mycelery')

# 设置时区
app.conf.timezone = "Asia/Shanghai"

# 指定celery的配置来源 ⽤的是项⽬的配置⽂件settings.py
app.config_from_object("django.conf:settings")

# 让celery⾃动去发现我们的任务(task)
app.autodiscover_tasks() # 你需要在app⽬录下 新建⼀个叫tasks.py(⼀定不要写错)⽂件
  • 在settings.py同级⽬录下的init.py加⼊
1
2
from __future__ import absolute_import
from .celery import app as celery_app

Celery的使用

  • 创建任务

在需要使⽤异步任务的APP⽬录下新建tasks.py,使用 shared_task修饰器修饰函数,他就是celery任务:

1
2
3
4
5
6
7
8
9
from celery import shared_task
import time

@shared_task
def printHello():
n = 5
for i in range(n):
print("Hello World")
time.sleep(1)
  • 生成数据表

需要生成django_celery_results表来存放celery任务:

1
python manage.py migrate django_celery_results
  • 启动worker

注意:修改tasks.py的内容后 要重启celery的服务

1
celery -A ⼯程名 worker -l info
  • 出现错误

如果出现Task handler raised error: ValueError(‘not enough values to unpack (expected 3, got 0)’)报错,安装eventlet模块,启动时加-P eventlet:

1
2
pip install eventlet #安装eventlet模块
celery -A ⼯程名 worker -l info -P eventlet #加—P参数
  • 获取任务执⾏结果

异步任务执⾏完毕后,会⾃动触发信号:

1
2
3
4
5
6
7
8
9
10
11
12
13
from celery import shared_task
from celery.signals import task_success

@shared_task
def printHello(n):
result = 0
for i in range(0, n+1, 2):
result += i
return result

@task_success.connect(sender=printHello)
def Helloreturn(sender=None, result=None, **kwargs):
print("偶数和:{}".format(str(result)))
函数 作用
task_success 任务执行成功调用
task_failure 任务执行失败调用

celery定时任务

  • 启动worker
1
celery -A ⼯程名 worker -l info -P eventlet
  • 启动beat
1
celery -A ⼯程名 beat -l info
  • 添加定时任务
1
2
3
4
5
6
7
8
9
10
11
#setting.py
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
"every-ten-second-run-my_task": {
"task": "App.tasks.do",
"schedule": crontab(minute="39", hour="14"), # 每天14:39执行
# "schedule": timedelta(seconds=10), # 每隔10秒执行一次
# "args": (2,)
}
}

Django日志

配置日志信息后Django发生错误会按照配置的格式输出,上线后发生error级错误会发邮件通知管理员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# 日志配置

# smtp服务的邮箱服务器
EMAIL_HOST = 'smtp.qq.com'
# smtp服务固定的端口是25
EMAIL_PORT = 25

#发送邮件的邮箱
EMAIL_HOST_USER = '259380039@qq.com'
#在邮箱中设置的客户端授权密码
EMAIL_HOST_PASSWORD = 'smtp授权码'
#收件人看到的发件人 <此处要和发送邮件的邮箱相同>
EMAIL_FROM = 'Log<259380039@qq.com>'

ADMINS = (
('dir', '23223856@qq.com'), # 管理员邮箱
)
# 配置邮件
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
SERVER_EMAIL = EMAIL_HOST_USER


LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'standard': {
'format': '%(asctime)s [%(threadName)s:%(thread)d] [%(name)s:%(lineno)d] [%(module)s:%(funcName)s] [%(levelname)s]- %(message)s'}
},
'filters': { # 过滤条件
# 要求debug是False才记录
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
}
},
'handlers': {
'null': {
'level': 'DEBUG',
'class': 'logging.NullHandler',
},
'mail_admins': { # 一旦线上代码报错 邮件提示
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['require_debug_false'],
},
'debug': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'filename': os.path.join(BASE_DIR, "log", 'debug.log'), # 文件路径,根目录创建log文件夹
'maxBytes': 1024 * 1024 * 5, # 5兆的数据
'backupCount': 5, # 允许有5这样的文件
'formatter': 'standard', # 格式
},
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'standard',
},
},
'loggers': {
'django': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': False
},
'django.request': {
'handlers': ['debug', 'mail_admins'],
'level': 'ERROR',
'propagate': True, # 是否继承父类的log信息
},
# 对于不在 ALLOWED_HOSTS 中的请求不发送报错邮件
'django.security.DisallowedHost': {
'handlers': ['null'],
'propagate': False,
},
}
}