精华内容
下载资源
问答
  • Celery 4.3.0 任务失败试机制

    千次阅读 2019-10-21 20:23:09
    在异步调用任务中经常需要调用第三方的api请求,如果一次执行失败,则应该进行试执行。否则,如果在执行一些连续性的chain链条任务,前面执行失败,那么后续的也就不用执行了。 下面来看看一个发送邮件失败,然后...

    存在的现象

    在异步调用任务中经常需要调用第三方的api请求,如果一次执行失败,则应该进行重试执行。否则,如果在执行一些连续性的chain链条任务,前面执行失败,那么后续的也就不用执行了。

    下面来看看一个发送邮件失败,然后重试执行的示例。

    Celery任务的文档结构

    该示例是延续上一篇Django 2.1.7 Celery 4.3.0 异步发送邮件示例的,如果不清楚如何发送邮件,可以先查阅一下我上一篇的内容。

    错误重试示例

    故意将邮件服务的地址配置错误

    为了做到错误的演示,我首先将发送邮件的smtp地址写错,如下:

    那么稍后执行发送邮件的时候,就一定会报找不到smtp的错误。

    编写错误重试的task任务

    # 定义任务函数
    @celery_app.task(bind=True)
    def send_register_active_email(self,to_email, username, token):
        '''发送激活邮件'''
        # 组织邮件信息
        subject = '欢迎信息'
        message = ''
        sender = settings.EMAIL_FROM
        receiver = [to_email]
        html_message = '<h1>%s, 欢迎您成为xxx注册会员</h1>请点击下面链接激活您的账户<br/><a href="http://127.0.0.1:8000/user/active/%s">http://127.0.0.1:8000/user/active/%s</a>' % (username, token, token)
    
        print("=========== 执行发送邮件 ===============")
    
        try:
            send_mail(subject, message, sender, receiver, html_message=html_message)
        except Exception as e:
            """
                    邮件发送失败,使用retry进行重试
    
                    retry的参数可以有:
                        exc:指定抛出的异常
                        throw:重试时是否通知worker是重试任务
                        eta:指定重试的时间/日期
                        countdown:在多久之后重试(每多少秒重试一次)
                        max_retries:最大重试次数
                    """
            raise self.retry(exc=e, countdown=3, max_retries=5)
    

    该示例任务将会进行调用发送邮件,当发生错误后,间隔3秒则重试执行一次,总共5次。
    可以通过print的打印信息来确认重试的次数。

    启动celery任务

    windows启动命令:

    celery -A celery_tasks worker -l info -P eventlet
    

    linux启动命令:

    celery -A celery_tasks worker -l info
    

    执行celery任务

    启动完毕celery之后,那么下面进行交互模式进行测试,执行如下:

    In [1]: from celery_tasks.tasks import send_register_active_email
    
    In [2]: to_email = 'lijw@******.cn'
    
    In [3]: token = '123456'
    
    In [4]: username = 'lijw'
    
    In [5]: send_register_active_email.delay(to_email,username,token)
    Out[5]: <AsyncResult: 02deebc0-5d64-43f9-9ad5-5ccfe260ec70>
    

    执行完毕任务后,查看celery的执行日志,如下:

    [2019-10-21 14:01:09,508: INFO/MainProcess] celery@junwei1 ready.
    [2019-10-21 14:01:09,509: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/8.
    [2019-10-21 14:01:14,353: INFO/MainProcess] Received task: celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a]
    
    # 任务第一次执行,然后执行失败
    [2019-10-21 14:01:14,354: WARNING/MainProcess] =========== 执行发送邮件 ===============
    [2019-10-21 14:01:15,311: INFO/MainProcess] Received task: celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a]  ETA:[2019-10-21 06:01:18.21
    8857+00:00]
    [2019-10-21 14:01:15,345: INFO/MainProcess] Task celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a] retry: Retry in 3s: gaierror(11001, 'N
    o address found')
    
    # 当执行错误之后,下面则会重试执行5次任务,直到成功,或者失败
    [2019-10-21 14:01:18,224: WARNING/MainProcess] =========== 执行发送邮件 ===============
    [2019-10-21 14:01:18,253: INFO/MainProcess] Received task: celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a]  ETA:[2019-10-21 06:01:21.22
    5853+00:00]
    [2019-10-21 14:01:18,265: INFO/MainProcess] Task celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a] retry: Retry in 3s: gaierror(11001, 'N
    o address found')
    [2019-10-21 14:01:21,227: WARNING/MainProcess] =========== 执行发送邮件 ===============
    [2019-10-21 14:01:21,255: INFO/MainProcess] Received task: celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a]  ETA:[2019-10-21 06:01:24.22
    8790+00:00]
    [2019-10-21 14:01:21,264: INFO/MainProcess] Task celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a] retry: Retry in 3s: gaierror(11001, 'N
    o address found')
    [2019-10-21 14:01:24,243: WARNING/MainProcess] =========== 执行发送邮件 ===============
    [2019-10-21 14:01:24,280: INFO/MainProcess] Received task: celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a]  ETA:[2019-10-21 06:01:27.24
    4729+00:00]
    [2019-10-21 14:01:24,291: INFO/MainProcess] Task celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a] retry: Retry in 3s: gaierror(11001, 'N
    o address found')
    [2019-10-21 14:01:27,245: WARNING/MainProcess] =========== 执行发送邮件 ===============
    [2019-10-21 14:01:27,271: INFO/MainProcess] Received task: celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a]  ETA:[2019-10-21 06:01:30.24
    6720+00:00]
    [2019-10-21 14:01:27,281: INFO/MainProcess] Task celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a] retry: Retry in 3s: gaierror(11001, 'N
    o address found')
    [2019-10-21 14:01:30,261: WARNING/MainProcess] =========== 执行发送邮件 ===============
    [2019-10-21 14:01:30,279: ERROR/MainProcess] Task celery_tasks.tasks.send_register_active_email[7b921776-f6a6-4da0-9c71-b04aac0d139a] raised unexpected: gaierror(11001, 'N
    o address found')
    Traceback (most recent call last):
      ....
        raise socket.gaierror(socket.EAI_NONAME, 'No address found')
    socket.gaierror: [Errno 11001] No address found
    

    可以看到,上面的日志中的打印信息。
    第一次执行任务,则发送了一次报错。
    随后一直重试执行了5次都报错,说明重试的5次是从第一次执行失败后计算的。

    展开全文
  • celery任务失败

    千次阅读 2019-04-22 16:54:46
    throw:试时是否通知worker是任务 eta:指定试的时间/日期 countdown:在多久之后试(每多少秒试一次) max_retries:最大试次数 bing=True后,task对象会作为第一个参数自动传入,可以...

    方式一:

    示例:

    @app.task(bind=True)
    def send_twitter_status(self, oauth, tweet):
        try:
            twitter = Twitter(oauth)
            twitter.update_status(tweet)
        except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
            raise self.retry(exc=exc)
    

    retry的参数可以有:

    • exc:指定抛出的异常
    • throw:重试时是否通知worker是重试任务
    • eta:指定重试的时间/日期
    • countdown:在多久之后重试(每多少秒重试一次)
    • max_retries:最大重试次数

    bing=True后,task对象会作为第一个参数自动传入,可以使用任务对象的属性。例如:
    self.request.retries:当前重试的次数
    self.request还有以下属性:

    namedesc
    idThe unique id of the executing task
    groupThe unique id a group, if this task is a member.
    chordThe unique id of the chord this task belongs to (if the task is part of the header).
    argsPositional arguments.
    kwargsKeyword arguments.
    retriesHow many times the current task has been retried. An integer starting at 0.
    is_eagerSet to True if the task is executed locally in the client, and not by a worker.
    etaThe original ETA of the task (if any). This is in UTC time (depending on the CELERY_ENABLE_UTC setting).
    expiresThe original expiry time of the task (if any). This is in UTC time (depending on the CELERY_ENABLE_UTC setting).
    logfileThe file the worker logs to. See Logging.
    loglevelThe current log level used.
    hostnameHostname of the worker instance executing the task.
    delivery_infoAdditional message delivery information. This is a mapping containing the exchange and routing key used to deliver this task. Used by e.g. retry() to resend the task to the same destination queue. Availability of keys in this dict depends on the message broker used.
    called_directlyThis flag is set to true if the task was not executed by the worker.
    callbacksA list of subtasks to be called if this task returns successfully.
    errbackA list of subtasks to be called if this task fails.
    utcSet to true the caller has utc enabled (CELERY_ENABLE_UTC).

    方式二:

    @app.task(autoretry_for=(ReadTimeout,), retry_kwargs={'max_retries': 3, 'countdown': 5})
    def test_func():
        viewutils.test_func()
    

    在autoretry_for中添加要自动重试的异常,如果所有异常都需要重试可以写Exception,retry_kwargs中添加重试的参数。

    方式三:

    重写Task类

    from celery import Task
    
    class DebugTask(Task):
        abstract = True
    
        def after_return(self, *args, **kwargs):
            print('Task returned: {0!r}'.format(self.request)
    
    
    @app.task(base=DebugTask)
    def add(x, y):
        return x + y
    

    Handlers

    1. after_return(self, status, retval, task_id, args, kwargs, einfo)
      Handler called after the task returns.

      参数:

      • status – Current task state.
      • retval – Task return value/exception.
      • task_id – Unique id of the task.
      • args – Original arguments for the task that returned.
      • kwargs – Original keyword arguments for the task that returned.
      • einfo – ExceptionInfo instance, containing the traceback (if any).

      The return value of this handler is ignored.

    2. on_failure(self, exc, task_id, args, kwargs, einfo)
      This is run by the worker when the task fails.

      参数:

      • exc – The exception raised by the task.
      • task_id – Unique id of the failed task.
      • args – Original arguments for the task that failed.
      • kwargs – Original keyword arguments for the task that failed.
      • einfo – ExceptionInfo instance, containing the traceback.

      The return value of this handler is ignored.

    3. on_retry(self, exc, task_id, args, kwargs, einfo)
      This is run by the worker when the task is to be retried.

      参数:

      • exc – The exception sent to retry().
      • task_id – Unique id of the retried task.
      • args – Original arguments for the retried task.
      • kwargs – Original keyword arguments for the retried task.
      • einfo – ExceptionInfo instance, containing the traceback.

      The return value of this handler is ignored.

    4. on_success(self, retval, task_id, args, kwargs)
      Run by the worker if the task executes successfully.

      参数:

      • retval – The return value of the task.
      • task_id – Unique id of the executed task.
      • args – Original arguments for the executed task.
      • kwargs – Original keyword arguments for the executed task.

      The return value of this handler is ignored.

    展开全文
  • azkaban失败任务批量

    千次阅读 2019-04-25 15:43:02
    id = b.id WHERE a.status = 70 and a.attempt=0 and a.start_time > 1551369601000 ORDER by b.name asc status =70 表示失败状态 attempt=0 表示试次数 start_time 查收时间段。1551369601000 为时间戳ms 可以在...

    其他接口可以参考官方文档 https://azkaban.readthedocs.io/en/latest/ajaxApi.html

    一、mydb查询失败数据(查出projectname和flowid)

    select b.name ,a.flow_id from execution_jobs a
    INNER JOIN projects b ON a.project_id = b.id
    WHERE a.status = 70
    and a.attempt=0
    and a.start_time > 1551369601000 ORDER by b.name asc
    

    status =70 表示失败状态
    attempt=0 表示重试次数
    start_time 查收时间段。1551369601000 为时间戳ms 可以在https://tool.lu/timestamp/ 进行转换

    在这里插入图片描述

    二、 拼接curl 命令

    2.1) 找到sessionid,sessionid这个id是用户登录azkaban的实时sessionid。

    2.2) 在Excel中拼接,(注意事项:下拉时候日期或者端口会递增,所有只能复制)

    curl -k --data "session.id=76ba04cb-c54a-4b43-b40c-590d1385c044&ajax=executeFlow&flowOverride[dt]=2019-03-01&project=azkabanProject&flow=azkabanFlows"  http://localhost:8088/executor?ajax=executeFlow
    

    在这里插入图片描述project 作业名称,sql查出来的name字段
    flow 流程名称,sql查出来的flow_id 字段

    在这里插入图片描述

    三、 登录azkaban服务器,直接执行所有拼接好的命令。

    执行及返回正确结果入下所示:

    [yuhui@hadoop11 ~]$ curl -k --data "session.id=76ba04cb-c54a-4b43-b40c-590d1385c044&ajax=executeFlow&flowOverride[dt]=2019-03-01&project=azkabanProject&flow=azkabanFlows"  http://localhost:8088/executor?ajax=executeFlow
    {
      "project" : "azkabanProject",
      "message" : "Execution submitted successfully with exec id 53890",
      "flow" : "azkabanFlows",
      "execid" : 53890
    }[yuhui@hadoop11 ~]
    

    之后去azkaban去查看运行的项目

    北京小辉微信公众号

    在这里插入图片描述

    大数据资料分享请关注
    展开全文
  • Azkaban任务失败试及试间隔命令

    千次阅读 2018-09-14 11:43:01
    Azkaban任务失败试及试间隔命令 在.job文件中,添加如下命令: retries=12 retry.backoff=300000

    Azkaban任务失败重试及重试间隔命令

    .job文件中,添加如下命令:

    retries=12
    retry.backoff=300000  #代表重试间隔时间
    展开全文
  • azkaban配置任务失败

    千次阅读 2019-11-18 17:47:53
    retries=1 ##失败的job的自动试的次数 retry.backoff=10000 ##试的间隔(毫秒) 创建spring.job type=command command=echo "spring start... now is : ${azkaban.flow.start.year}-${azkaban.flow.start.month...
  • 在项目时间目标已经确定的情况下如何采取行动来帮助时间目标达成?项目经理博客 1、项目成员加班赶工 毫无疑问,这是企业和项目经理采用的最普遍的方法。加班赶工能够争取时间,但同样危害多多。...
  • Airflow 跑dag中部分失败的任务

    千次阅读 2018-09-18 20:00:00
    跑dag中部分失败的任务 例如 dagA 中, T1 >> T2 >> T3 >> T4 >> T5 ,其中 T1 T2 成功, T3 失败, T4 T5因为依赖 T3,也不会运行。 跳过 T1 T2 跑 T3 T4 T5 的方法是: (1)点击 ...
  • jenkins任务失败重新构建插件Naginator Plugin jenkins任务经常会因为一些偶然因素失败,这时重新构建一次就肯能成功;jenkins的Naginator Plugin插件可以重新调度构建失败的任务;Retry Failed Builds Plugin也...
  • 由于学校网络改造,我们告别了客户端登录的锐捷,迎来了PPPoE拨号的深澜,本以为新的认证计费系统能带来更...打开“任务计划程序”(Win10直接左下角搜就行,Win7去开始菜单搜),左侧选择“任务计划程序库”,右侧...
  • 深度学习(1): 深度学习简介

    万次阅读 多人点赞 2019-08-09 11:10:29
    比较 GPU 和 CPU ,就是比较它们两者如何处理任务。如下图图 1-9 所示, CPU 使用几个核心处理单元去优化串行顺序任务,而 GPU 的大规模并行架构拥有数以千计的更小、更高效的处理单元,用于处理多个并行小任务。 ...
  • c#中任务Task

    万次阅读 2018-07-28 22:53:25
    1.通过任务开启线程 1.1 创建Task对象  首先创建任务对象,任务对象调用Start()方法开启任务线程。 class Program { static void DownLoad(object str) { Console.WriteLine("DownLoad Begin ID = &...
  • 前面的章节,用户通过绑定手机号的注册为会员,并可以补充完个人信息,比如姓名、生日等信息,拿到用户的生日信息之后,就可以通过会员生日信息进行营销,此处就涉及到定时任务执行营销信息推送的问题。本篇就带你...
  • 最近接手一个项目,要把其中的阻塞任务队列,构成非阻塞。在客户端很少有机会直接处理任务队列。项目完成需要总结经验。 阻塞的发生 我这里先说明我遇到的阻塞问题,我这里的阻塞不是多线程访问的阻塞,概念上...
  • &nbsp; &nbsp; &nbsp; &nbsp;在 Python 的 web 框架中,Flask 由于其轻量、易于扩展而得到了广泛的应用,本文主要基于 Flask 浅谈 Celery 的应用。...简而言之,Celery就是一个任务队列,专注...
  • 说明 ...本代码完成了行人检测和行人识别的工作,行人检测使用Retianet作为主干网络,损失函数使用Focal ...本代码修改了库的部分源码,实现了在摄像头中实时的响应行人识别的任务。 流程图如下: 目录说明 - d...
  • 2、打开“任务计划程序” 3、创建自动联网任务 ① 点击 创建基本任务 ② 输入 名称 然后点击 下一步 ; ③ 触发器 选择 当特定事件被记录时 ,然后点击 下一步 ; ④ 日志 选择 系统 ,源 输入 rasman ,事件 ID...
  • 多无人机(UAV)协同任务分配

    万次阅读 多人点赞 2019-03-22 13:33:53
    1.协同任务规划的功能与结构 多无人机协同任务规划即是根据一组特定条件的约束,以实现某个准则函数的最优或次优为目标,将某项作战任务分解成一些子任务并分配给多无人机系统中的各个无人机分别去完成的过程。 ...
  • 分布式任务调度

    万次阅读 2018-10-01 22:31:50
    什么是定时任务? 指定时间去执行任务 Java实现定时任务方式 1.Thread public class Demo01 { static long count = 0; public static void main(String[] args) { Runnable runnable = new Runnable() { ...
  • mapreduce中我们自定义的mapper和reducer程序在执行后有可能遇上出错退出的情况,mapreduce中jobtracker会全程追踪任务的执行情况,对于出错的任务mapreduce也定义了一套自己的处理方式。 首先要明确的是mapreduce...
  • 1,桌面运维(以windows为主,工资偏低,桌面运维经理可以达到8K到10K,很多人在公司里干的就是安装windows系统,windows里的QQ坏了装下,砸个水晶头诸如此类) 2,通讯运维(电话)(不推荐) 3,网络运维,...
  • 根据项目实际划分工作分解任务,可以通过excel进行分解任务, 下面是一个新房装修项目例子: 设置任务之间的依赖关系 1.理解任务之间的依赖关系图 2.在project中设置任务依赖关系 使用下图中的红色...
  • iOS中如何触发定时任务或延时任务? 定时任务指的是周期性的调用某个方法,实现任务的反复执行,例如倒计时功能的实现;延时任务指的是等待一定时间后再执行某个任务,例如页面的延时跳转等。iOS中控制任务的延时或...
  • 在FixedThreadPool中,有一个固定大小的池,如果当前需要执行的任务超过了池大小,那么多余出来的任务会进入等待状态,直到线程池中有空闲下来的线程才去执行这些多余的任务,而当执行的任务数量小于线程池大小,...
  • 【UCOSIII】UCOSIII系统内部任务

    千次阅读 2018-06-25 18:04:35
    空闲任务:UCOSIII创建的第一个任务,UCOSIII必须创建的任务,此任务有UCOSIII自动创建,不需要用户手动创建; 时钟节拍任务:此任务也是必须创建的任务; 统计任务:可选任务,用来统计CPU使用率和各个任务的堆栈...
  • 定时任务下的多线程任务

    千次阅读 2018-03-21 17:45:55
    我们看到了上面提到的一个在java并发中非常重要的一类算法 -- CAS: Compare And Set 比较并设置; 什么意思呢,我们以   boolean compareAndSet(expectedValue, updateValue); 方法为例来解释CAS的思想, 内存中...
  • 订单退款与退款失败任务重

    千次阅读 2017-05-03 20:14:18
    订单退款与退款失败任务重
  • r}'.format(task_id, exc)) # 任务成功时执行 def on_success(self, retval, task_id, args, kwargs): pass # 任务重试时执行 def on_retry(self, exc, task_id, args, kwargs, einfo): pass @task(base=MyTask) def...
  • FreeRTOS任务调度研究

    千次阅读 2017-08-17 22:51:32
    所以不涉及对FreeRTOS整体的介绍,而只是分析任务调度这一块的机制。对应的Demo参考自CORTEX_A9_Zynq_ZC702。 一、触发任务调度的两种机制 taskYIELD() 这种机制其实是通过ARM swi 异常触发task context switch...
  • 程序员的笔记本电脑

    万次阅读 多人点赞 2019-07-11 16:39:24
    电脑选择:写程序及运行程序,需要大内存,现在内存应该16G起步了,CPU越好,计算速度越快,线程数越多,多任务及多线程程序支持越好,固态硬盘,显卡,接口等也需注意 电脑配置看什么呢,这些配置自然是越高越好 ...
  • 今天使用Timer实现任务调度时,阿里巴巴Java开发规范提示 多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 478,544
精华内容 191,417
关键字:

任务比较重