精华内容
下载资源
问答
  • DistCp

    2020-12-18 15:31:40
    目录 前言 常用命令 选项 更新和覆盖 不同HDFS版本间的复制 ...DistCp即distributed copy,分布式复制的意思,是集群间用于处理高I/O复制的工具。其底层基于MapReduce,因此具有分布式的能力,容错性以及对...DistCp

    目录

    前言

    常用命令

    选项

    更新和覆盖

    不同HDFS版本间的复制

    MapReduce和副效应


    前言

    DistCp即distributed copy,分布式复制的意思,是集群间用于处理高I/O复制的工具。其底层基于MapReduce,因此具有分布式的能力,容错性以及对异常的监控和上报能力。它将文件和目录列表展开到映射任务的输入中,每个任务将复制源列表中指定文件的一个分区。

    本文的目的是描述新的DistCp的设计、它的新特性、它们的最佳使用以及与遗留实现的任何偏差。

    常用命令

    DistCp最常见的用法就是集群内文件的复制:

    hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

    还可以在命令行中指定多个源目录:

    hadoop distcp hdfs://nn1:8020/foo/a hdfs://nn1:8020/foo/b hdfs://nn2:8020/bar/foo

    或者使用-f选项,从文件里获得多个源,使用-f选项:

    hadoop distcp -f hdfs://nn1:8020/srclist hdfs://nn2:8020/bar/foo

    其中srclist 的内容是
        hdfs://nn1:8020/foo/a
        hdfs://nn1:8020/foo/b

    当从多个源复制文件时,如果两个源冲突,DistCp会停止复制并打印出错信息, 如果在目的路径发生冲突,会根据选项设置解决。 默认情况会跳过已经存在的目标文件。

    每次操作结束时都会报告跳过的文件数目,但是如果某些操作在当前操作中失败,但之后的尝试却成功了, 那么报告的信息可能不够准确。每个TaskTracker必须能够与源端和目的端文件系统进行访问和交互。对于HDFS来说,源和目的端要运行相同版本的协议或者使用向下兼容的协议。复制完成后,建议生成源端和目的端文件的列表,并交叉检查,来确认复制是成功的。 因为DistCp使用MapReduce和文件系统API进行操作,所以这三者或它们之间有任何问题,都会影响到复制操作。

    一些Distcp命令可能会执行失败,但当带上-update参数再次进行执行时,将被成功执行。但用户在如此操作之前应该对该命令的语法很熟悉。值得注意的是,当另一个客户端同时在向源文件写入时,复制操作很有可能会失败。 尝试覆盖HDFS上正在被写入的文件的操作也会失败。 如果一个源文件在复制之前被移动或删除了,复制失败同时输出异常 FileNotFoundException。

    选项

    选项索引

    标识描述备注
    -p[rbugp]Preserve
      r: replication number
      b: block size
      u: user
      g: group
      p: permission
    修改次数不会被保留。并且当指定 -update 时,更新的状态不会被同步,除非文件大小不同(比如文件被重新创建)。
    -i忽略失败这个选项会比默认情况提供关于复制的更精确的统计, 同时它还将保留失败复制操作的日志,这些日志信息可以用于调试。最后,如果一个map失败了,但并没完成所有分块任务的尝试,这不会导致整个作业的失败。
    -log <logdir>记录日志到 <logdir>DistCp为每个文件的每次尝试复制操作都记录日志,并把日志作为map的输出。 如果一个map失败了,当重新执行时这个日志不会被保留。
    -m <num_maps>同时复制的最大数目指定了复制数据时map的数目。请注意并不是map数越多吞吐量越大。
    -overwrite覆盖目标如果一个map失败并且没有使用-i选项,不仅仅那些复制失败的文件,这个分块任务中的所有文件都会被重新复制。它会改变生成目标路径的语义,用户要小心使用这个选项。
    -update如果源和目标的大小不一样则进行覆盖像之前提到的,这不是"同步"操作。 执行覆盖的唯一标准是源文件和目标文件大小是否相同;如果不同,则源文件替换目标文件。 它也改变生成目标路径的语义, 用户要小心使用这个选项。
    -f <urilist_uri>使用<urilist_uri> 作为源文件列表这等价于把所有文件名列在命令行中。 urilist_uri 列表应该是完整合法的URI。

    更新和覆盖

    这里给出一些 -update和 -overwrite的例子。 考虑一个从/foo/a 和 /foo/b 到 /bar/foo的复制,源路径包括:

        hdfs://nn1:8020/foo/a
        hdfs://nn1:8020/foo/a/aa
        hdfs://nn1:8020/foo/a/ab
        hdfs://nn1:8020/foo/b
        hdfs://nn1:8020/foo/b/ba
        hdfs://nn1:8020/foo/b/ab

    如果没设置-update或 -overwrite选项, 那么两个源都会映射到目标端的 /bar/foo/ab。 如果设置了这两个选项,每个源目录的内容都会和目标目录的内容做比较。DistCp碰到这类冲突的情况会终止操作并退出。默认情况下,/bar/foo/a 和 /bar/foo/b 目录都会被创建,所以并不会有冲突。

    现在考虑一个使用-update合法的操作:

    distcp -update hdfs://nn1:8020/foo/a hdfs://nn1:8020/foo/b hdfs://nn2:8020/bar

    其中源路径/大小:

        hdfs://nn1:8020/foo/a
        hdfs://nn1:8020/foo/a/aa 32
        hdfs://nn1:8020/foo/a/ab 32
        hdfs://nn1:8020/foo/b
        hdfs://nn1:8020/foo/b/ba 64
        hdfs://nn1:8020/foo/b/bb 32

    和目的路径/大小:

        hdfs://nn2:8020/bar
        hdfs://nn2:8020/bar/aa 32
        hdfs://nn2:8020/bar/ba 32
        hdfs://nn2:8020/bar/bb 64

    会产生:

        hdfs://nn2:8020/bar
        hdfs://nn2:8020/bar/aa 32
        hdfs://nn2:8020/bar/ab 32
        hdfs://nn2:8020/bar/ba 64
        hdfs://nn2:8020/bar/bb 32

    只有nn2的aa文件没有被覆盖。如果指定了 -overwrite选项,所有文件都会被覆盖。

    DistCp会尝试着均分需要复制的内容,这样每个map复制差不多相等大小的内容。 但因为文件是最小的复制粒度,所以配置增加同时复制(如map)的数目不一定会增加实际同时复制的数目以及总吞吐量。

    如果没使用-m选项,DistCp会尝试在调度工作时指定map的数目 为 min (total_bytes / bytes.per.map, 20 * num_task_trackers), 其中bytes.per.map默认是256MB。

    建议对于长时间运行或定期运行的作业,根据源和目标集群大小、复制数量大小以及带宽调整map的数目。

    不同HDFS版本间的复制

    对于不同Hadoop版本间的复制,用户应该使用HftpFileSystem。 这是一个只读文件系统,所以DistCp必须运行在目标端集群上(更确切的说是在能够写入目标集群的TaskTracker上)。 源的格式是 hftp://<dfs.http.address>/<path> (默认情况dfs.http.address是 <namenode>:50070)。

    MapReduce和副效应

    像前面提到的,map复制输入文件失败时,会带来一些副效应。

    • 除非使用了-i,任务产生的日志会被新的尝试替换掉。
    • 除非使用了-overwrite,文件被之前的map成功复制后当又一次执行复制时会被标记为 "被忽略"。
    • 如果map失败了mapred.map.max.attempts次,剩下的map任务会被终止(除非使用了-i)。
    • 如果mapred.speculative.execution被设置为 final和true,则复制的结果是未定义的。
    展开全文
  • distcp

    2015-11-18 21:47:42
    [hadoop@hadoopmaster test]$ hadoop distcp hdfs://hadoopmaster:9000/user/hive/warehouse/jacktest.db hdfs://hadoopmaster:9000/jacktest/todir 15/11/18 05:39:30 INFO tools.DistCp: Input Options: DistCpO....
    [hadoop@hadoopmaster test]$ hadoop distcp hdfs://hadoopmaster:9000/user/hive/warehouse/jacktest.db hdfs://hadoopmaster:9000/jacktest/todir
    
    15/11/18 05:39:30 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[hdfs://hadoopmaster:9000/user/hive/warehouse/jacktest.db], targetPath=hdfs://hadoopmaster:9000/jacktest/todir, targetPathExists=true, preserveRawXattrs=false}
    15/11/18 05:39:30 INFO client.RMProxy: Connecting to ResourceManager at hadoopmaster/192.168.1.50:8032
    15/11/18 05:39:31 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
    15/11/18 05:39:31 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
    15/11/18 05:39:31 INFO client.RMProxy: Connecting to ResourceManager at hadoopmaster/192.168.1.50:8032
    15/11/18 05:39:32 INFO mapreduce.JobSubmitter: number of splits:2
    15/11/18 05:39:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1447853441917_0001
    15/11/18 05:39:32 INFO impl.YarnClientImpl: Submitted application application_1447853441917_0001
    15/11/18 05:39:33 INFO mapreduce.Job: The url to track the job: http://hadoopmaster:8088/proxy/application_1447853441917_0001/
    15/11/18 05:39:33 INFO tools.DistCp: DistCp job-id: job_1447853441917_0001
    15/11/18 05:39:33 INFO mapreduce.Job: Running job: job_1447853441917_0001
    15/11/18 05:39:41 INFO mapreduce.Job: Job job_1447853441917_0001 running in uber mode : false
    15/11/18 05:39:41 INFO mapreduce.Job: map 0% reduce 0%
    15/11/18 05:39:48 INFO mapreduce.Job: map 50% reduce 0%
    15/11/18 05:39:50 INFO mapreduce.Job: map 100% reduce 0%
    15/11/18 05:39:50 INFO mapreduce.Job: Job job_1447853441917_0001 completed successfully
    15/11/18 05:39:50 INFO mapreduce.Job: Counters: 33
    File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=216204
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1220
    HDFS: Number of bytes written=24
    HDFS: Number of read operations=31
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=8
    Job Counters
    Launched map tasks=2
    Other local map tasks=2
    Total time spent by all maps in occupied slots (ms)=10356
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=10356
    Total vcore-seconds taken by all map tasks=10356
    Total megabyte-seconds taken by all map tasks=10604544
    Map-Reduce Framework
    Map input records=3
    Map output records=0
    Input split bytes=272
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=156
    CPU time spent (ms)=1320
    Physical memory (bytes) snapshot=342798336
    Virtual memory (bytes) snapshot=1753182208
    Total committed heap usage (bytes)=169869312
    File Input Format Counters
    Bytes Read=924
    File Output Format Counters
    Bytes Written=0
    org.apache.hadoop.tools.mapred.CopyMapper$Counter
    BYTESCOPIED=24
    BYTESEXPECTED=24
    COPY=3
    [hadoop@hadoopmaster test]$ hadoop fs -ls /jacktest
    Found 1 items
    drwxr-xr-x - hadoop supergroup 0 2015-11-18 05:39 /jacktest/todir
    [hadoop@hadoopmaster test]$ hadoop fs -ls /jacktest/todir
    Found 1 items
    drwxr-xr-x - hadoop supergroup 0 2015-11-18 05:39 /jacktest/todir/jacktest.db
    [hadoop@hadoopmaster test]$ hadoop fs -ls /jacktest/todir/jacktest.db
    Found 1 items
    drwxr-xr-x - hadoop supergroup 0 2015-11-18 05:39 /jacktest/todir/jacktest.db/test1
    [hadoop@hadoopmaster test]$ hadoop fs -ls /jacktest/todir/jacktest.db/test1
    Found 1 items
    -rw-r--r-- 1 hadoop supergroup 24 2015-11-18 05:39 /jacktest/todir/jacktest.db/test1/test.body
    [hadoop@hadoopmaster test]$ hadoop fs -cat /jacktest/todir/jacktest.db/test1/test.body
    1,jack
    2,josson
    3,gavin
    [hadoop@hadoopmaster test]$


    hive> create table test1(id int,name string) row format delimited fields terminated by ',';
    OK
    Time taken: 0.454 seconds
    hive> select * from test1;
    OK
    Time taken: 0.65 seconds
    hive> show create table test1;
    OK
    CREATE TABLE `test1`(
    `id` int,
    `name` string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    'hdfs://hadoopmaster:9000/user/hive/warehouse/jacktest.db/test1'
    TBLPROPERTIES (
    'transient_lastDdlTime'='1447853584')
    Time taken: 0.152 seconds, Fetched: 13 row(s)


    [hadoop@hadoopmaster test]$ vi test.body

    1,jack
    2,josson
    3,gavin


    关于协议
    如果两个集群间的版本不一致,那么使用hdfs可能就会产生错误,因为rpc系统不兼容。那么这时候你可以使用基于http协议的hftp协议,但目标地址还必须是hdfs的,象这样:
    hadoop distcp hftp://namenode:50070/user/hadoop/input hdfs://namenode:9000/user/hadoop/input1
    推荐用hftp的替代协议webhdfs,源地址和目标地址都可以使用webhdfs,可以完全兼容


    hadoop distcp hftp://hadoopmaster:50070/user/hive/warehouse/jacktest.db hdfs://hadoopmaster:9000/jacktest/todir1


    [hadoop@hadoopmaster test]$ hadoop fs -mkdir /jacktest/todir1
    [hadoop@hadoopmaster test]$ hadoop distcp hftp://hadoopmaster:9000/user/hive/warehouse/jacktest.db hdfs://hadoopmaster:9000/jacktest/todir1
    15/11/18 05:44:32 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[hftp://hadoopmaster:9000/user/hive/warehouse/jacktest.db], targetPath=hdfs://hadoopmaster:9000/jacktest/todir1, targetPathExists=true, preserveRawXattrs=false}
    15/11/18 05:44:32 INFO client.RMProxy: Connecting to ResourceManager at hadoopmaster/192.168.1.50:8032
    15/11/18 05:44:33 ERROR tools.DistCp: Invalid input:
    org.apache.hadoop.tools.CopyListing$InvalidInputException: hftp://hadoopmaster:9000/user/hive/warehouse/jacktest.db doesn't exist
    at org.apache.hadoop.tools.GlobbedCopyListing.doBuildListing(GlobbedCopyListing.java:84)
    at org.apache.hadoop.tools.CopyListing.buildListing(CopyListing.java:84)
    at org.apache.hadoop.tools.DistCp.createInputFileListing(DistCp.java:353)
    at org.apache.hadoop.tools.DistCp.execute(DistCp.java:160)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:121)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:401)
    [hadoop@hadoopmaster test]$ hadoop distcp hftp://hadoopmaster:50070/user/hive/warehouse/jacktest.db hdfs://hadoopmaster:9000/jacktest/todir1
    15/11/18 05:45:10 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[hftp://hadoopmaster:50070/user/hive/warehouse/jacktest.db], targetPath=hdfs://hadoopmaster:9000/jacktest/todir1, targetPathExists=true, preserveRawXattrs=false}
    15/11/18 05:45:10 INFO client.RMProxy: Connecting to ResourceManager at hadoopmaster/192.168.1.50:8032
    15/11/18 05:45:11 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
    15/11/18 05:45:11 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
    15/11/18 05:45:11 INFO client.RMProxy: Connecting to ResourceManager at hadoopmaster/192.168.1.50:8032
    15/11/18 05:45:11 INFO mapreduce.JobSubmitter: number of splits:2
    15/11/18 05:45:11 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1447853441917_0002
    15/11/18 05:45:11 INFO impl.YarnClientImpl: Submitted application application_1447853441917_0002
    15/11/18 05:45:12 INFO mapreduce.Job: The url to track the job: http://hadoopmaster:8088/proxy/application_1447853441917_0002/
    15/11/18 05:45:12 INFO tools.DistCp: DistCp job-id: job_1447853441917_0002
    15/11/18 05:45:12 INFO mapreduce.Job: Running job: job_1447853441917_0002
    15/11/18 05:45:18 INFO mapreduce.Job: Job job_1447853441917_0002 running in uber mode : false
    15/11/18 05:45:18 INFO mapreduce.Job: map 0% reduce 0%
    15/11/18 05:45:24 INFO mapreduce.Job: map 50% reduce 0%
    15/11/18 05:45:26 INFO mapreduce.Job: map 100% reduce 0%
    15/11/18 05:45:26 INFO mapreduce.Job: Job job_1447853441917_0002 completed successfully
    15/11/18 05:45:26 INFO mapreduce.Job: Counters: 38
    File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=216208
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1200
    HDFS: Number of bytes written=24
    HDFS: Number of read operations=25
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=8
    HFTP: Number of bytes read=0
    HFTP: Number of bytes written=0
    HFTP: Number of read operations=0
    HFTP: Number of large read operations=0
    HFTP: Number of write operations=0
    Job Counters
    Launched map tasks=2
    Other local map tasks=2
    Total time spent by all maps in occupied slots (ms)=10014
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=10014
    Total vcore-seconds taken by all map tasks=10014
    Total megabyte-seconds taken by all map tasks=10254336
    Map-Reduce Framework
    Map input records=3
    Map output records=0
    Input split bytes=272
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=104
    CPU time spent (ms)=2240
    Physical memory (bytes) snapshot=345600000
    Virtual memory (bytes) snapshot=1751683072
    Total committed heap usage (bytes)=169869312
    File Input Format Counters
    Bytes Read=928
    File Output Format Counters
    Bytes Written=0
    org.apache.hadoop.tools.mapred.CopyMapper$Counter
    BYTESCOPIED=24
    BYTESEXPECTED=24
    COPY=3
    [hadoop@hadoopmaster test]$
    展开全文
  • hadoop distcp

    2015-02-05 10:37:32
    distcp一般用于在两个HDFS集群中传输数据,如果集群在hadoop的同一版本上运行,就适合使用hdfs方案: % hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar
  • Distcp方式

    万次阅读 2017-10-31 13:39:55
    一、概述 DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分...

    一、概述


           DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。
    它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。
    它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝。
    由于使用了Map/Reduce方法,这个工具在语义和执行上都会有特殊的地方。


    二、使用DistCp

     
     DistCp最常用于在集群之间的拷贝:
    hadoop distcp hdfs://nn1:8020/source hdfs://nn2:8020/destination
    上述命令会把nn1集群的/source目录下的所有文件或目录展开并存储到一个临时文件中,这些文件内容的拷贝工作被分配给多个map任务,
    然后每个NodeManager分别执行从nn1到nn2的拷贝操作。注意:DistCp使用绝对路径进行操作。
    命令行中还可以指定多个源目录:
    hadoop distcp hdfs://nn1:8020/source/a hdfs://nn1:8020/source/b hdfs://nn2:8020/destination
    或者使用-f选项,从文件里获得多个源:
    hadoop distcp -f hdfs://nn1:8020/srclist hdfs://nn2:8020/destination
    其中srclist 的内容是:
    hdfs://nn1:8020/source/a
    hdfs://nn1:8020/source/b
    当从多个源拷贝时,如果两个源冲突,DistCp会停止拷贝并提示出错信息, 如果在目的位置发生冲突,会根据选项设置解决。
    默认情况会跳过已经存在的目标文件(比如不用源文件做替换操作)。每次操作结束时
    都会报告跳过的文件数目,但是如果某些拷贝操作失败了,但在之后的尝试成功了, 那么报告的信息可能不够精确。
    每个Datanode必须都能够与源宿端Datanode进行访问和交互。

    每个Datanode必须都能够与源宿端Datanode进行访问和交互。
    对于HDFS来说,源和目的端要运行相同版本的协议或者使用向下兼容的协议。详细请参阅“在版本之间复制”章节。拷贝完成后,建议生成
    源端和目的端文件的列表,并交叉检查,来确认拷贝是否真正成功。 因为DistCp使用Map/Reduce和FileSystem
    API进行操作,所以这三者或它们之间有任何问题
    都会影响拷贝操作。一些Distcp命令的成功执行可以通过再次执行带-update参数的该命令来完成,
    但用户在如此操作之前应该对该命令的语法很熟悉。
    值得注意的是,当另一个客户端同时在向源文件写入时,拷贝很有可能会失败。 尝试覆盖HDFS上正在被写入的文件的操作也会失败。
    如果一个源文件在拷贝之前被移动或删除了,拷贝失败同时输出异常 FileNotFoundException。


    三、命令行选项


    DistCp命令行选项如表1所示:

    标识描述备注
    -p[rbugp]Preserve
    r:replication number
    b: block size
    u: user
    g: group
    p: permission
    -i忽略失败这个选项会比默认情况提供关于拷贝的更精确的统计,
    -log <logdir>记录日志到 <logdir>
    -m
    <num_maps>
    同时拷贝的最大数目指定了拷贝数据时map的数目。请注意并不是map数越多吞吐量越大。
    -overwrite覆盖目标用户要小心使用这个选项。
    -update如果源和目标的大小不一样则进行覆盖用户使用要小心。
    -f
    <urilist_uri>
    用<urilist_uri> 作为源文件列表这等价于把所有文件名列在命令行中。 urilist_uri 列表应该是完整合法的URI。


    修改次数不会被保留。并且当指定 -update 时,更新的状态不会 被同步,除非文件大小不同(比如文
    同时它还将保留失败拷贝操作的日志,这些日志信息可以用于调试。最后,如果一个map失败了,但并
    DistCp为每个文件的每次尝试拷贝操作都记录日志,并把日志作为map的输出。 如果一个map失败了
    如果一个map失败并且没有使用-i选项,不仅仅那些拷贝失败的文件,这个分块任务中的所有文件都会
    这不是"同步"操作。 执行覆盖的唯一标准是源文件和目标文件大小是否相同;如果不同,则源文件替
    表1 DistCp命令行参数表


    四、更新与覆盖


    DistCp –update选项用于复制目标中不存在的文件或者文件内容不相同的文件。DistCp
    -overwrite选项将覆盖目标文件,即使它们存在于源中,或者它们具有相同的内容。-update和-overwrite选项值得进一步讨论,因为它们处
    理源路径的方式与默认值不同,有些细节需要注意。
    这里给出一些 -update和 -overwrite的例子。考虑从/source/first/ 和 /source/second/ 到 /target/的拷贝,源路径包括:
    hdfs://nn1:8020/source/first/1
    hdfs://nn1:8020/source/first/2
    hdfs://nn1:8020/source/second/10
    hdfs://nn1:8020/source/second/20
    当不使用-update或-overwrite选项时,DistCp默认会在/target下创建/first和/second目录。因此将在/target之前先创建目录。
    从而:
    hadoop distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
    上述命令将在/target中生成以下内容:
    hdfs://nn2:8020/target/first/1
    hdfs://nn2:8020/target/first/2
    hdfs://nn2:8020/target/second/10
    hdfs://nn2:8020/target/second/20
    当指定-update或-overwrite时,源目录的内容将复制到目标,而不是源目录本身。 从而:
    distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
    上述命令将在/ target中生成以下内容:
    hdfs://nn2:8020/target/1
    hdfs://nn2:8020/target/2
    hdfs://nn2:8020/target/10
    hdfs://nn2:8020/target/20
    如果设置了这两个选项,每个源目录的内容都会和目标目录的内容做比较。如果两个源文件夹都包含一个具有相同名称的文件(例如“0”
    ),那么这两个源文件将在目的地映射到同一个目录:/target/0。DistCp碰到这类冲突的情况会终止操作并退出。
    现在,请考虑以下复制操作:
    distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
    其中源路径/大小:
    hdfs://nn1:8020/source/first/1 32
    hdfs://nn1:8020/source/first/2 32
    hdfs://nn1:8020/source/second/10 64
    hdfs://nn1:8020/source/second/20 32
    和目的路径/大小:
    hdfs://nn2:8020/target/1 32
    hdfs://nn2:8020/target/10 32
    hdfs://nn2:8020/target/20 64
    会产生:
    hdfs://nn2:8020/target/1 32
    hdfs://nn2:8020/target/2 32
    hdfs://nn2:8020/target/10 64
    hdfs://nn2:8020/target/20 32
    文件“1”因为文件长度和内容匹配而被跳过。
    文件“2”被复制,因为它不存在/target中。因为目标文件内容与源文件内容不匹配,文件“10”和文件“20”被覆盖。如果使用-update
    选项,文件“1”也被覆盖。


    五、DistCp命令的安全设置


    安全设置揭示了DistCp命令是运行在源集群上还是运行在目标集群上。一般认为,如果一个集群是安全的,另一个集群是不安全的,则Dist
    Cp应该从安全集群运行,否则可能存在与安全相关的问题。
    将数据从安全集群复制到非安全集群时,DistCp客户端需要设置如下配置:
    <property>
    <name>ipc.client.fallback-to-simple-auth-allowed</name>
    <value>true</value>
    </property>
    将数据从安全集群复制到安全集群时,core-site.xml文件中需要设置如下配置:
    <property>
    <name>hadoop.security.auth_to_local</name>
    <value></value>
    <description>Maps kerberos principals to local user names</description>
    </property>


    六、安全保护:Kerberos Principal Name


    distcp hdfs://cluster1-secure hdfs://cluuter2-secure
    考虑上述命令,这里存在一个问题:SASL RPC客户端要求远程服务器的Kerberos Principal
    Name必须与自己的配置中的服务器主体匹配。 因此,必须将相同的主体名称分配给源和目标集群中相关的NameNodes。
    例如,如果源集群中的NameNode的Kerberos Principal为nn/host1@realm,则目标集群中NameNode的Kerberos
    Principal也必须为nn/host2@realm,而不能为nn2/host2@realm。


    七、安全保护:ResourceManager映射规则


    当在两个安全的BCH集群间复制文件,如果两个集群存在不同的区域,那么需要进一步配置ResourceManager(RM)。为了使DistCP命
    令成功,必须在两个集群中使用相同的RM映射规则。
    例如,如果安全集群1具有以下RM映射规则:
    <property>
    <name>hadoop.security.auth_to_local</name>
    <value>
    RULE:[2:$1@$0](rm@.*SEC1.SUP1.COM)s/.*/yarn/
    DEFAULT
    </value>
    </property>
    并且安全集群2具有以下RM映射规则:
    <property>
    <name>hadoop.security.auth_to_local</name>
    <value>
    RULE:[2:$1@$0](rm@.*BA.YISEC3.COM)s/.*/yarn/
    DEFAULT
    </value>
    </property>
    从集群1到集群2的DistCp作业将会失败,因为集群2中的RM映射规则与群集1中的RM映射规则不同,导致集群2无法正确解析集群1
    中的RM映射规则供yarn使用。
    解决方案是:在集群1和集群2中使用相同的RM映射规则。
    <property>
    <name>hadoop.security.auth_to_local</name>
    <value>
    RULE:[2:$1@$0](rm@.*SEC1.SUP1.COM)s/.*/yarn/
    RULE:[2:$1@$0](rm@.*BA.YISEC3.COM)s/.*/yarn/
    DEFAULT
    </value>
    </property>


    八 、HADistCp


    在HA集群之间复制数据,需要在hdfs-site.xml文件中设置dfs.internal.nameservices属性来显式指定属于本地群集的名称服务,同时设置d
    fs.nameservices属性来指定在本地和远程集群中的所有名称服务。
    使用以下步骤在HA群集之间复制数据:
    (1)设置dfs.nameservices = HAA, HAB。
    (2)添加dfs.internal.nameservices属性:
    集群A:
    dfs.internal.nameservices = HAA
    集群B:
    dfs.internal.nameservices = HAB
    (3)将dfs.ha.namenodes. <nameservice>添加到两个集群中:
    集群A:
    dfs.ha.namenodes.HAB = nn1,nn2
    集群B:
    dfs.ha.namenodes.HAA = nn1,nn2
    (4)添加dfs.namenode.rpc-address. <cluster>.<nn>属性:
    集群A:
    dfs.namenode.rpc-address.HAB.nn1 = <NN1_fqdn>:8020
    dfs.namenode.rpc-address.HAB.nn2 = <NN2_fqdn>:8020
    集群B:
    dfs.namenode.rpc-address.HAA.nn1 = <NN1_fqdn>:8020
    dfs.namenode.rpc-address.HAA.nn2 = <NN2_fqdn>:8020
    (5)添加dfs.client.failover.proxy.provider. <cluster>属性:
    集群A:
    dfs.client.failover.proxy.provider.HAB=
    org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
    集群B:
    dfs.client.failover.proxy.provider.HAA=
    org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
    (6)重新启动HDFS服务,然后使用NameService运行distcp命令。 例如:
    hadoop distcp hdfs://falconG/tmp/testDistcp hdfs://falconE/tmp/


    九、DistCp和Hadoop版本


    源和目标集群的Hadoop版本决定应使用哪种类型的文件系统来从源集群读取并写入目标集群。BCH为Hadoop2的版本,可能存在这样的场
    景,已有的集群(可能是Hadoop1版本)需要向BCH集群拷贝数据,由于Hadoop
    1.x和2.x具有不同的RPC版本,客户端无法同时理解,因此对源和目的都不可能使用“hdfs”。在这种情况下,WebHdfsFilesystem(webh
    dfs://)可以在源和目标集群中使用,或者HftpFilesystem(hftp://)可用于从源集群读取数据。


    十、DistCp数据复制矩阵:Hadoop1 / Hadoop2到Hadoop2


    表2提供了使用DistCp将数据从HDP1或HDP2群集复制到HDP2群集时的配置,设置和结果的相关信息。

    目的源配置目的配置DistCp应当运行环境结果
    Hadoop1Hadoop2insecure + hdfsinsecure + webhdfsHadoop1 (source)success
    Hadoop1Hadoop2secure + hdfssecure + webhdfsHadoop1 (source)success
    Hadoop1Hadoop2secure + hftpsecure + hdfsHadoop2 (destination)success



    Hadoop1Hadoop2secure + hftpsecure + swebhdfsHadoop2(destination)success
    Hadoop1Hadoop2secure + hdfsinsecure + webhdfsHadoop1 (source)Possible issues are discussed here.
    Hadoop2Hadoop2secure + hdfsinsecure + hdfssecure Hadoop2 (source)success
    Hadoop2Hadoop2secure + hdfssecure + hdfseither Hadoop2 (source or destination)success
    Hadoop2Hadoop2secure + hdfssecure + webhdfsHadoop2 (source)success
    Hadoop2Hadoop2secure + hftpsecure + hdfsHadoop2 (destination)success


    表2 使用DistCp将数据从Hadoop1或Hadoop2群集复制到Hadoop2群集时的相关信息
    对于上表:
    (1)术语“安全”意味着设置了Kerberos安全性。
    (2)hsftp可用于Hadoop1和Hadoop2。 它向hftp添加了https支持。


    十一、 从Hadoop2复制数据到Hadoop1群集


    DistCp支持从Hadoop1集群复制数据到Hadoop2群集,但Hadoop1集群无法意识Hadoop2集群中引入的Checksum机制。使用DistCp从
    Hadoop2复制数据到Hadoop1需满足如下条件:跳过checksum和或者确保要复制的文件位于CRC32中。


    十二、 DistCp架构


    DistCp由以下组件组成: Distcp驱动程序、 复制列表生成器、 InputFormats和MapReduce组件。


    12.1、Distcp驱动程序


    DistCp驱动程序组件负责解析命令行中传递给DistCp命令的参数。它通过OptionsParser和DistCpOptionsSwitch来完成上述功能。将命令
    参数组成适当的DistCpOptions对象,并初始化DistCp。 这些参数包括:来源路径、目标位置、复制选项(例如是否更新 -
    复制,覆盖,要保存的文件属性等)。
    通过以下方式协调复制操作:
    (1)调用复制列表生成器来创建要复制的文件列表。
    (2)设置和启动Hadoop MapReduce作业以执行复制操作。
    (3)根据选项,要么立即将句柄返回给Hadoop MapReduce作业,要么等到完成。
    解析器元素只能在命令行执行(或者调用了DistCp ::
    run()方法)。通过构造DistCpOptions对象并适当地初始化DistCp对象,DistCp类也可以以编程方式使用。


    12.2、复制列表生成器


    复制列表生成器类负责创建要从源复制的文件/目录的列表。
    他们检查源路径(文件/目录,包括通配符)的内容,并将需要复制的所有路径记录到SequenceFile中以供DistCp Hadoop作业使用。
    该模块的主要类包括:
    (1)
    CopyListing:任何复制列表生成器都需要实现CopyListing接口。该接口还提供了工厂方法给实现了该接口的复制列表生成器类选择使用。
    (2)SimpleCopyListing:该类实现了CopyListing接口,可以接受多个源路径(文件/目录),并递归列出每个副本的所有单个文件和目
    录。
    (3)GlobbedCopyListing:增加了在源路径中扩展通配符功能的CopyListing的另一个实现。
    (4)FileBasedCopyListing:从指定文件读取源路径列表的CopyListing的实现。

    根据DistCpOptions中是否指定了源文件列表,源列表以下列方式中的某一种方式生成:
    (1)如果没有源文件列表,则使用GlobbedCopyListing。
    所有通配符都被扩展,所有的扩展都转发到SimpleCopyListing,而SimpleCopyListing反过来构造列表(通过每个路径的递归下降)。
    (2)如果指定了源文件列表,则使用FileBasedCopyListing。 源路径从指定的文件读取,然后转发到GlobbedCopyListing。
    然后如上所述构建列表。
    您也可以通过提供自定义的CopyListing接口的实现来自定义构建复制列表的方法。 在如何考虑复制路径时,DistCp的行为不同于传统的Dis
    tCp。传统实现仅列出必须将其复制到目标上的路径。
    例如,如果文件已存在于目标(并且未指定-overwrite),那么在MapReduce复制作业中甚至不考虑该文件。
    在设置期间(即在MapReduce作业之前)确定这一点涉及文件大小和校验和比较,这存在潜在的耗时。DistCp推迟这样的检查,直到MapR
    educe工作,从而减少设置时间。 通过在多个Maps中并行检查,性能会得到进一步增强。


    12.3、InputFormats和MapReduce组件


    InputFormats和MapReduce组件负责将文件和目录从源复制到目标路径。
    在复制列表生成期间创建的列表文件在执行复制时会被消耗。 这里相关类包括:
    UniformSizeInputFormat:org.apache.hadoop.mapreduce.InputFormat的实现类,该类提供了与Legacy
    DistCp的等价性,以平衡多个Map之间的负载。UniformSizeInputFormat的目的是使每个Map复制大致相同的字节数。因此,列表文件被
    分割成路径组,使得每个InputSplit中的文件大小的总和几乎相等。 分裂并不总是完美的,但这的确降低了设置时间。
    DynamicInputFormat和DynamicRecordReader:DynamicInputFormat实现了org.apache.hadoop.mapreduce.InputFormat接口,是
    DistCp的新功能。 列表文件被分成几个“块文件”,块文件的确切数量是Hadoop作业中请求的Map数量的倍数。
    在启动作业之前,每个Map任务都将“分配”其中一个块文件(通过将块重命名为任务的id)。
    使用DynamicRecordReader从每个块读取路径,并在CopyMapper中处理。
    在处理了一个组块中的所有路径之后,当前组块被删除并尝试获取一个新的组块。 该过程继续,直到没有更多的块可用。
    这种“动态”方法允许更快的Map任务,这比慢的Map消耗更多路径,从而加快了DistCp作业的整体运行。
    CopyMapper:此类用于实现物理文件复制。 根据输入选项(在作业配置中指定)检查输入路径,以确定是否需要复制文件。
    当且仅当以下至少一个为真时才会复制该文件:
    (1) 目标不存在同名的文件。
    (2) 具有相同名称的文件存在于目标,但具有不同的文件大小。
    (3)目标上存在同名的文件,但是具有不同的校验和,并且没有提到skipcrccheck。
    (4) 目标中存在同名的文件,但指定了-overwrite。
    (5)具有相同名称的文件存在于目标位置,但块大小不同(需要保留块大小)。
    CopyCommitter:该类负责DistCp作业的提交阶段,包括:保存目录权限(如果在选项中指定)、清理临时文件,工作目录等。

    展开全文
  • Distcp工具深入分析

    2021-03-23 14:06:08
    DistCp命令是hadoop用户最常使用的命令之一,它位于hadooptools包中,代码不多,约1300多行,主要用于在两个HDFS集群之间快速拷贝数据。DistCp工具代码结构清晰易懂,通过分析该工具的代码  引言  DistCp命令是...
  • distcp流程分析

    千次阅读 2019-10-23 20:29:17
    distcp可用于跨集群或集群内目录的复制,distcp参数不同复制的结果差别较大。本文结合官网及源码,对distcp流程进行分析,并结合测试给出验证。 使用 1. shell 目标目录父目录不存在时,可以自动建立多层目标目录...

    背景

    distcp可用于跨集群或集群内目录的复制,distcp参数不同复制的结果差别较大。本文结合官网及源码,对distcp流程进行分析,并结合测试给出验证。

    使用

    1. shell

    目标目录父目录不存在时,可以自动建立多层目标目录。

    1. 文件复制

    文件复制时,相当于cp,因为hdfs无法并行写。将a.txt复制并重命令为a_bak.txt

    $ hadoop distcp \
    hdfs://cluster-host1:9000/v2-ec-source/2019/07/02/_SUCCESS \
    hdfs://10.179.25.59:9000/v2-ec-source/2019/07/02/_SUCCESS_bak
    

    2. 文件夹复制

    distcp主要为复制文件夹服务。一个文件只能分配到一个map。因此,在文件夹中有多个文件时,可以发挥并行优势。

    $ hadoop distcp \
    hdfs://cluster-host1:9000/v2-ec-source/2019/07/02 \
    hdfs://10.179.25.59:9000/v2-ec-source/2019/bak
    

    将02下的所有文件,复制到bak目录下。

    多文件夹复制

    将多个文件夹复制到目标文件夹下,并且多个文件夹最后一个文件夹名保留。

    $ hadoop distcp \
    hdfs://cluster-host1:9000/v2-ec-source/2019/06 \
    hdfs://cluster-host1:9000/v2-ec-source/2019/07 \
    hdfs://10.179.25.59:9000/v2-ec-source/2019/mult
    

    目标文件夹结果:

    $ hadoop fs -ls /v2-ec-source/2019/mult
    ... 2019-10-23 20:44 /v2-ec-source/2019/mult/06
    ... 2019-10-23 20:44 /v2-ec-source/2019/mult/07
    

    当从多个源拷贝时,如果两个源冲突,DistCp会停止拷贝并提示出错信息, 如果在目的位置发生冲突,会根据选项设置解决。 默认情况会跳过已经存在的目标文件(比如不用源文件做替换操作)。每次操作结束时都会报告跳过的文件数目,但是如果某些拷贝操作失败了,但在之后的尝试成功了。

    值得注意的是,当另一个客户端同时在向源文件写入时,拷贝很有可能会失败。 尝试覆盖HDFS上正在被写入的文件的操作也会失败。 如果一个源文件在拷贝之前被移动或删除了,拷贝失败同时输出异常 FileNotFoundException。

    2. distcp源码分析

    1.shell入口:

    hadoop-common-project/hadoop-common/src/main/bin下以hadoop文件为入口:

    ...
        elif [ "$COMMAND" = "distcp" ] ; then
          CLASS=org.apache.hadoop.tools.DistCp
          CLASSPATH=${CLASSPATH}:${TOOL_PATH}
    ...
    	# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
        HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
    
        #make sure security appender is turned off
        HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
    
        export CLASSPATH=$CLASSPATH
        exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
        ;;
    

    进入 org.apache.hadoop.tools.DistCp
    我们看到Distcp是一个ToolToolRunner应用(如果不熟悉hadoop的ToolRunner模式,请参看本博客ToolRunner文章)。Tool应用要求实现run方法,如下:

    public class DistCp extends Configured implements Tool {
      ...
      // Tool应用必须的run方法
      @Override
      public int run(String[] argv) {
        // 下文具体分析时给出源码
      }
      
      // main方法,也是shell的入口
      public static void main(String argv[]) {
        int exitCode;
        try {
          DistCp distCp = new DistCp();
          Cleanup CLEANUP = new Cleanup(distCp);
    
          ShutdownHookManager.get().addShutdownHook(CLEANUP,
            SHUTDOWN_HOOK_PRIORITY);
          exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
        }
        catch (Exception e) {
          LOG.error("Couldn't complete DistCp operation: ", e);
          exitCode = DistCpConstants.UNKNOWN_ERROR;
        }
        System.exit(exitCode);
      }
    }
    

    准备工作

    我们从main函数入手,先看main的准备工作。

    1. 构造器
    2. cleanup:
      private static class Cleanup implements Runnable {
        private final DistCp distCp;
    
        Cleanup(DistCp distCp) {
          this.distCp = distCp;
        }
    
        @Override
        public void run() {
          if (distCp.isSubmitted()) return;
    
          distCp.cleanup();
        }
      }
    ...
      // 清理方法
      private synchronized void cleanup() {
        try {
          if (metaFolder == null) return;
    
          jobFS.delete(metaFolder, true);
          metaFolder = null;
        } catch (IOException e) {
          LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
        }
      }
    

    如果distcp实例未提交任务,则删除metaFolder,并另metaFolder = null
    至于metaFolder作用,下文分析。

    1. ShutdownHookManager工具类(可参看其他文章)。

    执行

    disctp使用ToolRunner.run执行任务。

          exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
    
    

    我们回到run方法中。

     @Override
      public int run(String[] argv) {
        if (argv.length < 1) {
          OptionsParser.usage();
          return DistCpConstants.INVALID_ARGUMENT;
        }
        
        try {
          inputOptions = (OptionsParser.parse(argv));
          setTargetPathExists();
          LOG.info("Input Options: " + inputOptions);
        } catch (Throwable e) {
          LOG.error("Invalid arguments: ", e);
          System.err.println("Invalid arguments: " + e.getMessage());
          OptionsParser.usage();      
          return DistCpConstants.INVALID_ARGUMENT;
        }
        
        try {
          execute();
        } catch (InvalidInputException e) {
          LOG.error("Invalid input: ", e);
          return DistCpConstants.INVALID_ARGUMENT;
        } catch (DuplicateFileException e) {
          LOG.error("Duplicate files in input path: ", e);
          return DistCpConstants.DUPLICATE_INPUT;
        } catch (AclsNotSupportedException e) {
          LOG.error("ACLs not supported on at least one file system: ", e);
          return DistCpConstants.ACLS_NOT_SUPPORTED;
        } catch (XAttrsNotSupportedException e) {
          LOG.error("XAttrs not supported on at least one file system: ", e);
          return DistCpConstants.XATTRS_NOT_SUPPORTED;
        } catch (Exception e) {
          LOG.error("Exception encountered ", e);
          return DistCpConstants.UNKNOWN_ERROR;
        }
        return DistCpConstants.SUCCESS;
      }
    
    1. OptionsParser类是distcp单独实现的参数解析工具类。将输入参数解析成DistCpOptions inputOptions类型。如常见的参数overwrite = false等等。作为工具类,暂时忽略。
    2. setTargetPathExists():从参数中解析出目标路径。
      /**
       * 为了优化copy,在输入选项中和job配置中,都加入了目标路径
       */
      private void setTargetPathExists() throws IOException {
        Path target = inputOptions.getTargetPath();
        FileSystem targetFS = target.getFileSystem(getConf());
        boolean targetExists = targetFS.exists(target);
        inputOptions.setTargetPathExists(targetExists);
        getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
            targetExists);
      }
    
    1. execute():核心执行方法。
      前述的多数方法只是对路径等做一些检查以及对execute()方法做异常处理,而execute方法则是任务执行方法。
      public Job execute() throws Exception {
        Job job = createAndSubmitJob();
    
        if (inputOptions.shouldBlock()) {
          waitForJobCompletion(job);
        }
        return job;
      }
    

    在execute()方法中,会调用createAndSubmitJob()创建MR任务,准备数据,设定数据输入格式,并把任务提交到hadoop集群运行,最后等待任务执行完毕。于是我们可以看到,主体功能实现就在createAndSubmitJob()这个函数体里,工程中其它的各个类无非都是为这个函数接口服务的。下面就是createAndSubmitJob()的代码,这里删除了一些不影响阅读的源码,只留下主体功能流程。

      public Job createAndSubmitJob() throws Exception {
        assert inputOptions != null;
        assert getConf() != null;
        Job job = null;
        try {
          synchronized(this) {
            //Don't cleanup while we are setting up.
            metaFolder = createMetaFolderPath();
            jobFS = metaFolder.getFileSystem(getConf());
            job = createJob();
          }
          if (inputOptions.shouldUseDiff()) {
            if (!DistCpSync.sync(inputOptions, getConf())) {
              inputOptions.disableUsingDiff();
            }
          }
          createInputFileListing(job);
    
          job.submit();
          submitted = true;
        } finally {
          if (!submitted) {
            cleanup();
          }
        }
    
        String jobID = job.getJobID().toString();
        job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
        LOG.info("DistCp job-id: " + jobID);
    
        return job;
      }
    
    metaFolder

    metaFolder是一个Path类型,private Path metaFolder; metafolder是DISTCP工具准备元数据的地方,在createMetaFolderPath()中会结合一个随机数生成一个工作目录,在这个目录中迟点会通过getFileListingPath()生成fileList.seq文件,然后往这个文件中写入数据,这是一个SequenceFile文件,即Key/Value结构的序列化文件,这个文件里将存放所有需要拷贝的源目录/文件信息列表。其中Key是源文件的Text格式的相对路径,即relPath;而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类,但是DISTCP为了更好的处理数据,重新继承并封装了CopyListingFileStatus类,其描述如下图1,不过我们其实可以认为这里的Value就是FileStatus即可。metafolder目录中的fileList.seq最终会作为参数传递给MR任务中的Mapper。

      private Path createMetaFolderPath() throws Exception {
        Configuration configuration = getConf();
        Path stagingDir = JobSubmissionFiles.getStagingDir(
                new Cluster(configuration), configuration);
        Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
        if (LOG.isDebugEnabled())
          LOG.debug("Meta folder location: " + metaFolderPath);
        configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
        return metaFolderPath;
      }
    

    开启debug模式后,我们可以看到metaFolder,并能查看内容:

    19/12/31 12:08:25 DEBUG tools.DistCp: Meta folder location: /tmp/hadoop-yarn/staging/hadoop/.staging/_distcp714835269
    
    Found 2 items
    -rw-r--r--   2 hadoop supergroup        840 2019-11-03 17:48 /tmp/hadoop-yarn/staging/hadoop/.staging/_distcp987712852/fileList.seq
    -rw-r--r--   2 hadoop supergroup        740 2019-11-03 17:48 /tmp/hadoop-yarn/staging/hadoop/.staging/_distcp987712852/fileList.seq_sorted
    
    job = createJob()

    生成常规的MR job,源码如下:

      private Job createJob() throws IOException {
        String jobName = "distcp";
        String userChosenName = getConf().get(JobContext.JOB_NAME);
        if (userChosenName != null)
          jobName += ": " + userChosenName;
        Job job = Job.getInstance(getConf());
        job.setJobName(jobName);
        job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
        job.setJarByClass(CopyMapper.class);
        configureOutputFormat(job);
    
        job.setMapperClass(CopyMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(CopyOutputFormat.class);
        job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
        job.getConfiguration().set(JobContext.NUM_MAPS,
                      String.valueOf(inputOptions.getMaxMaps()));
    
        if (inputOptions.getSslConfigurationFile() != null) {
          setupSSLConfig(job);
        }
    
        inputOptions.appendToConf(job.getConfiguration());
        return job;
      }
    

    可以看到只有map作业,没有reduce。我们先看重点代码:

    • job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
    • job.setMapperClass(CopyMapper.class);

    我们在设置MapReduce输入格式的时候,会调用上面第一行这样一条语句,这条语句保证了输入文件会按照我们预设的格式被读取。setInputFormatClass里设定了Mapper的数据读取格式,也就是由getStrategy(getConf(), inputOptions)得到,进到这个函数里面,可以看到最终Mapper数据输入格式由UniformSizeInputFormat.class这个类定义的,而这个类继承自InputFormat.class,MR中所有的输入格式类都继承自InputFormat,这是一个抽象类。

    public static Class<? extends InputFormat> getStrategy(Configuration conf,
                                                                     DistCpOptions options) {
        String confLabel = "distcp."
            + StringUtils.toLowerCase(options.getCopyStrategy())
            + ".strategy" + ".impl";
        return conf.getClass(confLabel, UniformSizeInputFormat.class, InputFormat.class);
      }
    

    通过名称获取,默认情况confLabel = distcp.uniformsize.strategy.impl,getClass的第二个参数是默认值,即输入格式默认是UniformSizeInputFormat.class

    InputFormat抽象类仅有两个抽象方法:

    • List<InputSplit>getSplits(),获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
    • RecordReader<K,V>createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

    通过InputFormat,Mapreduce框架可以做到:

    • 验证作业输入的正确性;
    • 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask;
    • 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用。

    在DISTCP中,UniformSizeInputFormat继承了InputFormat并实现了数据读入格式,它会读取metafolder中fileList.seq序列化文件的内容,并根据用户设定的map数和拷贝总数据量进行分片,计算出分多少片,最终提供“K-V”对供Mapper使用。这个类的源码实现并不复杂,加上注释一共也才100多行,很容易就能读懂。

    CopyMapper.class中则定义了每个map的工作逻辑,也就是拷贝的核心逻辑,任务提交到hadoop集群中运行时每个map就是根据这个逻辑进行工作的,通过setMapperClass设定。这里要注意的是DISTCP任务只有map没有reduce,因为只需要map就可以完成拷贝数据的工作。CopyMapper的源码实现在org.apache.hadoop.tools.mapred这个包中,CopyMapper里最核心实现是setup()和map()这两个方法,这两个方法其实也是MR中Mapper的固有通用方法,setup()中完成map方法的一些初始化工作,在DISTCP中,这个方法里会设定对端的目标路径,并做一些参数设置和判断工作,源码(删掉了参数设置部分)如下:

     @Override
      public void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
    
        syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
        ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
        skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
        overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
        append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
        preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
            PRESERVE_STATUS.getConfigLabel()));
    
        targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
        Path targetFinalPath = new Path(conf.get(
                DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
        targetFS = targetFinalPath.getFileSystem(conf);
    
    	// 目标路径存在且是一个文件,则覆盖掉
        if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
          overWrite = true; // When target is an existing file, overwrite it.
        }
    
        if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
          initializeSSLConf(context);
        }
      }
    
    map
      @Override
      public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
              Context context) throws IOException, InterruptedException {
        Path sourcePath = sourceFileStatus.getPath();
    
        if (LOG.isDebugEnabled())
          LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
    
        Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
                              targetFS.getWorkingDirectory()) + relPath.toString());
    
        EnumSet<DistCpOptions.FileAttribute> fileAttributes
                = getFileAttributeSettings(context);
        final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
            DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
    
        final String description = "Copying " + sourcePath + " to " + target;
        context.setStatus(description);
    
        LOG.info(description);
    
        try {
          CopyListingFileStatus sourceCurrStatus;
          FileSystem sourceFS;
          try {
            sourceFS = sourcePath.getFileSystem(conf);
            final boolean preserveXAttrs =
                fileAttributes.contains(FileAttribute.XATTR);
            sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
              sourceFS.getFileStatus(sourcePath),
              fileAttributes.contains(FileAttribute.ACL), 
              preserveXAttrs, preserveRawXattrs);
          } catch (FileNotFoundException e) {
            throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
          }
    
          FileStatus targetStatus = null;
    
          try {
            targetStatus = targetFS.getFileStatus(target);
          } catch (FileNotFoundException ignore) {
            if (LOG.isDebugEnabled())
              LOG.debug("Path could not be found: " + target, ignore);
          }
    
          if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
            throw new IOException("Can't replace " + target + ". Target is " +
                getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
          }
    
          if (sourceCurrStatus.isDirectory()) {
            createTargetDirsWithRetry(description, target, context);
            return;
          }
    
          FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
          if (action == FileAction.SKIP) {
            LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                     + " to " + target);
            updateSkipCounters(context, sourceCurrStatus);
            context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
          } else {
            copyFileWithRetry(description, sourceCurrStatus, target, context,
                action, fileAttributes);
          }
    
          DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
              fileAttributes, preserveRawXattrs);
        } catch (IOException exception) {
          handleFailures(exception, sourceFileStatus, target, context);
        }
      }
    

    从输入参数可以看出来,这其实就是对上面分析过的UniformSizeInputFormat类里分片后的数据里的每一行进行处理,每行里存放的就是“K-V”对,也就是fileList.seq文件每行的内容。Map方法体前半部分有一大堆代码内容,其实都是一些准备和判断工作,为后面的拷贝服务,最终的拷贝动作发生在copyFileWithRetry(description,sourceCurrStatus, target, context, action, fileAttributes)这个函数中,进入这个函数一直往里面读,就能看到数据最终通过常用的Java输入输出流的方式完成点对点拷贝,最后拷贝方法源码如下:

       private void copyFileWithRetry(String description,
          FileStatus sourceFileStatus, Path target, Context context,
          FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
          throws IOException {
        long bytesCopied;
        try {
          bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
              action).execute(sourceFileStatus, target, context, fileAttributes);
        } catch (Exception e) {
          context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
          throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
              " --> " + target, e);
        }
        incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
        incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
        incrementCounter(context, Counter.COPY, 1);
      }
    

    最后调用copyToFile,copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context);方法。

    元数据生成(createInputFileListing)

    前面提到在metafolder目录中会生成fileList.seq文件,而这个文件是怎么生成以及文件里面保存些什么内容呢?这个逻辑就在createInputFileListing(job)中完成的。
    查看 UniformSizeInputFormat 类中对 createRecordReader 的实现。

      @Override
      public RecordReader<Text, CopyListingFileStatus> createRecordReader(
          InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
      }
    

    回到createAndSubmitJob方法中,在job提交以前,createInputFileListing(job)。首先由getFileListingPath()方法创建一个空的seq文件,然后由buildListing()方法往这个seq文件写入数据,数据写入的整体逻辑就是遍历源路径下所有目录和文件,把每个文件的相对路径和它的CopyListingFileStatus以“K-V”对的形式写入fileList.seq每行中,最终就得到Mapper的输入了。

      protected Path createInputFileListing(Job job) throws IOException {
        Path fileListingPath = getFileListingPath();
        CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
            job.getCredentials(), inputOptions);
        copyListing.buildListing(fileListingPath, inputOptions);
        return fileListingPath;
      }
    

    job提交以后,清理metaFolder,结束。

    参考文献:

    1. https://blog.csdn.net/github_34457546/article/details/69563629
    展开全文
  • Hadoop DistCp

    2017-09-11 11:57:00
    本文讲的是Hadoop DistCp,【IT168 资讯】DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入...
  • distcp 异常

    2018-05-08 09:20:09
    生产代码无任何变化,每天定时的distcp突然失败了。WARN [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:user (auth:SIMPLE) cause:java.io.FileNotFoundException: File ...
  • 工具-Hadoop distcp

    千次阅读 2019-03-30 21:59:52
    distcp 原理 distcp 操作方法 fs shell拷贝和移动 通常我们使用hadoop提供的fs shell来完成hdfs文件管理。为了对比dictcp,先看下常用的-cp和-mv的使用。 现有目录/lib包含文件1.data 2.data -cp如下操作 hadoop ...
  • DistCp命令是hadoop用户最常使用的命令之一,它位于hadooptools包中,代码不多,约1300多行,主要用于在两个HDFS集群之间快速拷贝数据。DistCp工具代码结构清晰易懂,通过分析该工具的代码有助于我们更好的理解MR...
  • distcp问题

    2017-10-17 13:59:47
    1 报check-sum mismatch between source_path and target_path ...hadoop distcp hdfs://xxxx:8020/mydata/hive/warehouse/db_ecar.db/bd_ads_flow_protrayal_total/ hdfs://yyyy:8020/user/hive/warehouse/d
  • Hadoop DistCp 命令

    千次阅读 2019-02-24 19:41:49
    Distcp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具,使用Map/Reduce实现文件的分发、错误处理和恢复,以及生成相应的报告。要拷贝的文件和目录列表会作为map任务的输入,每个map任务处理部分文件的拷贝...
  • hadoop使用distcp问题解决 然后用distcp从1.0.3的集群拷数据到2.0.1的集群中。 遇到问题处理
  • java-distcp

    2021-09-29 17:30:08
    hadoop distcp hdfs://192.168.111.232:9000/temp/ hdfs://192.168.111.230:9000/temp2 查看防火墙状态 firewall-cmd --state 设置JAVA_HOME路径 whereis java export JAVA_HOME="/usr/java/jdk1.8.0_11
  • hadoop-distcp-2.6.0.jar

    2020-01-03 18:17:09
    distcp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝...
  • distcp集群数据迁移

    2019-05-22 15:58:23
    使用了distcp指令做数据迁移. ##distcp 指令:源->目标 (指令在目标端运行) 指令:hadoop distcp 源 目标 ##1、运行指令的用户,得有目标仓库的写权限 ##2、源端:必须是绝对路径(hdfs路径到表名路径) ##3、目标端:...
  • Hadoop—distcp

    2016-01-31 11:00:55
    DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝...
  • Hadoop中distcp命令

    千次阅读 2019-01-14 18:22:25
    Hadoop中distcp命令 1.什么是distcp命令? Hadoop comes with a useful program called distcp for copying data to and from Hadoop filesystems in parallel. 2.distcp 是如何实现的? distcp is implemented ...
  • hadoop distcp命令简单使用

    千次阅读 2019-04-17 10:21:20
    hadoop分布式复制命令distcpdistcp一般用于在两个HDFS集群中传输数据。如果集群在hadoop的同一版本上运行,就适合使用hdfs方案。 hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar hadoop distcp ...
  • distcp使用要点

    2018-07-25 16:37:36
    引言:在公司数据迁移时,对distcp这个命令尝试了n多次,总算对他的工作原理有点心得。 1、首先确保两个集群的mapreduce计算框架没问题 2、开通目标集群所有机器到源集群namenode节点的网络 3、版本差距不是很大...
  • hadoop distcp使用

    2019-09-14 17:06:51
    distcp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝...
  • DistCp可行性分析

    2021-01-13 10:32:55
    DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝...
  • at org.apache.hadoop.tools.DistCp.execute(DistCp.java:153) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,923
精华内容 1,569
关键字:

distcp