精华内容
下载资源
问答
  • DataNode

    2021-01-19 11:59:39
    一、DataNode工作机制 一个数据在DataNode上以文件的形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据(包括数据块的长度,块的校验和,以及时间戳)。 DataNode启动后像NameNode注册,通过后周期性...

    一、DataNode工作机制

    在这里插入图片描述

    1. 一个数据在DataNode上以文件的形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据(包括数据块的长度,块的校验和,以及时间戳)。
    2. DataNode启动后像NameNode注册,通过后周期性(1小时)的向NameNode上报所有块信息
    3. NameNode与DataNode之间使用心跳进行检测节点是否可用,心跳没每3秒一次,心跳返回结果带有NameNode给该DataNode的命令,如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
    4. 集群中可以安全的加入和退出一些机器

    二、数据完整性校验

    DataNode节点保证数据完整性的方法:

    1. 当DataNode读取Block时,它会计算CheckSum
    2. 如果计算后的CheckSum与Block创建时不一致,说明Block已经损坏
    3. Client读取其他DataNode上的Block
    4. DataNode在其文件创建后周期验证CheckSum

    三、DataNode掉线时限参数设置

    1. DataNode进程死亡或者网络故障战场DataNode无法与NameNode无法通信

    2. NameNode不会立即吧该节点判定为死亡,要经过一段时间,这段时间成为超时时长

    3. HDFS的默认超时时长为10分钟+30秒

      TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval

      默认配置:

      <property>
          <name>dfs.namenode.heartbeat.recheck-interval</name>
          <value>300000</value>
      </property>
      <property>
          <name>dfs.heartbeat.interval</name>
          <value>3</value>
      </property>
      

      hdfs-site.xml配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。

    4. 可以通过修改上述两个配置修改超时时长。

    四、添加服役新数据节点

    1. 安装配置hadoop

    2. 将新的节点ip添加到/etc/hosts(hadoop105)

      192.168.1.102 hadoop102
      192.168.1.103 hadoop103
      192.168.1.104 hadoop104
      192.168.1.105 hadoop105
      
    3. slaves添加新节点

      hadoop102
      hadoop103
      hadoop104
      hadoop105
      
    4. 配置ssh免密

    5. hostsslaves分发到新节点

    6. 单独启动新节点

    五、退役旧数据节点

    5.1、添加白名单

    添加到白名单的主机节点,都允许访问NameNode,不在白名单的主机节点,都会被退出。
    步骤:

    1. 在NameNode的/opt/module/hadoop-2.7.2/etc/hadoop目录下创建dfs.hosts文件
      hadoop102
      hadoop103
      hadoop104
      

      上面四中添加了Hadoop05,现在要把它添加到白名单之外

    2. 在NameNode的hdfs-site.xml配置文件中增加dfs.hosts属性
      <property>
      <name>dfs.hosts</name>
      <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
      </property>
      
    3. 分发配置文件hdfs-site.xml
      xsync hdfs-site.xml
      
    4. 刷新NameNode
      hdfs dfsadmin -refreshNodes
      
    5. 更新ResourceManager节点
      yarn rmadmin -refreshNodes
      
    6. 在web就查看不到Hadoop105了
      在这里插入图片描述
      这种方式,是的不在白名单中的节点被排除在外了,如果该节点有数据,会造成数据不均衡
      ,使用start-balancer.sh使得集群再平衡
      [hadoop100@hadoop102 hadoop-2.7.2]$ start-balancer.sh 
      starting balancer, logging to /opt/module/hadoop-2.7.2/logs/hadoop-hadoop100-balancer-hadoop102.out
      Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
      

    5.2、添加黑名单

    在黑名单上面的主机会被强制退出

    1. 在NameNode的/opt/module/hadoop-2.7.2/etc/hadoop目录下创建dfs.hosts.exclude文件
      hadoop105
      
    2. 在NameNode的hdfs-site.xml配置文件中增加dfs.hosts.exclude属性
      <property>
      <name>dfs.hosts.exclude</name>
            <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
      </property>
      
    3. 刷新NameNode、刷新ResourceManager
      hdfs dfsadmin -refreshNodes
      yarn rmadmin -refreshNodes
      
    4. 检查Web浏览器,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点
      在这里插入图片描述
    5. 等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役
      在这里插入图片描述
    6. 停止节点
    7. 如果数据不均衡,在使用start-balancer.sh使得数据均衡

    注意:不允许白名单和黑名单中同时出现同一个主机名称

    六、DataNode多目标配置

    1. DataNode也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本
    2. hdfs-site.xml中配置:
      <property>
              <name>dfs.datanode.data.dir</name>
      <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
      </property>
      

    NameNode的多目录配置,多个目录的内容是一样的。而DataNode的多目录配置,各个目录的文件内容是不一样的,即DataNode的多目录是分开存储的。

    展开全文
  • 从节点JPS没有dataNode的解决办法

    千次阅读 2020-12-19 00:32:42
    从节点JPS没有dataNode的解决办法查看logs日志文件,找到Hadoop的安装目录下logs目录,用cat hadoop-root-datanode-localhost.localdomain.log进行日志查看1、java.net.BindException: 无法指定被请求的地址另一种...

    从节点JPS没有dataNode的解决办法

    查看logs日志文件,找到Hadoop的安装目录下logs目录,用cat hadoop-root-datanode-localhost.localdomain.log进行日志查看

    1、java.net.BindException: 无法指定被请求的地址

    另一种错误表现方式是:执行hadoop fs -put命令报错could only be replicated to 0 nodes instead of minReplication (=1)

    报错大致如下:

    20/12/01 00:45:48 WARN hdfs.DFSClient: DataStreamer Exception

    org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /Hadoop/Input/wordcount.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation.

    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1571)

    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)

    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)

    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)

    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)

    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

    at java.security.AccessController.doPrivileged(Native Method)

    at javax.security.auth.Subject.doAs(Subject.java:422)

    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)

    at org.apache.hadoop.ipc.Client.call(Client.java:1412)

    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)

    at com.sun.proxy.$Proxy10.addBlock(Unknown Source)

    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)

    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

    at com.sun.proxy.$Proxy11.addBlock(Unknown Source)

    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1455)

    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1251)

    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

    put: File /Hadoop/Input/wordcount.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation.

    解决办法

    在主节点执行JPS命令显示

    [root@master logs]# jps

    7282 NameNode

    7764 ResourceManager

    8153 Jps

    7551 SecondaryNameNode

    从节点执行JPS命令显示

    [root@localhost current]# jps

    4600 Jps

    4426 NodeManager

    显示错误原因是因为dataNode没有启动成功。

    下面我们查找dataNode启动失败的原因:

    在从节点上找到Hadoop的安装目录下logs

    目录,打开cat hadoop-root-datanode-localhost.localdomain.log

    查找错误,我的错误日志如下:

    2020-12-02 18:18:46,740 INFO org.apache.hadoop.http.HttpServer2: HttpServer.start() threw a non Bind IOException

    java.net.BindException: Port in use: localhost:0

    at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:919)

    at org.apache.hadoop.http.HttpServer2.start(HttpServer2.java:856)

    at org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer.(DatanodeHttpServer.java:104)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.startInfoServer(DataNode.java:760)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:1112)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.(DataNode.java:429)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:2374)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:2261)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.createDataNode(DataNode.java:2308)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.secureMain(DataNode.java:2485)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.main(DataNode.java:2509)

    Caused by: java.net.BindException: 无法指定被请求的地址

    at sun.nio.ch.Net.bind0(Native Method)

    at sun.nio.ch.Net.bind(Net.java:433)

    at sun.nio.ch.Net.bind(Net.java:425)

    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)

    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)

    at org.mortbay.jetty.nio.SelectChannelConnector.open(SelectChannelConnector.java:216)

    at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:914)

    ... 10 more

    2020-12-02 18:18:46,745 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Shutdown complete.

    2020-12-02 18:18:46,745 FATAL org.apache.hadoop.hdfs.server.datanode.DataNode: Exception in secureMain

    java.net.BindException: Port in use: localhost:0

    at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:919)

    at org.apache.hadoop.http.HttpServer2.start(HttpServer2.java:856)

    at org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer.(DatanodeHttpServer.java:104)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.startInfoServer(DataNode.java:760)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:1112)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.(DataNode.java:429)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:2374)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:2261)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.createDataNode(DataNode.java:2308)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.secureMain(DataNode.java:2485)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.main(DataNode.java:2509)

    Caused by: java.net.BindException: 无法指定被请求的地址

    at sun.nio.ch.Net.bind0(Native Method)

    at sun.nio.ch.Net.bind(Net.java:433)

    at sun.nio.ch.Net.bind(Net.java:425)

    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)

    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)

    at org.mortbay.jetty.nio.SelectChannelConnector.open(SelectChannelConnector.java:216)

    at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:914)

    ... 10 more

    2020-12-02 18:18:46,746 INFO org.apache.hadoop.util.ExitUtil: Exiting with status 1

    2020-12-02 18:18:46,748 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: SHUTDOWN_MSG:

    /************************************************************

    SHUTDOWN_MSG: Shutting down DataNode at master/192.168.242.139

    ************************************************************/

    可以清楚的看到是因为HOSTS文件配置不对引起的故障,把HOSTS文件

    增加配置如下(我的HOSTS没有如下配置是因为被删除掉了……):

    127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

    ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

    重新在主节点上启动“start-all.sh”,然后在从节点上执行JPS查看

    [root@localhost logs]# jps

    5398 Jps

    5131 NodeManager

    4958 DataNode

    可以看到dataNode已经成功起动。

    2、clusterID两个ID不相等

    在各从节点点查看:

    cat hadoop-root-datanode-localhost.localdomain.log大致错误如下:

    2020-12-09 18:10:40,810 WARN org.apache.hadoop.hdfs.server.common.Storage: Failed to add storage directory [DISK]file:/opt/hadoop-2.7.3/tmp/

    java.io.IOException: Incompatible clusterIDs in /opt/hadoop-2.7.3/tmp: namenode clusterID = CID-f9f0e847-beee-4d43-9e59-24687906ef30; datanode clusterID = CID-58e9d874-8916-44b8-841c-723e950b1f41

    at org.apache.hadoop.hdfs.server.datanode.DataStorage.doTransition(DataStorage.java:775)

    at org.apache.hadoop.hdfs.server.datanode.DataStorage.loadStorageDirectory(DataStorage.java:300)

    at org.apache.hadoop.hdfs.server.datanode.DataStorage.loadDataStorage(DataStorage.java:416)

    at org.apache.hadoop.hdfs.server.datanode.DataStorage.addStorageLocations(DataStorage.java:395)

    at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(DataStorage.java:573)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.initStorage(DataNode.java:1362)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.initBlockPool(DataNode.java:1327)

    at org.apache.hadoop.hdfs.server.datanode.BPOfferService.verifyAndSetNamespaceInfo(BPOfferService.java:317)

    at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.connectToNNAndHandshake(BPServiceActor.java:223)

    at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:802)

    at java.lang.Thread.run(Thread.java:745)

    2020-12-09 18:10:40,814 FATAL org.apache.hadoop.hdfs.server.datanode.DataNode: Initialization failed for Block pool (Datanode Uuid unassigned) service to master/192.168.242.139:8020. Exiting.

    java.io.IOException: All specified directories are failed to load.

    at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(DataStorage.java:574)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.initStorage(DataNode.java:1362)

    at org.apache.hadoop.hdfs.server.datanode.DataNode.initBlockPool(DataNode.java:1327)

    at org.apache.hadoop.hdfs.server.datanode.BPOfferService.verifyAndSetNamespaceInfo(BPOfferService.java:317)

    at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.connectToNNAndHandshake(BPServiceActor.java:223)

    at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:802)

    at java.lang.Thread.run(Thread.java:745)

    2020-12-09 18:10:40,815 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Ending block pool service for: Block pool (Datanode Uuid unassigned) service to master/192.168.242.139:8020

    解决办法

    查看core-site.xml,找到存namenode元数据和datanode元数据的路径。

    hadoop.tmp.dir

    /opt/hadoop-2.7.3/tmp

    在这个路径/opt/hadoop-2.7.3/tmp下去找datanode的VERSION文件,把从节点的clusterID改成与主节点的相同,然后再主节点上执行start-dfs.sh试试,如果这时在从节点上执行jps命令后,发现DataNode已经启动了,证明此方法可行。

    3、报clusterID不一样,但是VERSION文件的clusterID确与主节点一样的解决方法

    有时候会碰到日志文件同样报clusterID不一样,但是经过确认VERSION文件的clusterID确实同主节点一样。这个时候查看下core-site.xml配置的datanode路径下是不是多了一个current文件,这个时候current文件夹下两样也有一个一样的VERSION文件,直接执行 rm -rf current/把该文件夹及文件夹下所有内容删除,再主节点上执行start-dfs.sh试试。

    展开全文
  • (三)DataNode的剖析

    2021-01-09 19:18:23
    DataNode初始化 DataNode注册 接下来讲解的是DataNode部分: 1)Datanode初始化:之前分析过jps,能看到的服务就认为是RPC的服务端。 在我们平时搭建集群的过程中,我们jps是能看到DataNode服务的。所以datanode...


    总体流程

    DataNode初始化

    在这里插入图片描述

    DataNode注册

    在这里插入图片描述
    接下来讲解的是DataNode部分
    1)Datanode初始化:之前分析过jps,能看到的服务就认为是RPC的服务端。
    在我们平时搭建集群的过程中,我们jps是能看到DataNode服务的。所以datanode应该就是RPC的服务端。
    2)DataNode的注册:HDFS是一个主从式的架构。Namenode是一个主节点。Datanode是从节点。所以我们从节点启动的时候就需要跟主节点进行注册。为什么这个知识点重要。是因为我们大数据里面95%的大数据的技术 都是主从式的架构。如果学习好了今天的代码你再去学习Flink Spark 也好,道理是一样的。
    3)Datanode发送心跳:从节点要发送心跳,如果不发送心跳,我们主节点就不知道从节点是否存活。

    知识点铺垫

    HDFS的高可用方案的原理

    • Hadoop1.X: 我们只有一个namenode 一个datanode
    • Namenode: 管理元数据
    • Datanode: 存储数据的,为了保证数据安全,然后每个副本都有3个备份。64M
      Namenode存在问题:
      1)namenode有单点故障的问题
      2)Namenode是一个有状态(管理了元数据)的服务
      所以接下来hadoop团对就会去解决这个问题。

    解决这个单点故障的问题:
    1)如何保证两个namenode的元数据要时时刻刻的保持一致。
    2)自动切换

    与NameNode相似,为服务端,所以在main启动

      public static void main(String args[]) {
        if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
          System.exit(0);
        }
        //TODO 核心代码
        secureMain(args, null);
      }
    
    public static void secureMain(String args[], SecureResources resources) {
        int errorCode = 0;
        try {
          StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
          //TODO 这段代码是重要的代码
          //初始化DataNode
          DataNode datanode = createDataNode(args, null, resources);
          if (datanode != null) {
        	  //TODO 阻塞起来
            datanode.join();
          } else {
            errorCode = 1;
          }
        } catch (Throwable e) {
          LOG.fatal("Exception in secureMain", e);
          terminate(1, e);
        } finally {
          // We need to terminate the process here because either shutdown was called
          // or some disk related conditions like volumes tolerated or volumes required
          // condition was not met. Also, In secure mode, control will go to Jsvc
          // and Datanode process hangs if it does not exit.
          LOG.warn("Exiting Datanode");
          terminate(errorCode);
        }
      }
    
    public static DataNode createDataNode(String args[], Configuration conf,
          SecureResources resources) throws IOException {
        //TODO 实例化DataNode
        DataNode dn = instantiateDataNode(args, conf, resources);
        if (dn != null) {
          //TODO 启动DataNode后台线程
          //重要
          dn.runDatanodeDaemon();
        }
        return dn;
      }
    
     public static DataNode instantiateDataNode(String args [], Configuration conf,
          SecureResources resources) throws IOException {
        if (conf == null)
          conf = new HdfsConfiguration();
        
        if (args != null) {
          // parse generic hadoop options
          GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
          args = hParser.getRemainingArgs();
        }
        
        if (!parseArguments(args, conf)) {
          printUsage(System.err);
          return null;
        }
        Collection<StorageLocation> dataLocations = getStorageLocations(conf);
        UserGroupInformation.setConfiguration(conf);
        SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
            DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
        //TODO 重要的代码
        return makeInstance(dataLocations, conf, resources);
      }
    

    makeInstance方法里面创建了一个DataNode(conf, locations, resources)对象

    DataNode(final Configuration conf,
               final List<StorageLocation> dataDirs,
               final SecureResources resources) throws IOException {
        super(conf);
        this.blockScanner = new BlockScanner(this, conf);
        this.lastDiskErrorCheck = 0;
        this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
            DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
    
        this.usersWithLocalPathAccess = Arrays.asList(
            conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
        this.connectToDnViaHostname = conf.getBoolean(
            DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
            DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
        this.getHdfsBlockLocationsEnabled = conf.getBoolean(
            DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
            DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
        this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
            DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
        this.isPermissionEnabled = conf.getBoolean(
            DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
            DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
        this.pipelineSupportECN = conf.getBoolean(
            DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
            DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
    
        confVersion = "core-" +
            conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
            ",hdfs-" +
            conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
    
        // Determine whether we should try to pass file descriptors to clients.
        if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
                  DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
          String reason = DomainSocket.getLoadingFailureReason();
          if (reason != null) {
            LOG.warn("File descriptor passing is disabled because " + reason);
            this.fileDescriptorPassingDisabledReason = reason;
          } else {
            LOG.info("File descriptor passing is enabled.");
            this.fileDescriptorPassingDisabledReason = null;
          }
        } else {
          this.fileDescriptorPassingDisabledReason =
              "File descriptor passing was not configured.";
          LOG.debug(this.fileDescriptorPassingDisabledReason);
        }
    
        try {
          hostName = getHostName(conf);
          LOG.info("Configured hostname is " + hostName);
          //TODO 启动datanode
          startDataNode(conf, dataDirs, resources);
        } catch (IOException ie) {
          shutdown();
          throw ie;
        }
        final int dncCacheMaxSize =
            conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
                DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ;
        //TODO 构建者设计模式
        datanodeNetworkCounts =
            CacheBuilder.newBuilder()
                .maximumSize(dncCacheMaxSize)
                .build(new CacheLoader<String, Map<String, Long>>() {
                  @Override
                  public Map<String, Long> load(String key) throws Exception {
                    final Map<String, Long> ret = new HashMap<String, Long>();
                    ret.put("networkErrors", 0L);
                    return ret;
                  }
                });
      }
    

    启动DataNode

     void startDataNode(Configuration conf, 
                         List<StorageLocation> dataDirs,
                         SecureResources resources
                         ) throws IOException {
    
        // settings global for all BPs in the Data Node
        this.secureResources = resources;
        synchronized (this) {
          this.dataDirs = dataDirs;
        }
        this.conf = conf;
        this.dnConf = new DNConf(conf);
        checkSecureConfig(dnConf, conf, resources);
    
        this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
    
        if (dnConf.maxLockedMemory > 0) {
          if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
            throw new RuntimeException(String.format(
                "Cannot start datanode because the configured max locked memory" +
                " size (%s) is greater than zero and native code is not available.",
                DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
          }
          if (Path.WINDOWS) {
            NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
          } else {
            long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
            if (dnConf.maxLockedMemory > ulimit) {
              throw new RuntimeException(String.format(
                "Cannot start datanode because the configured max locked memory" +
                " size (%s) of %d bytes is more than the datanode's available" +
                " RLIMIT_MEMLOCK ulimit of %d bytes.",
                DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
                dnConf.maxLockedMemory,
                ulimit));
            }
          }
        }
        LOG.info("Starting DataNode with maxLockedMemory = " +
            dnConf.maxLockedMemory);
    
        storage = new DataStorage();
        
        // global DN settings
        registerMXBean();
        //TODO 初始化DataXceiver
        initDataXceiver(conf);
        //TODO 启动HttpServer服务
        startInfoServer(conf);
        pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();
      
        // BlockPoolTokenSecretManager is required to create ipc server.
        this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
    
        // Login is done by now. Set the DN user name.
        dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
        LOG.info("dnUserName = " + dnUserName);
        LOG.info("supergroup = " + supergroup);
        //TODO 初始化RPC的服务
        initIpcServer(conf);
    
        metrics = DataNodeMetrics.create(conf, getDisplayName());
        metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
        
        //TODO 创建了BlockPoolManager
        //BlockPool,一个集群就有一个BlockPool
        //如果我们是联邦机制。就会有多个namenode,也就会有多个联邦,一个联邦就是一个blockpool
        //假设一个集群里面:4个NameNode: 2两个联邦
        //联邦一:hadoop1(Active) hadoop2(StandBy)(blockPool是同一个)
        //联邦二:hadoop3(Active)   hadoop4(StandBy)(blockPool是同一个)
        blockPoolManager = new BlockPoolManager(this);
    
        //TODO 重要
        //里面涉及到心跳内容
        blockPoolManager.refreshNamenodes(conf);
    
        // Create the ReadaheadPool from the DataNode context so we can
        // exit without having to explicitly shutdown its thread pool.
        readaheadPool = ReadaheadPool.getInstance();
        saslClient = new SaslDataTransferClient(dnConf.conf, 
            dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
        saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
      }
    

    DataXceiver是DataNode用来接收客户端和其他DataNode传过来数据的服务。

    private void initDataXceiver(Configuration conf) throws IOException {
        // find free port or use privileged port provided
        //TODO 接收tcp请求的
        TcpPeerServer tcpPeerServer;
        if (secureResources != null) {
          tcpPeerServer = new TcpPeerServer(secureResources);
        } else {
          tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
              DataNode.getStreamingAddr(conf));
        }
        tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
        streamingAddr = tcpPeerServer.getStreamingAddr();
        LOG.info("Opened streaming server at " + streamingAddr);
        this.threadGroup = new ThreadGroup("dataXceiverServer");
        //重要的代码
        //TODO 实例化了一个DataXceiverServer
        //这个东西就是DataNode用来接收客户端和其他DataNode传过来数据的服务。
        xserver = new DataXceiverServer(tcpPeerServer, conf, this);
        
        //设置为后台线程
        this.dataXceiverServer = new Daemon(threadGroup, xserver);
        this.threadGroup.setDaemon(true); // auto destroy when empty
    
        if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
                  DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
            conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
                  DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
          DomainPeerServer domainPeerServer =
                    getDomainPeerServer(conf, streamingAddr.getPort());
          if (domainPeerServer != null) {
            this.localDataXceiverServer = new Daemon(threadGroup,
                new DataXceiverServer(domainPeerServer, conf, this));
            LOG.info("Listening on UNIX domain socket: " +
                domainPeerServer.getBindPath());
          }
        }
        this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
      }
    

    启动HttpServer,与namenode相似

    private void startInfoServer(Configuration conf)
        throws IOException {
        Configuration confForInfoServer = new Configuration(conf);
        confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
        //namenode启动的时候也启动了一个httpserver2 
        //TODO 用来接收http的请求。
        HttpServer2.Builder builder = new HttpServer2.Builder()
          .setName("datanode")
          .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
          .addEndpoint(URI.create("http://localhost:0"))
          .setFindPort(true);
    
        this.infoServer = builder.build();
         //TODO 往这个httpserver上面绑定了多个servlet
        this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
        //获取校验文件
        this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
            FileChecksumServlets.GetServlet.class);
        
        this.infoServer.setAttribute("datanode", this);
        this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
        this.infoServer.addServlet(null, "/blockScannerReport",
                                   BlockScanner.Servlet.class);
        //TODO 启动了http的服务
        this.infoServer.start();
        InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
    
        // SecureDataNodeStarter will bind the privileged port to the channel if
        // the DN is started by JSVC, pass it along.
        ServerSocketChannel httpServerChannel = secureResources != null ?
          secureResources.getHttpServerChannel() : null;
        this.httpServer = new DatanodeHttpServer(conf, jettyAddr, httpServerChannel);
        httpServer.start();
        if (httpServer.getHttpAddress() != null) {
          infoPort = httpServer.getHttpAddress().getPort();
        }
        if (httpServer.getHttpsAddress() != null) {
          infoSecurePort = httpServer.getHttpsAddress().getPort();
        }
      }
    

    启动RPC

    private void initIpcServer(Configuration conf) throws IOException {
        InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
            conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
        
        // Add all the RPC protocols that the Datanode implements    
        RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
            ProtobufRpcEngine.class);
        ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = 
              new ClientDatanodeProtocolServerSideTranslatorPB(this);
        //处理客户端和datanode之间的请求
        BlockingService service = ClientDatanodeProtocolService
            .newReflectiveBlockingService(clientDatanodeProtocolXlator);
        
        //这个代码就是用来创建一个RPC的服务端
        ipcServer = new RPC.Builder(conf)
            .setProtocol(ClientDatanodeProtocolPB.class)
            .setInstance(service)
            .setBindAddress(ipcAddr.getHostName())
            .setPort(ipcAddr.getPort())
            .setNumHandlers(
                conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
                    DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
            .setSecretManager(blockPoolTokenSecretManager).build();
        
        InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
            new InterDatanodeProtocolServerSideTranslatorPB(this);
        //datanode与datanode之间进行通信协议
        service = InterDatanodeProtocolService
            .newReflectiveBlockingService(interDatanodeProtocolXlator);
        DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
            ipcServer);
         //另外还有一个协议
        TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
            new TraceAdminProtocolServerSideTranslatorPB(this);
        BlockingService traceAdminService = TraceAdminService
            .newReflectiveBlockingService(traceAdminXlator);
        DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
            ipcServer);
    
        LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
    
        // set service-level authorization security policy
        if (conf.getBoolean(
            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
          ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
        }
      }
    

    心跳内容相关功能

      void refreshNamenodes(Configuration conf)
          throws IOException {
        LOG.info("Refresh request received for nameservices: " + conf.get
                (DFSConfigKeys.DFS_NAMESERVICES));
    
        Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
                .getNNServiceRpcAddressesForCluster(conf);
    
        synchronized (refreshNamenodesLock) {
        	//TODO 重要代码
          doRefreshNamenodes(newAddressMap);
        }
      }
    
    private void doRefreshNamenodes(
          Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
        assert Thread.holdsLock(refreshNamenodesLock);
    
        Set<String> toRefresh = Sets.newLinkedHashSet();
        Set<String> toAdd = Sets.newLinkedHashSet();
        Set<String> toRemove;
        /*
    	HDFS是一个分布式的文件系统,nameservice命名空间,指的是我们的目录
    	在高可用架构下,ActiveNameNode和standByNameNode,这两个管理的是同一个nameservice
    	联邦:里面就有多套HA架构
    	namenode:(hadoop1,hadoop2) (hadoop3,hadoop4)
    	*/
        synchronized (this) {
          // Step 1. For each of the new nameservices, figure out whether
          // it's an update of the set of NNs for an existing NS,
          // or an entirely new nameservice.
          //TODO 通常情况下:HDFS集群的架构是HA架构
          //nameservice(hadoop1 hadoop2)
        
         //如果是联邦架构,里面就会有多个
          for (String nameserviceId : addrMap.keySet()) {
            if (bpByNameserviceId.containsKey(nameserviceId)) {
              toRefresh.add(nameserviceId);
            } else {
              //TODO toAdd里面有多少有的联邦,一个联邦就是一个NameService
              toAdd.add(nameserviceId);
            }
          }
          
          // Step 2. Any nameservices we currently have but are no longer present
          // need to be removed.
          toRemove = Sets.newHashSet(Sets.difference(
              bpByNameserviceId.keySet(), addrMap.keySet()));
          
          assert toRefresh.size() + toAdd.size() ==
            addrMap.size() :
              "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
              "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
              "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
    
          
          // Step 3. Start new nameservices
          if (!toAdd.isEmpty()) {
            LOG.info("Starting BPOfferServices for nameservices: " +
                Joiner.on(",").useForNull("<default>").join(toAdd));
    
            //TODO 遍历所有的联邦,一个联邦里面会有两个NameNode(HA)
            for (String nsToAdd : toAdd) {
              ArrayList<InetSocketAddress> addrs =
                Lists.newArrayList(addrMap.get(nsToAdd).values());
              //TODO 重要的关系
              //一个联邦对应一个BPOfferService
              //一个联邦里面的一个NameNode就是一个BPServiceActor
              //也就是正常来说一个BPOfferService对应两个BPServiceActor
              BPOfferService bpos = createBPOS(addrs);
              bpByNameserviceId.put(nsToAdd, bpos);
              offerServices.add(bpos);
            }
          }
          //TODO DataNode向NameNode进行注册和心跳
          startAll();
        }
    
        // Step 4. Shut down old nameservices. This happens outside
        // of the synchronized(this) lock since they need to call
        // back to .remove() from another thread
        if (!toRemove.isEmpty()) {
          LOG.info("Stopping BPOfferServices for nameservices: " +
              Joiner.on(",").useForNull("<default>").join(toRemove));
          
          for (String nsToRemove : toRemove) {
            BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
            bpos.stop();
            bpos.join();
            // they will call remove on their own
          }
        }
        
        // Step 5. Update nameservices whose NN list has changed
        if (!toRefresh.isEmpty()) {
          LOG.info("Refreshing list of NNs for nameservices: " +
              Joiner.on(",").useForNull("<default>").join(toRefresh));
          
          for (String nsToRefresh : toRefresh) {
            BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
            ArrayList<InetSocketAddress> addrs =
              Lists.newArrayList(addrMap.get(nsToRefresh).values());
            bpos.refreshNNList(addrs);
          }
        }
      }
    
      //TODO DataNode向NameNode进行注册和心跳
      startAll();
    
    synchronized void startAll() throws IOException {
        try {
          UserGroupInformation.getLoginUser().doAs(
              new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                	//TODO 遍历所有的BPOfferService 遍历所有的联邦
                  for (BPOfferService bpos : offerServices) {
                    //TODO 重要
                    bpos.start();
                  }
                  return null;
                }
              });
        } catch (InterruptedException ex) {
          IOException ioe = new IOException();
          ioe.initCause(ex.getCause());
          throw ioe;
        }
      }
      
    
     void start() {
    	  //TODO 一个bpOfferService里面就会有多个Actor
        for (BPServiceActor actor : bpServices) {
        	//TODO DataNode进行注册和心跳
          actor.start();
        }
      }
    
      void start() {
        if ((bpThread != null) && (bpThread.isAlive())) {
          //Thread is started already
          return;
        }
        bpThread = new Thread(this, formatThreadName());
        //run
        bpThread.setDaemon(true); // needed for JUnit testing
        //TODO 启动线程,所以我们接写来观察run方法
        bpThread.start();
      }
    

    线程的start其实就是run方法

    public void run() {
        LOG.info(this + " starting to offer service");
        /**
         * TODO 这儿的这个代码也可以学习
         * 目的就是一定要注册上,因为注册这个功能比较重要所以使用了while循环
         * 一旦注册上了以后就break
         */
        try {
        //想方设法把下面的代码运行成功
          while (true) {
            // init stuff
            try {
              //TODO 注册核心代码
              connectToNNAndHandshake();
              break;
            } catch (IOException ioe) {
              // Initial handshake, storage recovery or registration failed
              runningState = RunningState.INIT_FAILED;
              if (shouldRetryInit()) {
                // Retry until all namenode's of BPOS failed initialization
                LOG.error("Initialization failed for " + this + " "
                    + ioe.getLocalizedMessage());
                //TODO 如果有问题sleep 5秒
                sleepAndLogInterrupts(5000, "initializing");
              } else {
                runningState = RunningState.FAILED;
                LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
                return;
              }
            }
          }
          //注册结束了
    
          runningState = RunningState.RUNNING;
          while (shouldRun()) {
            try {
            	//TODO 发送心跳
              offerService();
            } catch (Exception ex) {
              LOG.error("Exception in BPOfferService for " + this, ex);
              sleepAndLogInterrupts(5000, "offering service");
            }
          }
          runningState = RunningState.EXITED;
        } catch (Throwable ex) {
          LOG.warn("Unexpected exception in block pool " + this, ex);
          runningState = RunningState.FAILED;
        } finally {
          LOG.warn("Ending block pool service for: " + this);
          cleanUp();
        }
      }
    
    private void connectToNNAndHandshake() throws IOException {
        //TODO 获取到namenode的代理
        //datanode-》namenode
        //datanode调用namenode的方法,往namenode里面存储datanode的信息
        bpNamenode = dn.connectToNN(nnAddr);
    
        // First phase of the handshake with NN - get the namespace
        // info.
        NamespaceInfo nsInfo = retrieveNamespaceInfo();
        
        // Verify that this matches the other NN in this HA pair.
        // This also initializes our block pool in the DN if we are
        // the first NN connection for this BP.
        bpos.verifyAndSetNamespaceInfo(nsInfo);
        
        // Second phase of the handshake with the NN.
        //TODO 注册
        register(nsInfo);
      }
    
    void register(NamespaceInfo nsInfo) throws IOException {
        // The handshake() phase loaded the block pool storage
        // off disk - so update the bpRegistration object from that info
    	  
    	 //TODO 创建注册信息
         //可以观察注册信息
        bpRegistration = bpos.createRegistration();
    
        LOG.info(this + " beginning handshake with NN");
        while (shouldRun()) {
          try {
            // Use returned registration from namenode with updated fields
            //TODO 调用服务端的registerDatanode方法
            bpRegistration = bpNamenode.registerDatanode(bpRegistration);
            //如果执行到这儿,说明注册过程已经完成了。
            bpRegistration.setNamespaceInfo(nsInfo);
            break;
          } catch(EOFException e) {  // namenode might have just restarted
            LOG.info("Problem connecting to server: " + nnAddr + " :"
                + e.getLocalizedMessage());
            sleepAndLogInterrupts(1000, "connecting to server");
          } catch(SocketTimeoutException e) {  // namenode is busy
            LOG.info("Problem connecting to server: " + nnAddr);
            sleepAndLogInterrupts(1000, "connecting to server");
          }
        }
        
        LOG.info("Block pool " + this + " successfully registered with NN");
        bpos.registrationSucceeded(this, bpRegistration);
    
        // random short delay - helps scatter the BR from all DNs
        scheduleBlockReport(dnConf.initialBlockReportDelay);
      }
    

    实际调用的是namenodeRPCServer的下面方法

      public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
          throws IOException {
    	  //是否启动起来
        checkNNStartup();
        verifySoftwareVersion(nodeReg);
        //TODO 注册DataNode
        namesystem.registerDatanode(nodeReg);
        return nodeReg;
      }
    
     void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
        writeLock();
        try {
          //TODO DataNodeManager处理关于DataNode的事
          getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
          checkSafeMode();
        } finally {
          writeUnlock();
        }
      }
    

    registerDateNode里面的关键代码就是

            networktopology.add(nodeDescr);
            nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
      
            // register new datanode
            
            //TODO 注册DataNode
            addDatanode(nodeDescr);
            // also treat the registration message as a heartbeat
            // no need to update its timestamp
            // because its is done when the descriptor is created
            
            //TODO 把注册上来的DataNode加入到HeartbeatManager里面
            //后面进行心跳管理
            heartbeatManager.addDatanode(nodeDescr);
            incrementVersionCount(nodeReg.getSoftwareVersion());
            startDecommissioningIfExcluded(nodeDescr);
    
    /** Add a datanode. */
      //Todo 注册DataNode说白了就是往一堆数据结构里添加信息
      void addDatanode(final DatanodeDescriptor node) {
        // To keep host2DatanodeMap consistent with datanodeMap,
        // remove  from host2DatanodeMap the datanodeDescriptor removed
        // from datanodeMap before adding node to host2DatanodeMap.
        synchronized(datanodeMap) {
          //TODO  datanodeMap里面添加数据
          host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
        }
        //TODO 往拓扑的数据结构里面加入一条数据
        networktopology.add(node); // may throw InvalidTopologyException
        //TODO 往内存里面加入一条数据
        host2DatanodeMap.add(node);
        
        //如果以上内存数据结构里面的数据添加好了以后,
        //注册就完成了
        checkIfClusterIsNowMultiRack(node);
    
        if (LOG.isDebugEnabled()) {
          LOG.debug(getClass().getSimpleName() + ".addDatanode: "
              + "node " + node + " is added to datanodeMap.");
        }
      }
    

    在心跳功能里面记录datanode的信息

      synchronized void addDatanode(final DatanodeDescriptor d) {
        // update in-service node count
    	  
    	  //往各种数据结构里面存东西
        stats.add(d);
        //往datanodes list结构里面存进去了datanode的信息
        datanodes.add(d);
        d.isAlive = true;
      }
    

    注册结束

     //注册结束了
    
          runningState = RunningState.RUNNING;
          while (shouldRun()) {
            try {
            	//TODO 发送心跳
              offerService();
            } catch (Exception ex) {
              LOG.error("Exception in BPOfferService for " + this, ex);
              sleepAndLogInterrupts(5000, "offering service");
            }
          }
    
    private void offerService() throws Exception {
        LOG.info("For namenode " + nnAddr + " using"
            + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
            + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
            + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
            + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
            + "; heartBeatInterval=" + dnConf.heartBeatInterval);
    
        //
        // Now loop for a long time....
        //TODO 周期性
        while (shouldRun()) {
          try {
            final long startTime = monotonicNow();
    
            //
            // Every so often, send heartbeat or block-report
            //
            //TODO 心跳是每3秒进行一次
            if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
              //
              // All heartbeat messages include following info:
              // -- Datanode name
              // -- data transfer port
              // -- Total capacity
              // -- Bytes remaining
              //
              lastHeartbeat = startTime;
              if (!dn.areHeartbeatsDisabledForTests()) {
            	  //NameNode是不直接跟DataNode进行连接的。
            	  //DataNode发送心跳给NameNode
            	  //NameNode接收到心跳以后,会返回来一些指令
            	  //DataNode接收到这些指令以后,根据这些指令做对应的操作。
                //TODO 发送心跳,返回来的是NameNode给的响应指令
                HeartbeatResponse resp = sendHeartBeat();
                assert resp != null;
                dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
    
                // If the state of this NN has changed (eg STANDBY->ACTIVE)
                // then let the BPOfferService update itself.
                //
                // Important that this happens before processCommand below,
                // since the first heartbeat to a new active might have commands
                // that we should actually process.
                bpos.updateActorStatesFromHeartbeat(
                    this, resp.getNameNodeHaState());
                state = resp.getNameNodeHaState().getState();
    
                if (state == HAServiceState.ACTIVE) {
                  handleRollingUpgradeStatus(resp);
                }
    
                long startProcessCommands = monotonicNow();
                //获取到一些namenode发送过来的指令
                //TODO 里面使用了【指令设计模式】
                if (!processCommand(resp.getCommands()))
                  continue;
                long endProcessCommands = monotonicNow();
                if (endProcessCommands - startProcessCommands > 2000) {
                  LOG.info("Took " + (endProcessCommands - startProcessCommands)
                      + "ms to process " + resp.getCommands().length
                      + " commands from NN");
                }
              }
            }
            if (sendImmediateIBR ||
                (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
              reportReceivedDeletedBlocks();
              lastDeletedReport = startTime;
            }
    
            List<DatanodeCommand> cmds = blockReport();
            processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
    
            DatanodeCommand cmd = cacheReport();
            processCommand(new DatanodeCommand[]{ cmd });
    
            //
            // There is no work to do;  sleep until hearbeat timer elapses, 
            // or work arrives, and then iterate again.
            //
            long waitTime = dnConf.heartBeatInterval - 
            (monotonicNow() - lastHeartbeat);
            synchronized(pendingIncrementalBRperStorage) {
              if (waitTime > 0 && !sendImmediateIBR) {
                try {
                  pendingIncrementalBRperStorage.wait(waitTime);
                } catch (InterruptedException ie) {
                  LOG.warn("BPOfferService for " + this + " interrupted");
                }
              }
            } // synchronized
          } catch(RemoteException re) {
            String reClass = re.getClassName();
            if (UnregisteredNodeException.class.getName().equals(reClass) ||
                DisallowedDatanodeException.class.getName().equals(reClass) ||
                IncorrectVersionException.class.getName().equals(reClass)) {
              LOG.warn(this + " is shutting down", re);
              shouldServiceRun = false;
              return;
            }
            LOG.warn("RemoteException in offerService", re);
            try {
              long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
              Thread.sleep(sleepTime);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
            }
          } catch (IOException e) {
            LOG.warn("IOException in offerService", e);
          }
          processQueueMessages();
        } // while (shouldRun())
      } // offerService
    
     HeartbeatResponse sendHeartBeat() throws IOException {
        StorageReport[] reports =
            dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Sending heartbeat with " + reports.length +
                    " storage reports from service actor: " + this);
        }
        
        VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
            .getVolumeFailureSummary();
        int numFailedVolumes = volumeFailureSummary != null ?
            volumeFailureSummary.getFailedStorageLocations().length : 0;
            //TODO 发送心跳
           //获取到NameNode的代理,发送心跳
        return bpNamenode.sendHeartbeat(bpRegistration,
            reports,
            dn.getFSDataset().getCacheCapacity(),
            dn.getFSDataset().getCacheUsed(),
            dn.getXmitsInProgress(),
            dn.getXceiverCount(),
            numFailedVolumes,
            volumeFailureSummary);
      }
    

    其实是调用了namenoderpcserver的sendHeartbeat

      @Override // DatanodeProtocol
      public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
          StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
          int xmitsInProgress, int xceiverCount,
          int failedVolumes, VolumeFailureSummary volumeFailureSummary)
          throws IOException {
        checkNNStartup();
        verifyRequest(nodeReg);
        //TODO 处理DataNode发送过来的心跳
        return namesystem.handleHeartbeat(nodeReg, report,
            dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
            failedVolumes, volumeFailureSummary);
      }
    
    HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
          StorageReport[] reports, long cacheCapacity, long cacheUsed,
          int xceiverCount, int xmitsInProgress, int failedVolumes,
          VolumeFailureSummary volumeFailureSummary) throws IOException {
        readLock();
        try {
          //get datanode commands
          final int maxTransfer = blockManager.getMaxReplicationStreams()
              - xmitsInProgress;
          //TODO NameNode处理DataNode发送过来的心跳
          DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
              nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
              xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
          
          //create ha status
          final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
              haContext.getState().getServiceState(),
              getFSImage().getLastAppliedOrWrittenTxId());
          //TODO 给DataNode返回响应
          return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
        } finally {
          readUnlock();
        }
      }
    

    handelhearbeat的重要代码

        synchronized (heartbeatManager) {
          synchronized (datanodeMap) {
            DatanodeDescriptor nodeinfo = null;
            try {
            	//TODO 从已有datanodeMap里面获取到注册过来的DataNode信息
            	//如果能获取到这个datanode的信息说明以前就注册过了
            	//但是如果是第一次是那么dataNodemap里面是没有信息的
              nodeinfo = getDatanode(nodeReg);
            } catch(UnregisteredNodeException e) {
              return new DatanodeCommand[]{RegisterCommand.REGISTER};
            }
            
            // Check if this datanode should actually be shutdown instead. 
            if (nodeinfo != null && nodeinfo.isDisallowed()) {
              setDatanodeDead(nodeinfo);
              throw new DisallowedDatanodeException(nodeinfo);
            }
    
            if (nodeinfo == null || !nodeinfo.isAlive) {
              return new DatanodeCommand[]{RegisterCommand.REGISTER};
            }
            //TODO 更新心跳的重要的信息
            heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                             cacheCapacity, cacheUsed,
                                             xceiverCount, failedVolumes,
                                             volumeFailureSummary);
    
            // If we are in safemode, do not send back any recovery / replication
            // requests. Don't even drain the existing queue of work.
            if(namesystem.isInSafeMode()) {
              return new DatanodeCommand[0];
            }
    

    更新心跳的重要信息实质是调用了

    1.updateHeartbeat方法里面再调用-2.node.updateHeartbeat()-3.updateHeartbeatState()-》
    然后干活的关键代码是
         //TODO 更改存储的信息
        setCacheCapacity(cacheCapacity);
        setCacheUsed(cacheUsed);
        setXceiverCount(xceiverCount);
        //TODO 修改上一次的心跳时间。
        setLastUpdate(Time.now());
        setLastUpdateMonotonic(Time.monotonicNow());
        通过心跳来判断datanode节点是否存活?
        当前时间-上一次心跳时间>=15min就认为它不存活了
    

    总结

    到目前为止:
    1)hadoopRPC
    2)Namenode的启动流程
    3)Datanode(1.初始化 2:注册 3:心跳)

    detail process

    在这里插入图片描述

    接下来我们要分析:namenode是如何管理元数据。我们看源码,还是要进行场景驱动的方式。

    展开全文
  • NameNode与DataNode的工作原理剖析作者:尹正杰版权声明:原创作品,谢绝转载!否则将追究法律责任。一.HDFS写数据流程1>.客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件...

    NameNode与DataNode的工作原理剖析

    作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

    一.HDFS写数据流程

    1>.客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。2>.NameNode返回是否可以上传。3>.客户端请求第一个 Block上传到哪几个DataNode服务器上。4>.NameNode返回3个DataNode节点,分别为DataNode1、DataNode2、DataNode3。5>.客户端通过FSDataOutputStream模块请求DataNode1上传数据,DataNode1收到请求会继续调用DataNode2,然后DataNode2调用DataNode3,将这个通信管道建立完成。6>.DataNode1、DataNode2、DataNode3逐级应答客户端。7>.客户端开始往DataNode1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,DataNode1收到一个Packet就会传给DataNode2,DataNode2传给DataNode3;DataNode1每传一个packet会放入一个应答队列等待应答。8>.当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

    问第一个问题:请详细说明上面的第5步,DFSOutputStream是基于什么为单位上传数据的呢?

    DFSOutputStream会将文件分割成packets数据包,然后将这些packets写到其内部的一个叫做data queue(数据队列)。data queue会向NameNode节点请求适合存储数据副本的DataNode节点的列表,然后这些DataNode之前生成一个Pipeline数据流管道,我们假设副本集参数被设置为3,那么这个数据流管道中就有三个DataNode节点。

    问第二个问题: 在写数据的过程中,如果Pipeline数据流管道中的一个DataNode节点写失败了会发生什问题、需要做哪些内部处理呢?

    首先,Pipeline数据流管道会被关闭,ack queue中的packets会被添加到data queue的前面以确保不会发生packets数据包的丢失;

    接着,在正常的DataNode节点上的以保存好的block的ID版本会升级——这样发生故障的DataNode节点上的block数据会在节点恢复正常后被删除,失效节点也会被从Pipeline中删除;

    最后,剩下的数据会被写入到Pipeline数据流管道中的其他两个节点中。

    如果Pipeline中的多个节点在写数据是发生失败,那么只要写成功的block的数量达到dfs.replication.min(默认为1,Hadoop2.9.2版本已经将其更名为dfs.namenode.replication.min),那么就任务是写成功的,然后NameNode后通过一步的方式将block复制到其他节点,最后事数据副本达到dfs.replication参数配置的个数。

    整个写流程如下:

    第一步:

    客户端调用DistributedFileSystem的create()方法,开始创建新文件:DistributedFileSystem创建DFSOutputStream,产生一个RPC调用,让NameNode在文件系统的命名空间中创建这一新文件;

    第二步:

    NameNode接收到用户的写文件的RPC请求后,首先要执行各种检查,如客户是否有相关的创佳权限和该文件是否已存在等,检查都通过后才会创建一个新文件,并将操作记录到编辑日志,然后DistributedFileSystem会将DFSOutputStream对象包装在FSDataOutStream实例中,返回客户端;否则文件创建失败并且给客户端抛IOException。

    第三步:

    客户端开始写文件:DFSOutputStream会将文件分割成packets数据包,然后将这些packets写到其内部的一个叫做data queue(数据队列)。data queue会向NameNode节点请求适合存储数据副本的DataNode节点的列表,然后这些DataNode之前生成一个Pipeline数据流管道,我们假设副本集参数被设置为3,那么这个数据流管道中就有三个DataNode节点。

    第四步:

    首先DFSOutputStream会将packets向Pipeline数据流管道中的第一个DataNode节点写数据,第一个DataNode接收packets然后把packets写向Pipeline中的第二个节点,同理,第二个节点保存接收到的数据然后将数据写向Pipeline中的第三个DataNode节点。

    第五步:

    DFSOutputStream内部同样维护另外一个内部的写数据确认队列——ack queue。当Pipeline中的第三个DataNode节点将packets成功保存后,该节点回向第二个DataNode返回一个确认数据写成功的信息,第二个DataNode接收到该确认信息后在当前节点数据写成功后也会向Pipeline中第一个DataNode节点发送一个确认数据写成功的信息,然后第一个节点在收到该信息后如果该节点的数据也写成功后,会将packets从ack queue中将数据删除。

    在写数据的过程中,如果Pipeline数据流管道中的一个DataNode节点写失败了会发生什问题、需要做哪些内部处理呢?如果这种情况发生,那么就会执行一些操作:

    首先,Pipeline数据流管道会被关闭,ack queue中的packets会被添加到data queue的最前面以确保不会发生packets数据包的丢失;

    接着,在正常的DataNode节点上的以保存好的block的ID版本会升级——这样发生故障的DataNode节点上的block数据会在节点恢复正常后被删除,失效节点也会被从Pipeline中删除;

    最后,剩下的数据会被写入到Pipeline数据流管道中的其他两个节点中。

    如果Pipeline中的多个节点在写数据是发生失败,那么只要写成功的block的数量达到dfs.replication.min(默认为1),那么就任务是写成功的,然后NameNode后通过一步的方式将block复制到其他节点,最后使数据副本达到dfs.replication参数配置的个数。 因此,我们不得不怀疑该机制是否会导致一定的数据重复呢?

    第六步:

    完成写操作后,客户端调用close()关闭写操作,刷新数据;

    第七步:

    在数据刷新完后NameNode后关闭写操作流。到此,整个写操作完成。

    参考链接:https://flyingdutchman.iteye.com/blog/1900536

    Client是如何向Hadoop的HDFS中写数据的详解版本(烧脑版的解释,推荐大数据开发工程师阅读,大数据运维了解即可!)

    上面我们说到的dfs.replication.min属性官方已经被更名为dfs.namenode.replication.min,因此我们直接去官方文档查阅dfs.replication.min肯能会查不到,更多参数变更请参考官方说明:http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html(不推荐使用的属性下表列出了此版本的Hadoop中不推荐使用的配置属性名称及其替换。)

    二.机架感知

    对于常见情况,当复制因子为3时,HDFS的放置策略是将一个副本放在本地机架中的一个节点上,另一个放在本地机架中的另一个节点上,最后一个放在不同机架中的另一个节点上。副本节点的选择大致为:1>.第一个副本在Client所处的节点上,如果客户端在集群外,随机选一个;2>.第二个副本和第一个副本位于相同机架,随机节点;3>.第三个部分位于不同几家,随机节点;

    感兴趣的小伙伴可以参考官方文档:

    参考一:http://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#Data_Replication参考二:http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/RackAwareness.html

    三.HDFS读数据流程

    1>.客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。2>.挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。3>.DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。4>.客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

    四.DataNode工作原理

    1>.一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。2>.DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。3>.心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。4>.集群运行中可以安全加入和退出一些机器。

    五.数据完整性

    1>.当DataNode读取block的时候,它会计算checksum;2>.如果计算后的checksum,与block创建时值不一样,说明block已经损坏;3>.client读取其他DataNode上的block;4>.datanode在其文件创建后周期验证checksum;

    六.掉线时限参数设置

    datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:“timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval ”。

    而默认的"dfs.namenode.heartbeat.recheck-interval"大小为5分钟,"dfs.heartbeat.interval"默认为3秒。需要注意的是hdfs-site.xml配置文件中"heartbeat.recheck.interval"的单位为毫秒,"dfs.heartbeat.interval"的单位为秒。

    [root@node101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/hadoop-2.9.2/etc/hadoop/hdfs-site.xml<?xml version="1.0" encoding="UTF-8"?>

    dfs.namenode.checkpoint.period

    3600

    dfs.namenode.name.dir

    /data/hadoop/hdfs/dfs/name

    dfs.replication

    2

    dfs.namenode.heartbeat.recheck-interval

    300000

    dfs.heartbeat.interval

    3

    [root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/hadoop-2.9.2/etc/hadoop/hdfs-site.xml

    七.DataNode的目录结构

    和NameNode不同的是,DataNode的存储目录是初始阶段自动创建的,不需要额外格式化。

    1>.查看DataNode目录下对应的版本号("${hadoop.tmp.dir}/dfs/data/current/VERSION")

    [root@node101.yinzhengjie.org.cn ~]# ll /data/hadoop/hdfs/dfs/data/current/total8drwx------. 4 root root 4096 Apr 12 18:44 BP-883662044-172.30.1.101-1555064443805

    -rw-r--r--. 1 root root 229 Apr 12 18:44VERSION

    [root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn ~]# ll /data/hadoop/hdfs/dfs/data/current/

    [root@node101.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/VERSION

    #Fri Apr12 18:44:23 CST 2019storageID=DS-e181274d-eace-44c1-b001-ac26fbfa3f8c       #存储id号

    clusterID=CID-e7603940-eaba-4ce6-9ecd-3a449027b432       #集群id,全局唯一

    cTime=0                                #标记了datanode存储系统的创建时间,对于刚刚格式化的存储系统,这个属性为0;但是在文件系统升级之后,改值会更新到新的时间戳。datanodeUuid=a7c28347-2816-47ee-a3f9-153d11e162bf       #datanode的唯一标识码

    storageType=DATA_NODE                        #存储类型

    layoutVersion=-57                          #一般情况下是一个负数,通常只有HDFS增加新特性时才会更新这个版本号。[root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/VERSION

    #Fri Apr12 18:44:23 CST 2019storageID=DS-e181274d-eace-44c1-b001-ac26fbfa3f8c

    clusterID=CID-e7603940-eaba-4ce6-9ecd-3a449027b432

    cTime=0datanodeUuid=a7c28347-2816-47ee-a3f9-153d11e162bf

    storageType=DATA_NODE

    layoutVersion=-57[root@node102.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/VERSION

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/VERSION

    #Fri Apr12 18:44:23 CST 2019storageID=DS-e181274d-eace-44c1-b001-ac26fbfa3f8c

    clusterID=CID-e7603940-eaba-4ce6-9ecd-3a449027b432

    cTime=0datanodeUuid=a7c28347-2816-47ee-a3f9-153d11e162bf

    storageType=DATA_NODE

    layoutVersion=-57[root@node103.yinzhengjie.org.cn~]#

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/VERSION

    2>.查看DataNode目录下对应数据块的版本号("${hadoop.tmp.dir}/dfs/data/current/BP-*/current/VERSION")

    [root@node101.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/BP-883662044-172.30.1.101-1555064443805/current/VERSION

    #Fri Apr12 18:44:23 CST 2019namespaceID=1161472027                     #是datanode首次访问namenode的时候从namenode处获取的storageID对每个datanode来说是唯一的(但对于单个datanode中所有存储目录来说则是相同的),namenode可以用这个属性来区分不同datanode。cTime=1555064443805                       #标记了datanode存储系统的创建时间,对于刚刚格式化的存储系统,这个属性为0;但是在文件系统升级之后,该值会更新到新的时间戳。blockpoolID=BP-883662044-172.30.1.101-1555064443805    #标识一个block pool,并且是跨集群的全局唯一。当一个新的NameSpace被创建的时候(format过程的一部分)会创建并持久化一个唯一ID,在创建过程构建全局唯一的BlockPoolID此人为的配置更可靠一些。NameNode将BlockPoolID持久化到磁盘中,在后续的启动过程中,会再次load并使用。layoutVersion=-57                        #改值是一个负整数。通常只有HDFS增加新特性时才会更新这个版本号。[root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/BP-883662044-172.30.1.101-1555064443805/current/VERSION

    #Fri Apr12 18:44:23 CST 2019namespaceID=1161472027cTime=1555064443805blockpoolID=BP-883662044-172.30.1.101-1555064443805layoutVersion=-57[root@node102.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/BP-883662044-172.30.1.101-1555064443805/current/VERSION

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/BP-883662044-172.30.1.101-1555064443805/current/VERSION

    #Fri Apr12 18:44:23 CST 2019namespaceID=1161472027cTime=1555064443805blockpoolID=BP-883662044-172.30.1.101-1555064443805layoutVersion=-57[root@node103.yinzhengjie.org.cn~]#

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data/current/BP-883662044-172.30.1.101-1555064443805/current/VERSION

    八.DataNode多目录配置(我们在主节点做了任何修改后,最好同步到整个集群中去,否则可能会导致部分节点启动失败!)

    DataNode也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本!切记,DataNode的多目录配置和NameNode的多目录配置效果是不一样的! NameNode配置多目录是为了把元数据存储多份,达到配置备份的目的。

    关于已经有数据的HDFS集群中,配置案例如下:

    [root@node101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/hadoop-2.9.2/etc/hadoop/core-site.xml<?xml version="1.0" encoding="UTF-8"?>

    fs.defaultFS

    hdfs://node101.yinzhengjie.org.cn:8020

    hadoop.tmp.dir

    /data/hadoop/hdfs

    [root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/hadoop-2.9.2/etc/hadoop/core-site.xml

    [root@node101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/hadoop-2.9.2/etc/hadoop/hdfs-site.xml<?xml version="1.0" encoding="UTF-8"?>

    dfs.namenode.checkpoint.period

    3600

    dfs.namenode.name.dir

    file:///${hadoop.tmp.dir}/dfs/namenode1,file:///${hadoop.tmp.dir}/dfs/namenode2,file:///${hadoop.tmp.dir}/dfs/namenode3

    dfs.datanode.data.dir

    file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2

    dfs.replication

    2

    dfs.namenode.heartbeat.recheck-interval

    300000

    dfs.heartbeat.interval

    3

    [root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/hadoop-2.9.2/etc/hadoop/hdfs-site.xml

    [root@node101.yinzhengjie.org.cn ~]# scp -r /yinzhengjie/softwares/hadoop-2.9.2/ node102.yinzhengjie.org.cn:/yinzhengjie/softwares/

    [root@node101.yinzhengjie.org.cn ~]# scp -r /yinzhengjie/softwares/hadoop-2.9.2/ node102.yinzhengjie.org.cn:/yinzhengjie/softwares/        #我们需要把配置同步到其他节点中

    [root@node101.yinzhengjie.org.cn ~]# scp -r /yinzhengjie/softwares/hadoop-2.9.2/ node103.yinzhengjie.org.cn:/yinzhengjie/softwares/

    [root@node101.yinzhengjie.org.cn ~]# scp -r /yinzhengjie/softwares/hadoop-2.9.2/ node103.yinzhengjie.org.cn:/yinzhengjie/softwares/

    [root@node101.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data1/current/VERSION

    #Mon Apr15 15:47:43 CST 2019storageID=DS-a29dd65a-de0e-44b1-b51b-5d537f0ab7f1

    clusterID=CID-377f58b3-a3a2-4ca7-bf72-7d47714cf9cd

    cTime=0datanodeUuid=d1d3a605-0218-42b9-9638-255343195296storageType=DATA_NODE

    layoutVersion=-57[root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data1/current/VERSION                 #为什么我们说配置存储的多目录原因是数据是存储在不同的目录的并么有备份,因为他们的storageID不同!

    [root@node101.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data2/current/VERSION

    #Mon Apr15 15:47:43 CST 2019storageID=DS-9f2fa0b3-e9d7-4743-a9e8-ff2d81370200

    clusterID=CID-377f58b3-a3a2-4ca7-bf72-7d47714cf9cd

    cTime=0datanodeUuid=d1d3a605-0218-42b9-9638-255343195296storageType=DATA_NODE

    layoutVersion=-57[root@node101.yinzhengjie.org.cn~]#

    [root@node101.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data2/current/VERSION

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data1/current/VERSION

    #Mon Apr15 15:47:43 CST 2019storageID=DS-053dada1-36dd-490b-a1d5-1a523bcfc6f3

    clusterID=CID-377f58b3-a3a2-4ca7-bf72-7d47714cf9cd

    cTime=0datanodeUuid=b43206d7-eb51-48b5-b269-6bd6502b5f9f

    storageType=DATA_NODE

    layoutVersion=-57[root@node102.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data1/current/VERSION

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data2/current/VERSION

    #Mon Apr15 15:47:43 CST 2019storageID=DS-28a3e682-3ae8-4ce2-abf5-6691b669ef1a

    clusterID=CID-377f58b3-a3a2-4ca7-bf72-7d47714cf9cd

    cTime=0datanodeUuid=b43206d7-eb51-48b5-b269-6bd6502b5f9f

    storageType=DATA_NODE

    layoutVersion=-57[root@node102.yinzhengjie.org.cn~]#

    [root@node102.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data2/current/VERSION

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data1/current/VERSION

    #Mon Apr15 15:47:43 CST 2019storageID=DS-e55b7230-fc9b-4122-a19d-30cb5855d455

    clusterID=CID-377f58b3-a3a2-4ca7-bf72-7d47714cf9cd

    cTime=0datanodeUuid=ed3bea6a-f5cd-45e9-8302-6de2106ec863

    storageType=DATA_NODE

    layoutVersion=-57[root@node103.yinzhengjie.org.cn~]#

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data1/current/VERSION

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data2/current/VERSION

    #Mon Apr15 15:47:43 CST 2019storageID=DS-3d59f0c6-ebbf-4f3d-b470-256c01a200d4

    clusterID=CID-377f58b3-a3a2-4ca7-bf72-7d47714cf9cd

    cTime=0datanodeUuid=ed3bea6a-f5cd-45e9-8302-6de2106ec863

    storageType=DATA_NODE

    layoutVersion=-57[root@node103.yinzhengjie.org.cn~]#

    [root@node103.yinzhengjie.org.cn ~]# cat /data/hadoop/hdfs/dfs/data2/current/VERSION

    九.Hadoop的集群管理之服役和退役

    展开全文
  • 2021-11-28 DataNode启动

    2021-11-28 20:35:10
    源码基于hadoop-3.3.0 1 概述 DataNode类封装了整个数据节点逻辑的实现。 它通过DataStorage以及FsDatasetImpl管理着数据节点存储上的所有数据块,DataNode类还会通过流式接口对客户端和其他数据节点提供读数据块、 ...
  • DataNode在启动之后会周期性地向NameNode发送心跳,那么内部是如何实现的呢?
  • 1. 概述HDFS集群分为两大角色:NameNode、DataNode(Secondary NameNode)NameNode负责管理整个文件系统的元数据,记录存放在哪些datanode中,以及存放路径dataNode 负责管理用户的文件数据块文件会按照固定大小(block...
  • 当我动态添加一个Hadoop从节点的之后,出现了一个问题:[root@hadoop current]#hadoop-daemon.sh start datanodestarting datanode, logging to /usr/local/hadoop1.1/libexec/../logs/hadoop-root-datanode-hadoop....
  • 2021-12-03 DataNode注册与心跳

    千次阅读 2021-12-03 22:39:31
    本文基于hadoop-3.3.0 目录 1 概述 ...而注册的流程是在DataNode的构造函数中的startDataNode方法中调用refreshNamenode方法完成: blockPoolManager.refreshNamenodes(getConf()); 2 注册过程
  • 启动DataNode 提示Missing NameNode address start all没有报错,但是发现这NameNode的webUI上面DataNode没有挂上。 进入DataNode查看日志发现下面问题。 datanode 进程没有起来 NodeManager启动过一段时间退出了。...
  • 以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器) (4)文件的各个block的存储管理由datanode节点承担 datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本...
  • heartbeat.recheck.intervalname> 2000value> property> dfs.heartbeat.intervalname> 1value> property> 2、观察验证datanode功能 (1)上传一个文件,观察文件的block具体的物理存放情况: (2)在每一台datanode...
  • 二、DataNode 是如何向 NameNode 发送心跳的 我们从 hadoop 源码看 DataNode 是如何发送心跳的 1、从 DataNode 类的 main 方法开始 2、创建 DataNode 3、实例化 DataNode 4、创建实例 5、 new 了 DataNode 6、 这个...
  • 但实际上,Datanode还是需要保存一部分Datanode自身的元数据的, 这些元数据是通过Datanode磁盘存储上的一些文件和目录来保存的。 Datanode可以定义多个存储目录保存数据块,Datanode的多个存储目录存储的数据块并...
  • 启动./start-dfs.sh后jps发现没有datanode进程。查看日志2018-02-27 13:54:27,918 INFO org.apache.hadoop.ipc.Server: IPC Server Responder: starting2018-02-27 13:54:29,140 INFO org.apache.hadoop.hdfs.server...
  • 有时候我们start-dfs.sh启动了hadoop但是发现datanode进程不存在一、原因当我们使用hadoop namenode -format格式化namenode时,会在namenode数据文件夹(这个文件夹为自己配置文件中dfs.name.dir的路径)中保存一个...
  • 一、概述HDFS集群以Master-Slave模式运行,主要有两类节点:一个Namenode(即Master)和多个Datanode(即Slave)。HDFS Architecture: 二、NamenodeNamenode 管理者文件系统的Namespace。它维护着文件系统树(filesystem...
  • 前言:我们上一节看到了如何搭建和验证我们hadoop平台,在验证时我们使用jps命令查看进程,如下:DataNode、NameNode、SecondaryNameNode那么这些进程都具有哪些功能,这些功能是怎么服务于hadoop平台的,进程之间是...
  • 重新format namenode后,datanode无法正常启动 测试环境,由于测试需求,重新format namenode后,导致datanode无法正常启动。 1. 查看datanode日志,可以发现错误“Initialization failed for Block pool <...
  • DataNode作用概述 DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。 存储实际的数据块 执行数据块的读/写操作 DataNode工作机制 一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,...
  • FsDatasetImpl会通过持有一个 FsVolumeList 对象对 Datanode 上定义的所有存储目录下的数据块进行管理与操作,而 FsVolumeList 对象统一管理 Datanode 上定义的多个 FsVolumelmpl 对象。FsDatasetlmpl 还会持有一个...
  • 配置完impala后重启集群,使用jps后发现所有datanode都没启动。
  • 解决开启hadoop后datanode没有启动 查看机器上记录的datanode的日志文件,查看错误信息 2021-11-17 13:50:03,722 WARN org.apache.hadoop.hdfs.server.common.Storage: Failed to add storage directory [DISK]file:...
  • 然后再启动发现一个DataNode都没有启动 排除 查看日志发现 java.io.IOException:Incompatible clusterIDs in /home/storm/hadoop/dfs/data: namenode clusterID XXXX 日志上看,datanode的clusterID 和 namenode的...
  • Datanode工作原理

    2021-05-29 19:44:20
    首先要知道,datanode是用来存放block块信息的。 datanode启动的时候主动注册,namenode就知道集群中有哪些datanode 注册之后会上报block信息,告诉namenode存储了哪些block块及其信息信息 Datanode默认60分钟上报...
  • 如果NodeManager和Datanode运行在一个结点上,并且配置了相同的磁盘目录,那么单个磁盘写满,可能导致NodeManager处于unhealth状态,从而引起该结点无法运行yarn任务(原因可以从这篇文章中找答案)。 【处理和规避...
  • 启动停止HDFS的DataNode

    2021-01-05 14:06:59
    启动前提是已经设置好了Hadoop临时目录位置 ,如果没设置的话,去...启动DataNode: sh $HADOOP_HOME/sbin/hadoop-daemon.sh start datanode 停止DataNode: sh $HADOOP_HOME/sbin/hadoop-daemon.sh stop datanode ...
  • Datanode详解: 作用:提供真实文件数据的存储服务,是一个文件管理系统, 你可以类比于MySQL, mysql仅仅是一个数据库管理软件,它将数据转换为mysql坑理解的格式(eg: 表-->记录-->字段)然后存储在window/linux操作...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 96,605
精华内容 38,642
关键字:

datanode