精华内容
下载资源
问答
  • 在真正动手写RTOS之前,...轮询系统即是在裸机编程的时候,先初始化好相关的硬件,然后让主程序在一个死循环里面不断循环,顺序完成各种事情,大概的伪代码具体见代码清单 4-1。轮询系统是一种非常简单的软件结构,...

    在真正动手写RTOS之前,我们先来讲解下单片机编程中的裸机系统和多任务系统的区别。

    4.1 裸机系统

    裸机系统通常分为轮询系统和前后台系统,有关这两者的具体实现方式请看下面的讲解。

    4.1.1 轮询系统

    轮询系统即是在裸机编程的时候,先初始化好相关的硬件,然后让主程序在一个死循环里面不断循环,顺序完成各种事情,大概的伪代码具体见代码清单 4-1。轮询系统是一种非常简单的软件结构,通常只适用于那些只需要顺序执行代码且不需要外部事件来驱动就能完成的事情。在代码清单4-1中,如果只是实现LED翻转,串口输出,液晶显示等这些操作,那么适用轮询系统将会非常完美。但是,如果加入按键操作等需要检测外部信号的事件,用来模拟紧急报警,那么整个系统的实时响应能力就不会那么好了。假设DoSomething3是按键扫描,当外部按键被按下,相应于一个报警,这个时候,需要立马响应,并做紧急处理,而这个时候程序刚好执行到DoSomething1,要命的是DoSomething1需要执行的时间比较久,久到按键释放之后都没有执行完毕,那么当执行到DoSomething3的时候就会丢失掉一次事件。足见,轮询系统只适合顺序执行的功能代码,当有外部事件驱动时,实时性就会降低。

     

    代码清单 4-1 轮询系统伪代码

    int main(void)
    {
        /* 硬件相关初始化 */
        HardWareInit();
    
        /* 无限循环 */
        for(;;) {
            /* 处理事情1 */
            DoSomethin1();
    
            /* 处理事情2 */
            DoSomethin2();
    
            /* 处理事情3 */
            DoSomethin3();
        }
    }
    轮询系统伪代码

     

    4.1.2 前后台系统

    相比轮训系统,前后台系统是在轮系系统的基础之上加入了中断。外部事件的响应在中断里面完成,事件的处理还是回轮训系统中完成,中断在这里我们称为前台,main函数里面的无限循环我们称为后台,大概的伪代码见代码清单4-2。

     

    代码清单4-2 前后台系统伪代码

    int flag1 = 0;
    int flag2 = 0;
    int flag3 = 0;
    
    int main(void)
    {
        /* 硬件相关初始化 */
        HardWareInit();
    
        /* 无限循环 */
        for(;;) {
            if(flag1) {
                /* 处理事情1 */
                DoSomethin1();
            }
            
            if(flag2) {
                /* 处理事情2 */
                DoSomethin2();
            }
    
            if(flag3) {
                /* 处理事情3 */
                DoSomethin3();
            }
        }
    }
    
    void ISR1(void)
    {
        /* 置位标志位 */
        flag1 = 1;
    
        /* 如果事件处理时间很短,则在中断里面处理
           如果事件处理事件较长,则回到后台处理   */
        DoSomethin1();
    }
    
    void ISR2(void)
    {
        /* 置位标志位 */
        flag2 = 1;
    
        /* 如果事件处理时间很短,则在中断里面处理
           如果事件处理事件较长,则回到后台处理   */
        DoSomethin2();
    }
    
    void ISR3(void)
    {
        /* 置位标志位 */
        flag3 = 1;
    
        /* 如果事件处理时间很短,则在中断里面处理
           如果事件处理事件较长,则回到后台处理   */
        DoSomethin3();
    }
    前后台系统伪代码

     

    在顺序执行后台程序的时候,如果有中断来临,那么中断会打断后台程序的正常执行流,转而去执行中断服务程序,在中断服务程序里面标记事件,如果事件要处理的事情很简短,则可在中断服务程序里面处理,如果事件要处理的事情比较多,则返回到后台程序里面处理。虽然事件的响应和处理是分开了,但是事件的处理还是在后台里面顺序执行的,但相比轮序系统,前后台系统确保了事件不会丢失,再加上中断具有可嵌套的功能,这可以大大的提高程序的实时响应能力。在大多数中小项目中,前后台系统运用的好,堪称有操作系统的效果。

     

    4.2 多任务系统

    相比前后台系统,多任务系统的事件响应也是在中断中完成的,但是事件的处理是在任务中完成的。在多任务系统中,任务跟中断一样,也具有优先级,优先级高的任务会被优先执行。当一个紧急的事件在中断被标记后,如果事件对应的任务的优先级足够高,就会立马得到响应。相比前后台系统,多任务系统的实时性又被提高了。多任务系统大概的伪代码具体见代码清单4-3

     

    代码清单4-3 多任务系统伪代码  

    int flag1 = 0;
    int flag2 = 0;
    int flag3 = 0;
    
    int main(void)
    {
        /* 硬件相关初始化 */
        HardWareInit();
    
        /* OS 初始化 */
        RTOSInit();
    
        /* OS 启动,开始多任务调度,不再返回 */
        RTOSStart();
    
    }
    
    void ISR1(void)
    {
        /* 置位标志位 */
        flag1 = 1;
    }
    
    void ISR2(void)
    {
        /* 置位标志位 */
        flag2 = 1;
    }
    
    void ISR3(void)
    {
        /* 置位标志位 */
        flag3 = 1;
    }
    
    void DoSomethin1(void)
    {
        /* 无限循环,不能返回 */
        for(;;) {
            /* 任务实体 */
            if(flag1) {
    
            }
        }
    }
    
    void DoSomethin2(void)
    {
        /* 无限循环,不能返回 */
        for(;;) {
            /* 任务实体 */
            if(flag2) {
    
            }
        }
    }
    
    void DoSomethin3(void)
    {
        /* 无限循环,不能返回 */
        for(;;) {
            /* 任务实体 */
            if(flag3) {
    
            }
        }
    }
    多任务系统伪代码

     

    相比前后台系统中后台顺序执行的程序主体,在多任务系统中,根据程序的功能,我们把这个程序的主体分割成一个个独立的,无限循环且不能返回的小程序,这个小程序我们称之为任务。每个任务都是独立的,互不干扰的,且具备自身的优先级,它由操作系统调度管理。加入操作系统后,我们在编程的时候,不需要精心地去设计程序执行流,不用担心每个功能模块之间是否存在干扰。加入了操作系统,我们的编程反而变得简单了。整个系统随之带来的额外开销就是操作系统占据的那一丁点的FLASH和RAM。现如今,单片机的FLASH和RAM是越来越大,完全足以抵挡RTOS那点开销。

    无论是裸机系统中的轮询系统、前后台系统和多任务系统,我们不能一锤子敲定孰优孰劣,它们是不同时代的产物,在各自的领域都还有相当大的应用价值,只有适合才是最好。有关这三者的软件模型区别具体见表格4-1。

     

    表格4-1 轮询、前后台、多任务系统软件模型区别

    模型 事件响应 事件处理 特点
    轮询系统 主程序 主程序 轮询响应事件,轮询处理事件
    前后台系统 中断 主程序 实时响应事件,轮询处理事件
    多任务系统 中断 任务 实时响应事件,实时处理事件

    转载于:https://www.cnblogs.com/doitjust/p/10910181.html

    展开全文
  • 在真正开始动手写 RTOS 之前,我们先来讲解下单片机编程中的... 轮询系统即是在裸机编程的时候, 先初始化好相关的硬件,然后让主程序在一个死循环里面不断循环, 顺序地做各种事情,大概的伪代码具体见代码清单 4...

         在真正开始动手写 RTOS 之前,我们先来讲解下单片机编程中的裸机系统和多任务系统的区别。


    4.1 裸机系统


         裸机系统通常分成轮询系统前后台系统,有关这两者的具体实现方式请看下面的讲解。


    4.1.1 轮询系统


         轮询系统即是在裸机编程的时候, 先初始化好相关的硬件,然后让主程序在一个死循环里面不断循环, 顺序地做各种事情,大概的伪代码具体见代码清单 4-1。轮询系统是一种非常简单的软件结构,通常只适用于那些只需要顺序执行代码且不需要外部事件来驱动的就能完成的事情。在代码清单 4-1 中,如果只是实现 LED 翻转,串口输出,液晶显示等这些操作,那么使用轮询系统将会非常完美。但是,如果加入了按键操作等需要检测外部信号的事件,用来模拟紧急报警,那么整个系统的实时响应能力就不会那么好了。假设DoSomething3 是按键扫描,当外部按键被按下,相当于一个警报,这个时候,需要立马响应,并做紧急处理,而这个时候程序刚好执行到 DoSomething1,要命的是 DoSomething1需要执行的时间比较久,久到按键释放之后都没有执行完毕,那么当执行到 DoSomething3的时候就会丢失掉一次事件。足见,轮询系统只适合顺序执行的功能代码,当有外部事件驱动时,实时性就会降低。

    int main(void)
     {
     /* 硬件相关初始化 */
     HardWareInit();
    
     /* 无限循环 */
     for (;;) {
     /* 处理事情 1 */
     DoSomethin1();
    
     /* 处理事情 2 */
     DoSomething2();
    
     /* 处理事情 3 */
     DoSomething3();
     }
     }

    4.1.2 前后台系统:


          相比轮询系统,前后台系统是在轮询系统的基础上加入了中断。外部事件的响应在中断里面完成, 事件的处理还是回到轮询系统中完成,中断在这里我们称为前台main 函数里面的无限循环我们称为后台,大概的伪代码见代码清单 4-2。

    int flag1 = 0;
    int flag2 = 0;
    int flag3 = 0;
    int main(void)
    {
    /* 硬件相关初始化 */
    HardWareInit();
    /* 无限循环 */
    for (;;) {
    if (flag1) {
    /* 处理事情 1 */
    DoSomethin1();
    }
    if (flag2) {
    /* 处理事情 2 */
    DoSomething2();
    }
    if (flag3) {
    /* 处理事情 3 */
    DoSomething3();
    }
    }
    }
    void ISR1(void)
    {
    /* 置位标志位 */
    flag1 = 1;
    /* 如果事件处理时间很短,则在中断里面处理
    如果事件处理时间比较长,在回到前台处理 */
    DoSomethin1();
    }
    void ISR1(void)
    {
    /* 置位标志位 */
    flag1 = 2;
    /* 如果事件处理时间很短,则在中断里面处理
    如果事件处理时间比较长,在回到前台处理 */
    DoSomethin2();
    }
    void ISR3(void)
    {
    /* 置位标志位 */
    flag3 = 1;
    /* 如果事件处理时间很短,则在中断里面处理
    如果事件处理时间比较长,在回到前台处理 */
    DoSomethin3();
    }

          在顺序执行后台程序的时候,如果有中断来临,那么中断会打断后台程序的正常执行流,转而去执行中断服务程序,在中断服务程序里面标记事件,如果事件要处理的事情很简短,则可在中断服务程序里面处理如果事件要处理的事情比较多,则返回到后台程序里面处理。虽然事件的响应和处理是分开了,但是事件的处理还是在后台里面顺序执行的,但相比轮询系统,前后台系统确保了事件不会丢失,再加上中断具有可嵌套的功能,这可以大大的提高程序的实时响应能力。 在大多数的中小型项目中,前后台系统运用的好,堪称有操作系统的效果。


    4.2 多任务系统:


          相比前后台系统,多任务系统的事件响应也是在中断中完成的,但是事件的处理是在任务中完成的。在多任务系统中,任务跟中断一样,也具有优先级,优先级高的任务会被优先执行当一个紧急的事件在中断被标记之后,如果事件对应的任务的优先级足够高,就会立马得到响应。相比前后台系统,多任务系统的实时性又被提高了。 多任务系统大概的伪代码具体见代码清单 4-3。

    int flag1 = 0;
    int flag2 = 0;
    int flag3 = 0;
    int main(void)
    {
    /* 硬件相关初始化 */
    HardWareInit();
    /* OS 初始化 */
    RTOSInit();
    /* OS 启动,开始多任务
    RTOSStart();
    }
    void ISR1(void)
    {
    /* 置位标志位 */
    flag1 = 1;
    }
    void ISR1(void)
    {
    /* 置位标志位 */
    flag1 = 2;
    }
    void ISR3(void)
    {
    /* 置位标志位 */
    flag3 = 1;
    }
    void DoSomethin1(void)
    {
    /* 无限循环,不能返回 */
    for (;;) {
    /* 任务实体 */
    if (flag1) {
    }
    }
    }
    void DoSomethin2(void)
    {
    /* 无限循环,不能返回 */
    for (;;) {
    /* 任务实体 */
    if (flag2) {
    }
    }
    }
    void DoSomethin3(void)
    {
    /* 无限循环,不能返回 */
    for (;;) {
    /* 任务实体 */
    if (flag3) {
    }
    }
    }

           相比前后台系统中后台顺序执行的程序主体,在多任务系统中,根据程序的功能,我们把这个程序主体分割成一个个独立的,无限循环且不能返回的小程序,这个小程序我们称之为任务。 每个任务都是独立的,互不干扰的,且具备自身的优先级,它由操作系统调度管理。 加入操作系统后, 我们在编程的时候不需要精心地去设计程序的执行流,不用担心每个功能模块之间是否存在干扰。加入了操作系统,我们的编程反而变得简单了。整个系统随之带来的额外开销就是操作系统占据的那一丁点的 FLASH 和 RAM。 现如今,单片机的FLASH和 RAM是越来越大,完全足以抵挡 RTOS 那点开销。
           无论是裸机系统中的轮询系统、 前后台系统和多任务系统,我们不能一锤子的敲定孰优孰劣,它们是不同时代的产物,在各自的领域都还有相当大的应用价值,只有合适才是最好。有关这三者的软件模型区别具体见表格 4-1。

     

    展开全文
  • 上一小节已经详细讲解过while循环的结构和使用方法,本节主要讲述本人在具体应用中相关方法和小技巧 1.模拟控制台输入账户密码登陆,并将登陆限制最大错误次数限制为3次,超过3次后提示用户,错误次数超过上限,账户...

    上一小节已经详细讲解过while循环的结构和使用方法,本节主要讲述本人在具体应用中相关方法和小技巧
    1.模拟控制台输入账户密码登陆,并将登陆限制最大错误次数限制为3次,超过3次后提示用户,错误次数超过上限,账户冻结
    在这里插入图片描述
    在这里插入图片描述

    2.用户输入行数和列数
    在控制台输入对应行数的表格(因为字体原因和系统问题,图形不规范还请见谅)
    ┏┳┳┓
    ┣╋╋┫(——我们需要明白的是,画表格计算和循环的部分应该为中间部分,而不是将头和尾计算在内——)
    ┣╋╋┫
    ┗┻┻┛
    在这里插入图片描述
    在这里插入图片描述

    3.猜数字:计算机随机生成一个100以内的整数,循环接收用户输入猜测的数据,每次猜完给出提示:“猜大了”“猜小了”“猜对了”,只有猜对时循环结束,否则一直让用户输入猜测数据并给出提示
    (提示:Math.random();//返回一个0到1之间包含0不包含1的随机小数
    (int)(Math.random()*100)+1;//生成1到100之间的随机整数)
    在这里插入图片描述

    补充:我们学过while循环后通常会写一个简单的购物系统来应用所学知识,下面我将介绍其中的重难点部分的书写,并在最后上传完整代码,希望可以帮助到各位小伙伴,我在网上找到很多购物系统,大多数都是很复杂,希望这个简单的可以各位小伙伴思路。有什么问题欢迎留言和猪猪一起进步。
    上面介绍过判断输入的次数限制案例,那么当写成购物系统时候应用是怎么应用的呢?
    在这里插入图片描述
    在这里插入图片描述
    这部分为登陆部分,下面介绍选取商品和结账

    在这里插入图片描述

    在这里插入图片描述下面展示全部代码(代码有一些bug以及未实现输入不是列表商品就跳回重新输入的功能):

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    展开全文
  • 本期内容 本讲讲解sparkStreaming的driver部分的数据的接受和管理的部分,即receiverTracker,包括: 1.receiverTracker的架构设计 2.消息循环系统 3.receiverTracker的具体实现。

    本期内容
    本讲讲解sparkStreaming的driver部分的数据的接受和管理的部分,即receiverTracker,包括:
    1.receiverTracker的架构设计
    2.消息循环系统
    3.receiverTracker的具体实现

    通过前面的课程,我们知道:
    receiverTracker以driver中具体自己的算法在具体的executor上启动receiver。
    启动的方式是:把每个receiver封装成1个task,该tasks是job中唯一的task,也就是说,
    有多少receiver就会分发多少个Job,每个Job只有一个task,该task内部只有1条数据即该receiver的数据。
    receiverTracker在启动receiver的时候,有个receiverSupervisorImpl,它在启动的时候反过来帮我们启动receiver,receiver不断地接受数据,转过来通过blockGenerator把自己接受的数据封装成1个1个的block(背后有定时器),不断地把数据存储,有2中存储方式:第一种直接通过blockmanger存储,第二种先写WAL;存储过之后receiverSupervisorImpl会把存储的数据的元数据汇报给receiverTracker,(实际上汇报给的是receiverTracker的rpc通讯实体。),汇报的消息包括具体的数据的id,具体位置,多少条,大小等等。receiverTracker接受到这些数据后,转过来再进行一下步的数据管理工作。
    本节讲解receiverTracker接受到这些数据后,会怎样具体进行一下步的处理。

    ReceiverSupervisorImpl通过receivedBlockHandler来写数据,根据是否要写wal,receivedBlockHandler的具体实现有2种方式:

      private val receivedBlockHandler: ReceivedBlockHandler = {
        if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
          if (checkpointDirOption.isEmpty) {
            throw new SparkException(
              "Cannot enable receiver write-ahead log without checkpoint directory set. " +
                "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
                "See documentation for more details.")
          }
          //使用了wal时的实现
          new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
            receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
        } else {
         //没使用wal时的实现
          new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
        }
      }

    接受到数据且封装成block后的处理:

      /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
      def pushArrayBuffer(
          arrayBuffer: ArrayBuffer[_],
          metadataOption: Option[Any],
          blockIdOption: Option[StreamBlockId]
        ) {
        //通过pushAndReportBlock()来存储数据且把元数据汇报给driver:
        pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
      }
    
      /** Store a iterator of received data as a data block into Spark's memory. */
      def pushIterator(
          iterator: Iterator[_],
          metadataOption: Option[Any],
          blockIdOption: Option[StreamBlockId]
        ) {
         //通过pushAndReportBlock()来存储数据且把元数据汇报给driver:
        pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
      }
    
      /** Store the bytes of received data as a data block into Spark's memory. */
      def pushBytes(
          bytes: ByteBuffer,
          metadataOption: Option[Any],
          blockIdOption: Option[StreamBlockId]
        ) {
         //通过pushAndReportBlock()来存储数据且把元数据汇报给driver:
        pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
      }

    通过pushAndReportBlock()来存储数据且把元数据汇报给driver:

      /** Store block and report it to driver */
      def pushAndReportBlock(
          receivedBlock: ReceivedBlock,
          metadataOption: Option[Any],
          blockIdOption: Option[StreamBlockId]
        ) {
        val blockId = blockIdOption.getOrElse(nextBlockId)
        val time = System.currentTimeMillis
        val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
        logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
        val numRecords = blockStoreResult.numRecords
        //把存储的block信息封装起来。
        val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
        //发消息给ReceiverTracker,askWithRetry确保发送成功
        trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
        logDebug(s"Reported block $blockId")
      }

    其中用到了case class ReceivedBlockInfo()把欲存储的block信息封装起来。

    /** Information about blocks received by the receiver */
    private[streaming] case class ReceivedBlockInfo(
        streamId: Int,
        numRecords: Option[Long],
        metadataOption: Option[Any],
        blockStoreResult: ReceivedBlockStoreResult
      ) {
    

    ReceivedBlockStoreResult封装了欲存储的block的元数据信息:

    /** Trait that represents the metadata related to storage of blocks */
    private[streaming] trait ReceivedBlockStoreResult {
      // Any implementation of this trait will store a block id
      def blockId: StreamBlockId
      // Any implementation of this trait will have to return the number of records
      //题外话:从这里得到一个启示,我们谈spark处理的数据量大小,专业的说法是处理多少条数据,而不是处理的数据的大小(因为每条数据可能有多个字段,当处理音频视频文件时数据量一般都很大)。
      def numRecords: Option[Long]
    }

    我们说ReceiverSupervisorImpl发送的消息是发送给ReceiverTracker,以下源码是证据:
    /* Remote RpcEndpointRef for the ReceiverTracker /
    private val trackerEndpoint = RpcUtils.makeDriverRef(“ReceiverTracker”, env.conf, env.rpcEnv)

    在ReceiverTracker启动时,新建的消息通讯体的名字确实是”ReceiverTracker”:

      /** Start the endpoint and receiver execution thread. */
      def start(): Unit = synchronized {
        if (isTrackerStarted) {
          throw new SparkException("ReceiverTracker already started")
        }
    
        if (!receiverInputStreams.isEmpty) {
          endpoint = ssc.env.rpcEnv.setupEndpoint(
            "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
          if (!skipReceiverLaunch) launchReceivers()
          logInfo("ReceiverTracker started")
          trackerState = Started
        }
      }

    ReceiverTracker是整个block管理的中心,它使用ReceiverTrackerEndpoint消息循环体接受来自Receiver的消息,我们来看下该消息通讯体:

    /** RpcEndpoint to receive messages from the receivers. */
      private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
    
        // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
        private val submitJobThreadPool = ExecutionContext.fromExecutorService(
          ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))
    
        private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
          ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
    
        @volatile private var active: Boolean = true
    
        override def receive: PartialFunction[Any, Unit] = {
          // Local messages
          case StartAllReceivers(receivers) =>
            val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
            for (receiver <- receivers) {
              val executors = scheduledLocations(receiver.streamId)
              updateReceiverScheduledExecutors(receiver.streamId, executors)
              receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
              startReceiver(receiver, executors)
            }
          case RestartReceiver(receiver) =>
            // Old scheduled executors minus the ones that are not active any more
            val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
            val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
                // Try global scheduling again
                oldScheduledExecutors
              } else {
                val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
                // Clear "scheduledLocations" to indicate we are going to do local scheduling
                val newReceiverInfo = oldReceiverInfo.copy(
                  state = ReceiverState.INACTIVE, scheduledLocations = None)
                receiverTrackingInfos(receiver.streamId) = newReceiverInfo
                schedulingPolicy.rescheduleReceiver(
                  receiver.streamId,
                  receiver.preferredLocation,
                  receiverTrackingInfos,
                  getExecutors)
              }
            // Assume there is one receiver restarting at one time, so we don't need to update
            // receiverTrackingInfos
            startReceiver(receiver, scheduledLocations)
          case c: CleanupOldBlocks =>
            receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
          case UpdateReceiverRateLimit(streamUID, newRate) =>
            for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
              eP.send(UpdateRateLimit(newRate))
            }
          // Remote messages
          case ReportError(streamId, message, error) =>
            reportError(streamId, message, error)
        }
    
        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
          // Remote messages
          case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
            val successful =
              registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
            context.reply(successful)
          case AddBlock(receivedBlockInfo) =>
            if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
              walBatchingThreadPool.execute(new Runnable {
                override def run(): Unit = Utils.tryLogNonFatalError {
                  if (active) {
                    context.reply(addBlock(receivedBlockInfo))
                  } else {
                    throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
                  }
                }
              })
            } else {
              context.reply(addBlock(receivedBlockInfo))
            }
          case DeregisterReceiver(streamId, message, error) =>
            deregisterReceiver(streamId, message, error)
            context.reply(true)
          // Local messages
          case AllReceiverIds =>
            context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
          case StopAllReceivers =>
            assert(isTrackerStopping || isTrackerStopped)
            stopReceivers()
            context.reply(true)
        }
    
        /**
         * Return the stored scheduled executors that are still alive.
         */
        private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
          if (receiverTrackingInfos.contains(receiverId)) {
            val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
            if (scheduledLocations.nonEmpty) {
              val executors = getExecutors.toSet
              // Only return the alive executors
              scheduledLocations.get.filter {
                case loc: ExecutorCacheTaskLocation => executors(loc)
                case loc: TaskLocation => true
              }
            } else {
              Nil
            }
          } else {
            Nil
          }
        }
    
        /**
         * Start a receiver along with its scheduled executors
         */
        private def startReceiver(
            receiver: Receiver[_],
            scheduledLocations: Seq[TaskLocation]): Unit = {
          def shouldStartReceiver: Boolean = {
            // It's okay to start when trackerState is Initialized or Started
            !(isTrackerStopping || isTrackerStopped)
          }
    
          val receiverId = receiver.streamId
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
            return
          }
    
          val checkpointDirOption = Option(ssc.checkpointDir)
          val serializableHadoopConf =
            new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
    
          // Function to start the receiver on the worker node
          val startReceiverFunc: Iterator[Receiver[_]] => Unit =
            (iterator: Iterator[Receiver[_]]) => {
              if (!iterator.hasNext) {
                throw new SparkException(
                  "Could not start receiver as object not found.")
              }
              if (TaskContext.get().attemptNumber() == 0) {
                val receiver = iterator.next()
                assert(iterator.hasNext == false)
                val supervisor = new ReceiverSupervisorImpl(
                  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
                supervisor.start()
                supervisor.awaitTermination()
              } else {
                // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
              }
            }
    
          // Create the RDD using the scheduledLocations to run the receiver in a Spark job
          val receiverRDD: RDD[Receiver[_]] =
            if (scheduledLocations.isEmpty) {
              ssc.sc.makeRDD(Seq(receiver), 1)
            } else {
              val preferredLocations = scheduledLocations.map(_.toString).distinct
              ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
            }
          receiverRDD.setName(s"Receiver $receiverId")
          ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
          ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
    
          val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
            receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
          // We will keep restarting the receiver job until ReceiverTracker is stopped
          future.onComplete {
            case Success(_) =>
              if (!shouldStartReceiver) {
                onReceiverJobFinish(receiverId)
              } else {
                logInfo(s"Restarting Receiver $receiverId")
                self.send(RestartReceiver(receiver))
              }
            case Failure(e) =>
              if (!shouldStartReceiver) {
                onReceiverJobFinish(receiverId)
              } else {
                logError("Receiver has been stopped. Try to restart it.", e)
                logInfo(s"Restarting Receiver $receiverId")
                self.send(RestartReceiver(receiver))
              }
          }(submitJobThreadPool)
          logInfo(s"Receiver ${receiver.streamId} started")
        }
    
        override def onStop(): Unit = {
          submitJobThreadPool.shutdownNow()
          active = false
          walBatchingThreadPool.shutdown()
        }
    
        /**
         * Call when a receiver is terminated. It means we won't restart its Spark job.
         */
        private def onReceiverJobFinish(receiverId: Int): Unit = {
          receiverJobExitLatch.countDown()
          receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
            if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
              logWarning(s"Receiver $receiverId exited but didn't deregister")
            }
          }
        }
    
        /** Send stop signal to the receivers. */
        private def stopReceivers() {
          receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
          logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
        }
      }

    我们来过滤下ReceiverTracker的源码。

    ** Enumeration to identify current state of a Receiver */
    private[streaming] object ReceiverState extends Enumeration {
      //type是scala语法,别名
      type ReceiverState = Value
      val INACTIVE, SCHEDULED, ACTIVE = Value
    }
    
    /**
     * Messages used by the NetworkReceiver and the ReceiverTracker to communicate
     * with each other.
     */
     //sealed:是scala语法,表示该类包含它所有的子类, 即所有的消息类型都在这里。
    private[streaming] sealed trait ReceiverTrackerMessage
    private[streaming] case class RegisterReceiver(
        streamId: Int,
        typ: String,
        host: String,
        executorId: String,
        receiverEndpoint: RpcEndpointRef
      ) extends ReceiverTrackerMessage
    private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
      extends ReceiverTrackerMessage
    private[streaming] case class ReportError(streamId: Int, message: String, error: String)
    private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
      extends ReceiverTrackerMessage
    
    /**
     * Messages used by the driver and ReceiverTrackerEndpoint to communicate locally.
     */
    private[streaming] sealed trait ReceiverTrackerLocalMessage
    
    /**
     * This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.
     */
    private[streaming] case class RestartReceiver(receiver: Receiver[_])
      extends ReceiverTrackerLocalMessage
    
    /**
     * This message is sent to ReceiverTrackerEndpoint when we start to launch Spark jobs for receivers
     * at the first time.
     */
    private[streaming] case class StartAllReceivers(receiver: Seq[Receiver[_]])
      extends ReceiverTrackerLocalMessage
    
    /**
     * This message will trigger ReceiverTrackerEndpoint to send stop signals to all registered
     * receivers.
     */
    private[streaming] case object StopAllReceivers extends ReceiverTrackerLocalMessage
    
    /**
     * A message used by ReceiverTracker to ask all receiver's ids still stored in
     * ReceiverTrackerEndpoint.
     */
     //ReceiverTrackerEndpoint存储了AllReceiverIds信息
    private[streaming] case object AllReceiverIds extends ReceiverTrackerLocalMessage
    
    private[streaming] case class UpdateReceiverRateLimit(streamUID: Int, newRate: Long)
      extends ReceiverTrackerLocalMessage

    receiver执行的管理包括3个方面:receiver的启动与重新启动,receiver的回收,receiver执行过程中接受数据的管理。

    /**
     * This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
     * this class must be created after all input streams have been added and StreamingContext.start()
     * has been called because it needs the final set of input streams at the time of instantiation.
     *
     * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
     */
    private[streaming]
    class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) 
    //ReceiverTracker是driver级别,不需要发送到executor,故不需要序列化。
    extends Logging {
    
      //从DstreamGraph中取得所有的input streams信息
      private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
      private val receiverInputStreamIds = receiverInputStreams.map { _.id }
      private val receivedBlockTracker = new ReceivedBlockTracker(
        ssc.sparkContext.conf,
        ssc.sparkContext.hadoopConfiguration,
        receiverInputStreamIds,
        ssc.scheduler.clock,
        ssc.isCheckpointPresent,
        Option(ssc.checkpointDir)
      )
      //listenerBus对监控很关键
      private val listenerBus = ssc.scheduler.listenerBus

    我们来看下ReceiverTracker对receiveAndReply消息的处理:

       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
          // Remote messages
          case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
            val successful =
              registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
            context.reply(successful)
          case AddBlock(receivedBlockInfo) =>
            if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
              //这里用到了线程池里的线程,因为wal的处理是耗时的,不能阻塞主线程
              walBatchingThreadPool.execute(new Runnable {
                override def run(): Unit = Utils.tryLogNonFatalError {
                  if (active) {
                    context.reply(addBlock(receivedBlockInfo))
                  } else {
                    throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
                  }
                }
              })
            } else {
              //非wal模式的处理:
              context.reply(addBlock(receivedBlockInfo))
            }
          case DeregisterReceiver(streamId, message, error) =>
            deregisterReceiver(streamId, message, error)
            context.reply(true)
          // Local messages
          case AllReceiverIds =>
            context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
          case StopAllReceivers =>
            assert(isTrackerStopping || isTrackerStopped)
            stopReceivers()
            context.reply(true)
        }

    无论是否启动了wal,都会调用receivedBlockTracker.addBlock()来处理:

     /** Add new blocks for the given stream */
      private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
        receivedBlockTracker.addBlock(receivedBlockInfo)
      }
    

    来看下receivedBlockTracker:

    /**
     * Class that keep track of all the received blocks, and allocate them to batches
     * when required. All actions taken by this class can be saved to a write ahead log
     * (if a checkpoint directory has been provided), so that the state of the tracker
     * (received blocks and block-to-batch allocations) can be recovered after driver failure.
     *
     * Note that when any instance of this class is created with a checkpoint directory,
     * it will try reading events from logs in the directory.
     */
    private[streaming] class ReceivedBlockTracker(
        conf: SparkConf,
        hadoopConf: Configuration,
        streamIds: Seq[Int],
        clock: Clock,
        recoverFromWriteAheadLog: Boolean,
        checkpointDirOption: Option[String])
      extends Logging {

    来看下receivedBlockTracker.addBlock(receivedBlockInfo):

      /** Add received block. This event will get written to the write ahead log (if enabled). */
      def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
        try {
          val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
          if (writeResult) {
            synchronized {
              getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
            }
            logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
              s"block ${receivedBlockInfo.blockStoreResult.blockId}")
          } else {
            logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
              s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
          }
          writeResult
        } catch {
          case NonFatal(e) =>
            logError(s"Error adding block $receivedBlockInfo", e)
            false
        }
      }

    receivedBlockTracker把unallocated blocks 分配给batch:

    /**
       * Allocate all unallocated blocks to the given batch.
       * This event will get written to the write ahead log (if enabled).
       */
      def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
        if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
          val streamIdToBlocks = streamIds.map { streamId =>
              (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
          }.toMap
          val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
          if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
            timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
            lastAllocatedBatchTime = batchTime
          } else {
            logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
          }
        } else {
          // This situation occurs when:
          // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
          // possibly processed batch job or half-processed batch job need to be processed again,
          // so the batchTime will be equal to lastAllocatedBatchTime.
          // 2. Slow checkpointing makes recovered batch time older than WAL recovered
          // lastAllocatedBatchTime.
          // This situation will only occurs in recovery time.
          logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
        }
      }

    我们来看下ReceiverTracker对receive消息的处理:

     override def receive: PartialFunction[Any, Unit] = {
          // Local messages
          case StartAllReceivers(receivers) =>
            val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
            for (receiver <- receivers) {
              val executors = scheduledLocations(receiver.streamId)
              updateReceiverScheduledExecutors(receiver.streamId, executors)
              receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
              startReceiver(receiver, executors)
            }
          //当receiver出故障时,需要发消息重启
          case RestartReceiver(receiver) =>
            // Old scheduled executors minus the ones that are not active any more
            val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
            val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
                // Try global scheduling again
                oldScheduledExecutors
              } else {
                val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
                // Clear "scheduledLocations" to indicate we are going to do local scheduling
                val newReceiverInfo = oldReceiverInfo.copy(
                  state = ReceiverState.INACTIVE, scheduledLocations = None)
                receiverTrackingInfos(receiver.streamId) = newReceiverInfo
                schedulingPolicy.rescheduleReceiver(
                  receiver.streamId,
                  receiver.preferredLocation,
                  receiverTrackingInfos,
                  getExecutors)
              }
            // Assume there is one receiver restarting at one time, so we don't need to update
            // receiverTrackingInfos
            startReceiver(receiver, scheduledLocations)
          //清楚掉处理过了的不需要的block信息
          case c: CleanupOldBlocks =>
            receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
          //限制数据处理速度,限流
          case UpdateReceiverRateLimit(streamUID, newRate) =>
            for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
              eP.send(UpdateRateLimit(newRate))
            }
          // Remote messages
          //报错
          case ReportError(streamId, message, error) =>
            reportError(streamId, message, error)
        }

    总结:receiver接受到数据,合并并存储数据之后,ReceiverSupervisorImpl会把元数据汇报给ReceiverTracker,ReceiverTracker接受到元数据之后,内部有个ReceivedBlockTracker来会管理接受的数据的分配,JobGenerator会在每个batchDuartion内从ReceiverTracker获取元数据信息,据此生成rdd。
    ReceivedBlockTracker在内部专门来管理block的元数据信息。
    从设计模式角度讲,表面干活的是ReceiverTracker,而实际干活的是ReceivedBlockTracker,这是门面模式 Facade模式。

    本次分享来自于王家林老师的课程‘源码版本定制发行班’,在此向王家林老师表示感谢!
    欢迎大家交流技术知识!一起学习,共同进步!

    展开全文
  • 资金流宛如血液循环系统:也是企业运营质量的综合反映。借助工具可及时发现和控制原料过量与停工待料现象、产品积压现象及销售环节的呆帐现象,及时解决浪费问题,及时暴露管理瓶颈,解决企业中普遍存在的财务管理仅...
  • 消息循环系统 c. ReceiverTracker具体实现注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解。上节回顾上一讲中,我们主要给大家介绍Spark Streaming在接收数据的全生命周期贯通;a. 当有...
  • 深度学习入门到精通

    2018-06-07 14:36:06
    深度学习课程大全讲解,入门到精通,具体内容包括R-CNN到FASTERR-CNN目标检测与文本摘要NIC算法、基于LSTM的对话系统、【实战】LSTM模型写诗、循环神经网络RNN+LSTM+GRU、实战手写数字识别与人脸识别TensorFlow、...
  • C语言入门规则.pdf

    2020-09-08 16:20:45
    注明以下及其后续内容部分摘自 Standard C++ Bible 所有程序代码都在 Visual Stdio 6.0 中编译运行操作系统为 WinXP 本文不涉及 VC6.0 开发工具的使用 只讲解 C++ 语法知识 C++ 和 C 的共同部分就不讲解了 如常量和...
  • Visual Stdio 6.0 中编译运行操作系统为WinXP本文不涉及 VC6.0 开发工具的使用 只讲解 C++语法知识 C++和 C 的共同部分就不讲解了如常量和变量循环语句和循环控制数组和 指针等这里面的一些区别会在本节和下节介绍...
  • c语言与C++的区别.docx

    2020-03-09 13:38:19
    Visual Stdio 6.0 中编译运行操作系统为 WinXP本文不涉及 VC6.0 开发工具的使用只讲解 C++语法知 识 C++和 C 的共同部分就不讲解了如 常量和变量循环语句和循环控制数组和指针等 这里面的一些区别会在本节和下节介绍...
  • Linux定时执行程序

    2013-07-19 13:58:48
    Linux有时一些程序需要定时执行,以保证通过程序获得的信息是最新的,这便要用到crontab命令,crontab是一个很方便的在unix/linux系统上定时(循环)执行某个任务的程序,具体讲解如下: cron服务是一个定时执行的...
  • C++与C语言的区别 上

    千次阅读 2012-07-31 12:07:57
    注明:以下及其后续内容部分...C++和C的共同部分就不讲解了(如 常量和变量,循环语句和循环控制,数组和指针等,这里面的一些区别会在本节和下节介绍一下),具体可看精华区->新手上路->C语言入门,本文着重介绍C++
  • C++实现猜数字游戏

    2020-12-17 09:30:40
    用while循环来实现一猜数字游戏,供大家参考,具体内容如下 程序里有随机数的问题,当然讲解的不够细致,要详细讲解可以百度自行查询。 #include #include<stdlib>//rand的头文件,如果用的编译器VS2017就不用这个...
  • 本课程讲解Spark 3.0.0 Driver 启动内幕 的内容,包括:Spark Driver Program 剖析:Spark Driver Program、SparkContext 深度剖析、SparkContext 源码解析;... 打通Spark 系统运行内幕机制循环流程。
  • C++与C语言的区别

    千次阅读 2011-04-04 17:05:00
    C++和C的共同部分就不讲解了(如 常量和变量,循环语句和循环控制,数组和指针等,这里面的一些区别会在本节和下节介绍一下),具体可看精华区->新手上路->C语言入门,本文着重介绍C++的特点,如类、继承和多重继承...
  • Shell应用(四)

    2020-10-19 19:55:15
    在前面几章我们已经介绍了Shell脚本的编写规则与各种语句的具体应用,但是实际生 产环境中,Shell脚本通常与正则表达式,文本处理工具结合使用。因此本章我们将介绍 正则表达式与Linux系统中两个功能强大的文本处理...
  • 信息论与编码理论

    2017-09-12 14:57:25
    第二部分介绍了一些基于香农编码理论的信道和信源编码方法,具体包括线性码、循环玛、BCH和RS码、卷积码等信道纠错编码,以及变长信源编码等。《信息论与编码理论》内容丰富翔实,对基本概念和基础理论的阐述清晰...
  • 本书是普通高等教育“十五”国家级规划教材,书中详细介绍了C++程序设计语言的语法规则和编程方法,同时通过编程实例讲解如何使用C++语言求解与实现具有应用背景的各种具体问题,从而提高读者的编程与动手能力,为...
  • 资源介绍:本书用图解的方式对易语言的使用方法和操作技巧作了生动、系统讲解。全书分十章,分十天讲完。第一章是介绍易语言的安装,以及运行后的界面。同时介绍一个非常简单的小程序,以帮助用户入门学习。最后...

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 140
精华内容 56
关键字:

循环系统具体讲解