精华内容
参与话题
问答
  • 向线程池提交任务的两种方式

    千次阅读 2019-06-29 23:06:32
    方式一:提交无返回值的任务(execute); class RunnableThread implements Runnable{ @Override public void run() { for(int i=0;i<10;i++){ System.out.println (Thread.currentThread ().get...

    方式一:提交无返回值的任务(execute);

    class RunnableThread implements  Runnable{
        @Override
        public void run() {
            for(int i=0;i<10;i++){
                System.out.println (Thread.currentThread ().getName ()+"."+i );
            }
        }
    }
    public class Test {
        public static void main(String[] args) {
            RunnableThread runnableThread=new RunnableThread ();
            ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor ( 3,5,2000,
                    TimeUnit.MILLISECONDS,new LinkedBlockingDeque <> (  ) );
            for (int i=0;i<5;i++){
                threadPoolExecutor.execute ( runnableThread );
            }
            List<Runnable> runnables=threadPoolExecutor.shutdownNow ();
            System.out.println (runnables.size () );
            System.out.println ("是否停下"+threadPoolExecutor.isShutdown () );
            System.out.println ("是否终止"+threadPoolExecutor.isTerminated () );
            System.out.println ("获取CPU个数"+Runtime.getRuntime ().availableProcessors () );
                    while (threadPoolExecutor.isTerminated ()){
                        break;
                    }
    
    
        }
    }

     二、提交有返回值的任务(submit())

    class  callableThread implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            int sum=0;u
            for(int i=0;i<20;i++){
                sum+=i;
                System.out.println (Thread.currentThread ().getName ()+","+sum );
            }
            return sum;
        }
    }
    public class Test {
        public static void main(String[] args) {
           Callable CallableThread=new callableThread();
            ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor ( 3,5,2000,
                    TimeUnit.MILLISECONDS,new LinkedBlockingDeque<> (  ) );
            for(int i=0;i<5;i++){
                Future future=threadPoolExecutor.submit ( CallableThread );
                Integer Sum= null;
                try {
                    Sum = (Integer) future.get ();
                    System.out.println (Sum );
                } catch (InterruptedException e) {
                    e.printStackTrace ( );
                } catch (ExecutionException e) {
                    e.printStackTrace ( );
                }
            }
        }
    }

     

    展开全文
  • YARN的任务提交流程简述及图解

    千次阅读 2018-11-10 09:27:14
    #YARN的任务提交流程简述及图解 1,Client向ResourceManager发出请求,提交程序,(ResourceManager中有Scheduler调度器和ApplicationsManager应用程序管理器 2,ResourceManager向Scheduler返回一个ApplicationID...

    #YARN的任务提交流程简述及图解

    1,Client向ResourceManager发出请求,提交程序,(ResourceManager中有Scheduler调度器和ApplicationsManager应用程序管理器

    2,ResourceManager向Scheduler返回一个ApplicationID作为回应
    3,Client向RM回应Application Submission Context(ASC)。ASC包括ApplicationID、user、queue,以及其他一些启动AM相关的信息,除此之外,还有一个Container Launch Context(CLC),CLC包含了资源请求数(内存与CPU),job files,安全token,以及其他一些用以在一个node上启动AM的信息。任务一旦提交以后,client可以请求RM去杀死应用或查询应用的运行状态,

    4,当RM接受到ASC后,它会调度一个合适的container来启动AM,这个container经常被称作为container 0。AM需要请求其他的container来运行任务,如果没有合适的container,AM就不能启动。当有合适的container时,RM发请求到合适的NM上,来启动AM。这时候,AM的PRC与监控的URL就已经建立了。
    5,当AM启动起来后,ResourceManager回应给AM集群的最小与最大资源等信息。这时AM必须决定如何使用那么当前可用的资源。YARN不像那些请求固定资源的scheduler,它能够根据集群的当前状态动态调整。

    6,AM根据从RM那里得知的可使用的资源,它会请求一些一定数目的container。This request can be very specific,including containers with multiples of the resource minimum values (e.g., extra memory)。

    7,ResourceManager将会根据调度策略,尽可能的满足AM申请的container。

    在这里插入图片描述

    展开全文
  • 概略: ...2.Master接收到任务信息后,开始资源调度,此时会和所有的Worker进行通信,找到空闲的Worker,并通知Worker来拿取任务和启动相应的Executor 3.Executor启动后,开始与Driver进行反向注册,...

    概略:
    在这里插入图片描述
    1.Driver端启动SparkSubmit进程,启动后开始向Master进行通信,此时创建了一个对象(SparkContext),接着向Master发送任务消息
    2.Master接收到任务信息后,开始资源调度,此时会和所有的Worker进行通信,找到空闲的Worker,并通知Worker来拿取任务和启动相应的Executor
    3.Executor启动后,开始与Driver进行反向注册,接下来Driver开始把任务发送给相应的Executor,Executor开始计算任务

    全流程:
    在这里插入图片描述
    1.调用SparkSubmit类,内部执行submit --> doRunMain -> 通过反射获取应用程序的主类对象 --> 执行主类的main方法。
    2.构建SparkConf和SparkContext对象,在SparkContext入口做了三件事,创建了SparkEnv对象(创建了ActorSystem对象),TaskScheduler(用来生成并发送task给Executor),DAGScheduler(用来划分Stage)。
    3.ClientActor将任务信息封装到ApplicationDescription对象里并且提交给Master。
    4.Master收到ClientActor提交的任务信息后,把任务信息存在内存中,然后又将任务信息放到队列中。
    5.当开始执行这个任务信息的时候,调用scheduler方法,进行资源的调度。
    6.将调度好的资源封装到LaunchExecutor并发送给对应的Worker。
    7.Worker接收到Master发送过来的调度信息(LaunchExecutor)后,将信息封装成一个ExecutorRunner对象。
    8.封装成ExecutorRunner后,调用ExecutorRunner的start方法,开始启动 CoarseGrainedExecutorBackend对象。
    9.Executor启动后向DriverActor进行反向注册。
    10.与DriverActor注册成功后,创建一个线程池(ThreadPool),用来执行任务。
    11.当所有的Executor注册完成后,意味着作业环境准备好了,Driver端会结束与SparkContext对象的初始化。
    12.当Driver初始化完成后(创建了sc实例),会继续执行我们提交的App的代码,当触发了Action的RDD算子时,就触发了一个job,这时就会调用DAGScheduler对象进行Stage划分。
    13.DAGScheduler开始进行Stage划分。
    14.将划分好的Stage按照区域生成一个一个的task,并且封装到TaskSet对象,然后TaskSet提交到TaskScheduler。
    15.TaskScheduler接收到提交过来的TaskSet,拿到一个序列化器,对TaskSet序列化,将序列化好的TaskSet封装到LaunchExecutor并提交到DriverActor。
    16.把LaunchExecutor发送到Executor上。
    17.Executor接收到DriverActor发送过来的任务(LaunchExecutor),会将其封装成TaskRunner,然后从线程池中获取线程来执行TaskRunner。
    18.TaskRunner拿到反序列化器,反序列化TaskSet,然后执行App代码,也就是对RDD分区上执行的算子和自定义函数。

    展开全文
  • 向YARN提交wordcount任务 1、首先在HDFS创建输入文件目录,并将待处理的wordcount文件传入相应的输入文件目录。 # 创建输入文件目录 hadoop fs -mkdir -p /tmp/jbw/wordcount_input_dir # 将待处理的文件上传至...

    前提

    已经搭建好Hadoop环境。

    向YARN提交wordcount任务

    1、首先在HDFS创建输入文件目录,并将待处理的wordcount文件传入相应的输入文件目录。

    # 创建输入文件目录
    hadoop fs -mkdir -p /tmp/jbw/wordcount_input_dir
    
    # 将待处理的文件上传至对应目录
    hadoop fs -put /mnt/disk1/linken_speech.txt /tmp/jbw/wordcount_input_dir
    hadoop fs -ls /tmp/jbw/wordcount_input_dir


    这里写图片描述

    2、运行Hadoop的woedcount样例程序(向YARN提交作业)

    参数中指定jar执行文件、输入数据目录(需要先创建好,并将待处理文本上传至其中)、输出目录(无需创建,由样例程序自己生成)。

    hadoop jar hadoop/bin/hadoop-mapreduce-examples.jar wordcount /tmp/jbw/wordcount_input_dir /tmp/jbw/wordcount_output_dir

    执行过程如下图,可以看到wordcount执行过程会分map和reduce两个阶段。
    这里写图片描述

    3、查看运行结果

    在HDFS的输出文件目录下查看是否有结果文件,并查看。

    hadoop fs -ls /tmp/jbw/wordcount_output_dir
    hadoop fs -cat /tmp/jbw/wordcount_output_dir/part-r-00000

    这里写图片描述
    结果如下,可以看到每个单词的出现次数已经被统计出来:
    这里写图片描述

    Kill掉YARN上的某个任务

    我们可以kill掉提交给YARN上的任何执行中的任务。这里以大数据基准测试TPC造数据作为向YARN提交的任务。关于TPC,它其实会生成大量不同数量级别的用于测试大数据平台性能的标准测试数据。这里选它的原因是它造大量数据的时间比较长,我们有充分的时间可以kill掉它。

    解压tpcds-5.x.tar.gz文件,进入bin目录执行./gen-date.sh:

    tar -zxvf tpcds-5.x.tar.gz
    cd tpcds/bin
    # 生成数据
    ./gen-date.sh

    过程如下:
    这里写图片描述

    现在我们看一下YARN上有哪些运行的作业,并查看状态:

    yarn application -list
    # 根据任务ID查看任务状态
    yarn application -status application_1528449227004_0002

    这里写图片描述

    指定任务ID,kill掉它

    yarn application -kill application_1528449227004_0002

    这里写图片描述

    展开全文
  • 笔者是一个痴迷于挖掘数据中的价值的学习人,希望在平日的工作学习中,挖掘数据的价值,找寻数据的秘密,笔者认为,数据的价值不仅仅只体现在企业中,个人也可以体会到数据的魅力,用技术力量探索行为密码,让大数据...
  • YARN任务提交流程

    千次阅读 2017-07-03 10:01:47
    yarn取代了以前hadoop中jobtracker(后面简写JT)的角色,因为以前JT的 任务过重,负责任务的调度、跟踪、失败重启等过程,而且只能运行mapreduce作业,不支持其他编程模式,这也限制了JT使用范围,而yarn应运而 生...
  • hadoop任务提交过程

    千次阅读 2015-06-14 22:30:01
    WordCountMapper: private final static IntWritable one = new IntWritable(1); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  • Hadoop任务提交过程

    千次阅读 2014-07-21 20:03:13
    Hadoop任务提交分析 分析工具和环境 下载一份hadoop的源码,这里以hadoop-1.1.2为例。本地IDE环境为eclipse,导入整个目录,然后可以在IDE里面看到目录结构了,要分析任务提交过程,需要找到入口代码,很...
  • Flink任务提交模式

    千次阅读 2018-04-12 13:15:27
    local模式本地运行,不需要集群环境IDE开发时,local模式方便本地测试standalone需要搭建flink集群提交命令flink run -m artemis-02:6123 -c ...
  • flink任务提交

    千次阅读 2018-08-07 11:59:18
    flink安装启动后,默认端口为8081。 浏览器启动 http://ip:8081,可以进入可视化界面。可以看到正在运行的任务及状态。 若要提交新的任务,点击“Submit new Job” -- “Add New+”,...提交后可看到任务运行及...
  • MapReduce框架以及Hive任务提交详解

    千次阅读 2019-11-01 16:45:30
    在切Hive任务到Spark的时候,发现Spark在处理只有Hive元数据而HDFS文件块丢失的任务时,会抛HDFS的异常InvalidInputException,而Hive在这种情况下不受影响。 因此,就去找Hive在处理只有元数据的空表时做的优化。...
  • 关于spark任务提交的几种方式

    千次阅读 2019-04-08 13:25:27
    1.Spark当前支持三种集群管理方式 Standalone—Spark自带的一种集群管理方式,易于构建集群。 Apache Mesos—通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用。 Hadoop YARN—Hadoop2中的资源管理器...
  • Flink任务提交流程和任务调度原理

    千次阅读 2020-01-30 22:14:46
    Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink...
  • Spark集群任务提交

    千次阅读 2017-06-09 16:10:06
    1. 集群管理器 Spark当前支持三种集群管理方式 Standalone—Spark自带的一种集群管理方式,易于构建集群。 Apache Mesos—通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用。...
  • 图解Spark的任务提交的四种方式

    千次阅读 2019-05-23 21:33:07
  • spark集群的任务提交执行流程

    万次阅读 2018-03-07 20:41:30
    本文转自:https://www.linuxidc.com/Linux/2018-02/150886.htm一、Spark on Standalone1.spark集群启动后,Worker向Master注册信息2.spark-submit命令提交程序后,driver和application也会向Master注册信息3....
  • spark任务提交流程(standalone)

    千次阅读 2018-06-09 18:43:09
    spark程序使用spark-submit方式提交,如果是standalone集群的话,会在提交任务的节点启动一个driver进程; dirver进程启动以后,首先是构建sparkcontext,sparkcontext主要包含两部分:DAGScheduler和TaskScheduler...
  • 文章目录1.Flink多种提交方式对比1.1 local模式1.1.1 纯粹的local模式运行1.1.2 local使用remote的方式运行1.1.3 本地提交到remote集群1.2 standalone模式1.3 yarn模式1.3.1 yarn-session1.3.2 yarn-cluster2.flink...
  • Yarn中MapReduce任务提交步骤

    千次阅读 2014-12-05 18:43:35
    Submit ApplicationStage job.waitForCompletion  ...final JobSubmitter submitter =getJobSubmitter(cluster.getFileSystem(), cluster.getClient());...submitter.submitJobInternal(Job.

空空如也

1 2 3 4 5 ... 20
收藏数 380,445
精华内容 152,178
关键字:

任务提交