DataNode Heartbeat and Block Report Flow
  GQ7psP7UJw7k, November 14, 2023

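1. Background

A change to a file's contents usually means a change to its block information. The DataNode sends the changed blocks to the NameNode, and the NameNode updates its block information accordingly. This article walks through the DataNode heartbeat flow and the block report flow.

2. DataNode Heartbeat Thread Model

In the Hadoop Federation architecture, an Active/Standby NameNode pair usually forms one group that serves a single namespace. Each namespace has its own block pool, which is the set of blocks belonging to that namespace. Different namespaces can share the same DataNodes for storage; on each DataNode, every namespace maps to a separate directory that holds its blocks. For example, in a cluster with four namespaces, each DN creates four corresponding block pool directories: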

[Figure: the four block pool directories created on a DataNode, one per namespace]

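A DataNode reports to the NameNodes on a per-block-pool basis. Block pools are managed by the BlockPoolManager class. When the DataNode starts, BlockPoolManager.doRefreshNamenodes creates one BPOfferService object for each block pool/namespace on that DataNode. Each namespace has exactly one BPOfferService, and each BPOfferService sends heartbeats and block reports only to the Active/Standby NameNodes of its own namespace. The BPOfferService creation flow is as follows: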

private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap,
      Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
      throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);
    
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    // Collect the nameservice IDs already running on this DN; any namespace that is new gets a BPOfferService created for it
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
      // If a nameserviceId no longer exists, its BPOfferService is to be removed from the DataNode
      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
      
   
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
      
        for (String nsToAdd : toAdd) {
          Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
          Map<String, InetSocketAddress> nnIdToLifelineAddr =
              lifelineAddrMap.get(nsToAdd);
          ArrayList<InetSocketAddress> addrs =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          ArrayList<InetSocketAddress> lifelineAddrs =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          for (String nnId : nnIdToAddr.keySet()) {
            addrs.add(nnIdToAddr.get(nnId));
            lifelineAddrs.add(nnIdToLifelineAddr != null ?
                nnIdToLifelineAddr.get(nnId) : null);
          }
          // Create the BPOfferService for this namespace
          BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      startAll();
    }
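
The classification step above can be tried in isolation. The following is a minimal sketch using only java.util; the class NameserviceDiffSketch and its fields are hypothetical names chosen for illustration and are not part of Hadoop:

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper (not a Hadoop class): classifies nameservice IDs. */
final class NameserviceDiffSketch {
    final Set<String> toAdd = new LinkedHashSet<>();
    final Set<String> toRefresh = new LinkedHashSet<>();
    final Set<String> toRemove = new LinkedHashSet<>();

    NameserviceDiffSketch(Set<String> running, Set<String> configured) {
        for (String ns : configured) {
            if (running.contains(ns)) {
                toRefresh.add(ns);   // already served by a BPOfferService
            } else {
                toAdd.add(ns);       // needs a new BPOfferService
            }
        }
        // Running but no longer configured: these have to be stopped.
        for (String ns : running) {
            if (!configured.contains(ns)) {
                toRemove.add(ns);
            }
        }
    }
}
```

With running = {ns1, ns2} and configured = {ns2, ns3}, the sketch puts ns3 in toAdd, ns2 in toRefresh, and ns1 in toRemove.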

When a BPOfferService is created, it records the Active and Standby NameNodes and creates one BPServiceActor per NameNode. The BPServiceActor is what actually performs the heartbeat and related operations:

BPOfferService(
      final String nameserviceId,
      List<InetSocketAddress> nnAddrs,
      List<InetSocketAddress> lifelineNnAddrs,
      DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
        "Must pass same number of NN addresses and lifeline addresses.");
    this.nameserviceId = nameserviceId;
    this.dn = dn;
    // Create one BPServiceActor per NameNode to send heartbeats and reports
    for (int i = 0; i < nnAddrs.size(); ++i) {
      this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
          lifelineNnAddrs.get(i), this));
    }
  }

Finally, BlockPoolManager.startAll starts every BPServiceActor thread in every BPOfferService. Note that both the Active and the Standby NameNode receive heartbeats, but only the Active NameNode can send commands to the DataNode:

synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                // Start the BPOfferService
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

// Start the BPServiceActor threads
class BPOfferService {
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }
}
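
To make the resulting thread count concrete, here is a small sketch of the fan-out: one actor per (nameservice, NameNode) pair. ActorFanOutSketch and the address strings are hypothetical, and plain strings stand in for the real BPServiceActor threads:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration (not a Hadoop class) of the actor fan-out. */
final class ActorFanOutSketch {
    /** Returns one actor label per (nameservice, NameNode address) pair. */
    static List<String> actorsFor(Map<String, List<String>> nnAddrsByNameservice) {
        List<String> actors = new ArrayList<>();
        for (Map.Entry<String, List<String>> ns : nnAddrsByNameservice.entrySet()) {
            for (String nnAddr : ns.getValue()) {
                // In the DataNode each such pair is served by its own thread.
                actors.add(ns.getKey() + "->" + nnAddr);
            }
        }
        return actors;
    }
}
```

Under this model, a cluster with four namespaces and an Active/Standby pair per namespace gives eight actors on every DataNode.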

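3. DataNode Heartbeat and Block Report Flow

The BPServiceActor.run() thread first performs a handshake with the NameNode and then calls offerService to enter the heartbeat and block report loop: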

connectToNNAndHandshake();

while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }

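The offerService flow is as follows:

  1. scheduler.isHeartbeatDue checks whether a heartbeat is due at the current time.
  2. scheduler.isBlockReportDue checks whether a full block report is due.
  3. sendHeartBeat sends the heartbeat and, if needed, requests a block report lease.
  4. sendIBRs sends incremental block reports.
  5. blockReport sends the full block report.

private void offerService() throws Exception {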
    while (shouldRun()) {
      try {
        // Current time
        final long startTime = scheduler.monotonicNow();

        // Check the elapsed interval to decide whether a heartbeat is due
        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
        HeartbeatResponse resp = null;
        if (sendHeartbeat) {
          //
          // All heartbeat messages include following info:
          // -- Datanode name
          // -- data transfer port
          // -- Total capacity
          // -- Bytes remaining
          //
          // fullBlockReportLeaseId == 0 means this BPServiceActor holds no lease; if a full block report is due, request a block report lease
          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);
          if (!dn.areHeartbeatsDisabledForTests()) {
            // Send the heartbeat
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              if (fullBlockReportLeaseId != 0) {
                LOG.warn(nnAddr + " sent back a full block report lease " +
                        "ID of 0x" +
                        Long.toHexString(resp.getFullBlockReportLeaseId()) +
                        ", but we already have a lease ID of 0x" +
                        Long.toHexString(fullBlockReportLeaseId) + ". " +
                        "Overwriting old lease ID.");
              }
              // Store the block report lease ID
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);

            // If the state of this NN has changed (eg STANDBY->ACTIVE)
            // then let the BPOfferService update itself.
            //
            // Important that this happens before processCommand below,
            // since the first heartbeat to a new active might have commands
            // that we should actually process.
            bpos.updateActorStatesFromHeartbeat(
                this, resp.getNameNodeHaState());
            state = resp.getNameNodeHaState().getState();

            if (state == HAServiceState.ACTIVE) {
              handleRollingUpgradeStatus(resp);
            }

            long startProcessCommands = monotonicNow();
            if (!processCommand(resp.getCommands()))
              continue;
            long endProcessCommands = monotonicNow();
            if (endProcessCommands - startProcessCommands > 2000) {
              LOG.info("Took " + (endProcessCommands - startProcessCommands)
                  + "ms to process " + resp.getCommands().length
                  + " commands from NN");
            }
          }
        }
        // Incremental block report
        if (!dn.areIBRDisabledForTests() &&
            (ibrManager.sendImmediately()|| sendHeartbeat)) {
          ibrManager.sendIBRs(bpNamenode, bpRegistration,
              bpos.getBlockPoolId());
        }

        List<DatanodeCommand> cmds = null;
        boolean forceFullBr =
            scheduler.forceFullBlockReport.getAndSet(false);
        if (forceFullBr) {
          LOG.info("Forcing a full block report to " + nnAddr);
        }
        // Full block report
        if ((fullBlockReportLeaseId != 0) || forceFullBr) {
          cmds = blockReport(fullBlockReportLeaseId);
          fullBlockReportLeaseId = 0;
        }
        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

        if (!dn.areCacheReportsDisabledForTests()) {
          DatanodeCommand cmd = cacheReport();
          processCommand(new DatanodeCommand[]{ cmd });
        }

        if (sendHeartbeat) {
          dn.getMetrics().addHeartbeatTotal(
              scheduler.monotonicNow() - startTime);
        }

        // There is no work to do;  sleep until hearbeat timer elapses, 
        // or work arrives, and then iterate again.
        // Block until the next heartbeat is due, then wake the thread
        ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
      } // (rest omitted)
    } // while (shouldRun())
  } // offerService

Heartbeat timing: scheduler.isHeartbeatDue

The dfs.heartbeat.interval setting (default 3s) is read and used as the heartbeat interval, so a heartbeat is sent to the NameNode every 3s:

boolean isHeartbeatDue(long startTime) {
      return (nextHeartbeatTime - startTime <= 0);
    }

long scheduleNextHeartbeat() {
      // Numerical overflow is possible here and is okay.
      nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
      scheduleNextLifeline(nextHeartbeatTime);
      return nextHeartbeatTime;
    }
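
The "overflow is okay" comment deserves a closer look: comparing the difference against zero, rather than comparing the two timestamps directly, stays correct even if the addition wraps around. Below is a minimal sketch of that idea. HeartbeatTimerSketch is a hypothetical class that takes the current time as a parameter so it can be exercised without a real clock:

```java
/** Hypothetical timer sketch; method names mirror the excerpt above. */
final class HeartbeatTimerSketch {
    private final long heartbeatIntervalMs;
    private long nextHeartbeatTime;

    HeartbeatTimerSketch(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.nextHeartbeatTime = nowMs;   // first heartbeat is due immediately
    }

    boolean isHeartbeatDue(long nowMs) {
        // Subtract first, then compare with 0: the difference is still
        // meaningful when nextHeartbeatTime has wrapped past Long.MAX_VALUE.
        return nextHeartbeatTime - nowMs <= 0;
    }

    long scheduleNextHeartbeat(long nowMs) {
        nextHeartbeatTime = nowMs + heartbeatIntervalMs;   // may overflow
        return nextHeartbeatTime;
    }

    /** Milliseconds to sleep before the next heartbeat; never negative here. */
    long getHeartbeatWaitTime(long nowMs) {
        return Math.max(0L, nextHeartbeatTime - nowMs);
    }
}
```

A direct comparison such as nextHeartbeatTime <= nowMs would report the heartbeat as due immediately after a wrap, which is why the subtraction form is preferable for monotonic clock values.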

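Full block report timing: scheduler.isBlockReportDue

The dfs.blockreport.intervalMsec setting gives the full block report interval, 6h by default. fullBlockReportLeaseId is the lease ID used for a full block report. Full block report leases were introduced by HDFS-7923 to keep the NameNode from handling too many full block reports at once, which would let the callQueue grow too large and block the service. The lease mechanism limits the number of concurrent leases (6 by default) and their duration (5min) to reduce NameNode load. On clusters with a very large number of blocks, however, it can also slow down FBRs (full block reports):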

  • dfs.namenode.max.full.block.report.leases=6
  • dfs.namenode.full.block.report.lease.length.ms=5L * 60L * 1000L
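
To show how a lease count and a lease length can throttle full block reports, here is a deliberately simplified lease table. FullReportLeaseSketch is a hypothetical class and does not reproduce the NameNode's actual lease manager; it only demonstrates the two limits named above:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified lease table (not the NameNode's implementation). */
final class FullReportLeaseSketch {
    private final int maxLeases;
    private final long leaseLengthMs;
    private final Map<String, Long> expiryByDatanode = new HashMap<>();
    private long nextId = 1;

    FullReportLeaseSketch(int maxLeases, long leaseLengthMs) {
        this.maxLeases = maxLeases;
        this.leaseLengthMs = leaseLengthMs;
    }

    /** Returns a non-zero lease ID, or 0 if no lease can be granted right now. */
    synchronized long requestLease(String datanodeUuid, long nowMs) {
        // Expired leases free their slots before the limit is checked.
        expiryByDatanode.values().removeIf(expiry -> expiry <= nowMs);
        // A repeated request from the same DataNode replaces its old lease.
        expiryByDatanode.remove(datanodeUuid);
        if (expiryByDatanode.size() >= maxLeases) {
            return 0;   // same convention as fullBlockReportLeaseId == 0
        }
        expiryByDatanode.put(datanodeUuid, nowMs + leaseLengthMs);
        return nextId++;
    }

    /** Called once the DataNode's full block report has been processed. */
    synchronized void releaseLease(String datanodeUuid) {
        expiryByDatanode.remove(datanodeUuid);
    }
}
```

With the defaults quoted above this would be constructed as new FullReportLeaseSketch(6, 5L * 60L * 1000L): a seventh DataNode asking for a lease gets 0 back and simply asks again with a later heartbeat.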

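A non-zero fullBlockReportLeaseId means the full block report for the lease already held has not finished yet, so no new lease is requested: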

boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);

void scheduleNextBlockReport() {
      // If we have sent the first set of block reports, then wait a random
      // time before we start the periodic block reports.
      if (resetBlockReportTime) {
        nextBlockReportTime = monotonicNow() +
            ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
      }
      // (rest omitted)
    }
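
A plausible reason for the random offset is to keep DataNodes that started together from sending their periodic full reports at the same moment. The sketch below isolates that computation; BlockReportJitterSketch is a hypothetical name, and it uses nextLong so that no int cast is needed:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of the random start offset for periodic block reports. */
final class BlockReportJitterSketch {
    /**
     * Picks a time in [nowMs, nowMs + blockReportIntervalMs) so that
     * different DataNodes end up on different report schedules.
     */
    static long firstPeriodicReportTime(long nowMs, long blockReportIntervalMs) {
        // nextLong(bound) returns a value in [0, bound); bound must be positive.
        return nowMs + ThreadLocalRandom.current().nextLong(blockReportIntervalMs);
    }
}
```

With the default 6h interval, each DataNode's first periodic report lands somewhere within the following six hours.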

sendHeartBeat: sending the heartbeat

As shown below, the DataNode sends its basic information, such as capacity, to the NameNode:

HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
    reports,
    dn.getFSDataset().getCacheCapacity(),
    dn.getFSDataset().getCacheUsed(),
    dn.getXmitsInProgress(),
    dn.getXceiverCount(),
    numFailedVolumes,
    volumeFailureSummary,
    requestBlockReportLease,
    slowPeers,
    slowDisks);

sendIBRs: sending incremental block reports

sendIBRs calls IncrementalBlockReportManager.generateIBRs, which collects the blocks waiting to be reported from pendingIBRs:

private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
    final List<StorageReceivedDeletedBlocks> reports
        = new ArrayList<>(pendingIBRs.size());
    for (Map.Entry<DatanodeStorage, PerStorageIBR> entry
        : pendingIBRs.entrySet()) {
      final PerStorageIBR perStorage = entry.getValue();

        // Send newly-received and deleted blockids to namenode
      final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();
      if (rdbi != null) {
        reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
      }
    }

Entries are added to pendingIBRs after the DataNode deletes a block, starts receiving a block, or finishes receiving a block; they are then ready to be sent to the NameNode:

void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
      String storageUuid, boolean isOnTransientStorage) {
    notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
        storageUuid, isOnTransientStorage);
  }

  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid,
        false);
  }

  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid,
        false);
  }

All of these end up calling IncrementalBlockReportManager.addRDBI, which puts the block information into pendingIBRs:

synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi,
      DatanodeStorage storage) {
    // Make sure another entry for the same block is first removed.
    // There may only be one such entry.
    for (PerStorageIBR perStorage : pendingIBRs.values()) {
      if (perStorage.remove(rdbi.getBlock()) != null) {
        break;
      }
    }
    getPerStorageIBR(storage).put(rdbi);
  }
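
The two halves of this mechanism, adding with "at most one pending entry per block" and draining on send, can be sketched with plain collections. PendingIbrSketch, its Status enum, and the string output format are hypothetical simplifications of the Hadoop types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the pending IBR bookkeeping. */
final class PendingIbrSketch {
    enum Status { RECEIVING, RECEIVED, DELETED }

    // storage ID -> (block ID -> latest status waiting to be reported)
    private final Map<String, Map<Long, Status>> pending = new LinkedHashMap<>();

    synchronized void add(String storageId, long blockId, Status status) {
        // A block may have only one pending entry, whichever storage held it.
        for (Map<Long, Status> perStorage : pending.values()) {
            if (perStorage.remove(blockId) != null) {
                break;
            }
        }
        pending.computeIfAbsent(storageId, k -> new LinkedHashMap<>())
               .put(blockId, status);
    }

    /** Removes and returns everything pending as "storage:block:status". */
    synchronized List<String> drain() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<Long, Status>> e : pending.entrySet()) {
            for (Map.Entry<Long, Status> b : e.getValue().entrySet()) {
                out.add(e.getKey() + ":" + b.getKey() + ":" + b.getValue());
            }
            e.getValue().clear();
        }
        return out;
    }
}
```

Because a later status replaces an earlier one, a block that goes from RECEIVING to RECEIVED between two sends is reported only once, with its latest state.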

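The manager reads the dfs.blockreport.incremental.intervalMsec setting as the interval between incremental block reports. Its default is 0, which means pending incremental reports are sent as soon as possible instead of being batched; a positive value makes the DataNode wait that many milliseconds between sends.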

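blockReport: sending the full block report

A DatanodeStorage can be thought of as a single disk of the DataNode. The block list of each disk is collected, and the dfs.blockreport.split.threshold setting (default 1,000,000) is read. If the total block count is below the threshold, all storages are reported in a single RPC; otherwise one RPC is sent per storage: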

List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
    final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();

    
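    // Collect the block list of every storage in this block pool
    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
    int i = 0;
    int totalBlockCount = 0;
    StorageBlockReport reports[] =
        new StorageBlockReport[perVolumeBlockLists.size()];
    // Convert the per-storage block lists into the report format sent to the NN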
    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
      BlockListAsLongs blockList = kvPair.getValue();
      reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
      totalBlockCount += blockList.getNumberOfBlocks();
    }

      if (totalBlockCount < dnConf.blockReportSplitThreshold) {
        // Below split threshold, send all reports in a single message.
        DatanodeCommand cmd = bpNamenode.blockReport(
            bpRegistration, bpos.getBlockPoolId(), reports,
            new BlockReportContext(1, 0, reportId, fullBrLeaseId));
        blockReportSizes.add(
            calculateBlockReportPBSize(useBlocksBuffer, reports));
        numRPCs = 1;
        numReportsSent = reports.length;
        if (cmd != null) {
          cmds.add(cmd);
        }
      } else {
        // Send one block report per message.
        for (int r = 0; r < reports.length; r++) {
          StorageBlockReport singleReport[] = { reports[r] };
          // One RPC per storage (disk)
          DatanodeCommand cmd = bpNamenode.blockReport(
              bpRegistration, bpos.getBlockPoolId(), singleReport,
              new BlockReportContext(reports.length, r, reportId,
                  fullBrLeaseId));
          blockReportSizes.add(
              calculateBlockReportPBSize(useBlocksBuffer, singleReport));
          numReportsSent++;
          numRPCs++;
          if (cmd != null) {
            cmds.add(cmd);
          }
        }
      }
      success = true;
    // (rest omitted)
  }
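
The split decision itself is easy to isolate. The sketch below plans the RPCs for a given set of per-storage block counts; BlockReportSplitSketch and planRpcs are hypothetical names, and each inner list holds the indexes of the storages that travel in one RPC:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the single-RPC versus per-storage decision. */
final class BlockReportSplitSketch {
    static List<List<Integer>> planRpcs(int[] blocksPerStorage, long splitThreshold) {
        long total = 0;
        for (int n : blocksPerStorage) {
            total += n;
        }
        List<List<Integer>> rpcs = new ArrayList<>();
        if (total < splitThreshold) {
            // Below the threshold: every storage goes into one message.
            List<Integer> all = new ArrayList<>();
            for (int i = 0; i < blocksPerStorage.length; i++) {
                all.add(i);
            }
            rpcs.add(all);
        } else {
            // At or above the threshold: one message per storage.
            for (int i = 0; i < blocksPerStorage.length; i++) {
                List<Integer> one = new ArrayList<>();
                one.add(i);
                rpcs.add(one);
            }
        }
        return rpcs;
    }
}
```

Note that the threshold is compared with the DataNode's total block count, while the split is by storage, so a single very large disk still produces one large message.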

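In addition, BPServiceActor starts a lifeline thread that reports the DataNode's liveness to the NameNode. It reads dfs.datanode.lifeline.interval.seconds, which defaults to 3 * dfs.heartbeat.interval = 9s, so the liveness message is sent to the NameNode at 9s intervals: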

while (shouldRun()) {
        try {
          if (lifelineNamenode == null) {
            lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
          }
          sendLifelineIfDue();
          Thread.sleep(scheduler.getLifelineWaitTime());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (IOException e) {
          LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
              e);
        }
      }

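4. NameNode Handling of Heartbeats and Block Reports

When the NameNode handles DataNode requests, the calls end up in the registerDatanode, sendHeartbeat, blockReceivedAndDeleted, and blockReport methods of DatanodeProtocol, which is implemented by NameNodeRpcServer.

4.1 registerDatanode

registerDatanode places the DataNode into the network topology. It ends up calling DatanodeManager.addDatanode: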

/** Add a datanode. */
  void addDatanode(final DatanodeDescriptor node) {
    // To keep host2DatanodeMap consistent with datanodeMap,
    // remove  from host2DatanodeMap the datanodeDescriptor removed
    // from datanodeMap before adding node to host2DatanodeMap.
    
    // Add to datanodeMap and host2DatanodeMap
    synchronized(datanodeMap) {
      host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
    }

    // Add to the network topology
    networktopology.add(node); // may throw InvalidTopologyException
    host2DatanodeMap.add(node);
    checkIfClusterIsNowMultiRack(node);

    if (LOG.isDebugEnabled()) {
      LOG.debug(getClass().getSimpleName() + ".addDatanode: "
          + "node " + node + " is added to datanodeMap.");
    }
  }

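4.2 sendHeartbeat

This method ends up in DatanodeManager.handleHeartbeat(). The heartbeat is handled as follows:

  1. Look up the DataNode's information and check whether it is allowed to connect (for example, whether it is in the exclude list); if not, throw an exception.
  2. Check whether it has registered; if not, return a register command right away.
  3. Update the DataNode's information, mainly the fields of its DatanodeDescriptor such as used and remaining space.
  4. Check whether the NameNode is in safe mode.
  5. Check for pending lease recovery.
  6. Generate replication commands.
  7. Generate deletion commands.
  8. Generate cache-related commands.
  9. Generate bandwidth-related commands.
  10. Return all the commands.

/** Handle heartbeat from datanodes. */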
  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) throws IOException {
    synchronized (heartbeatManager) {
      synchronized (datanodeMap) {
        DatanodeDescriptor nodeinfo = null;
        try {
          // Look up the DataNode's information
          nodeinfo = getDatanode(nodeReg);
        } catch(UnregisteredNodeException e) {
          return new DatanodeCommand[]{RegisterCommand.REGISTER};
        }
        // Is the DataNode allowed to connect?
        // Check if this datanode should actually be shutdown instead. 
        if (nodeinfo != null && nodeinfo.isDisallowed()) {
          setDatanodeDead(nodeinfo);
          throw new DisallowedDatanodeException(nodeinfo);
        }

        // Has the DataNode registered?
        if (nodeinfo == null || !nodeinfo.isRegistered()) {
          return new DatanodeCommand[]{RegisterCommand.REGISTER};
        }

        // Update the DataNode's information, such as used and remaining space
        heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                         cacheCapacity, cacheUsed,
                                         xceiverCount, failedVolumes,
                                         volumeFailureSummary);

        // Is the NameNode in safe mode?
        // If we are in safemode, do not send back any recovery / replication
        // requests. Don't even drain the existing queue of work.
        if(namesystem.isInSafeMode()) {
          return new DatanodeCommand[0];
        }

        // Check for pending lease recovery
        //check lease recovery
        BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
        if (blocks != null) {
          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
              blocks.length);
           .................................
          return new DatanodeCommand[] { brCommand };
        }

       
        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
        // Generate replication commands
        //check pending replication
        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
              maxTransfers);
        if (pendingList != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
              pendingList));
        }
        // Check for invalidated blocks and generate deletion commands
        //check block invalidation
        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
        if (blks != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
              blockPoolId, blks));
        }
        // Generate cache-related commands
        boolean sendingCachingCommands = false;
        long nowMs = monotonicNow();
        if (shouldSendCachingCommands && 
            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
                timeBetweenResendingCachingDirectivesMs)) {
          DatanodeCommand pendingCacheCommand =
              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
                DatanodeProtocol.DNA_CACHE, blockPoolId);
          if (pendingCacheCommand != null) {
            cmds.add(pendingCacheCommand);
            sendingCachingCommands = true;
          }
          DatanodeCommand pendingUncacheCommand =
              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
          if (pendingUncacheCommand != null) {
            cmds.add(pendingUncacheCommand);
            sendingCachingCommands = true;
          }
          if (sendingCachingCommands) {
            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
          }
        }

        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
        // Generate bandwidth-related commands
        // check for balancer bandwidth update
        if (nodeinfo.getBalancerBandwidth() > 0) {
          cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
          // set back to 0 to indicate that datanode has been sent the new value
          nodeinfo.setBalancerBandwidth(0);
        }

        // Return all the commands
        if (!cmds.isEmpty()) {
          return cmds.toArray(new DatanodeCommand[cmds.size()]);
        }
      }
    }

    return new DatanodeCommand[0];
  }
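
One detail that is easy to miss in the listing is its early returns: an unregistered node gets only a register command, safe mode yields no commands at all, and a pending block recovery is returned on its own before any replication or deletion work. The sketch below condenses that control flow. HeartbeatDecisionSketch, its parameters, and the string labels are hypothetical; cache, key update, and bandwidth commands are left out:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical condensation of the heartbeat reply logic shown above. */
final class HeartbeatDecisionSketch {
    static List<String> commandsFor(boolean registered, boolean inSafeMode,
            boolean hasLeaseRecovery, int pendingReplications,
            int pendingInvalidations) {
        List<String> cmds = new ArrayList<>();
        if (!registered) {
            cmds.add("REGISTER");          // nothing else until it re-registers
            return cmds;
        }
        if (inSafeMode) {
            return cmds;                   // no work is handed out in safe mode
        }
        if (hasLeaseRecovery) {
            cmds.add("RECOVER_BLOCKS");    // returned alone, ahead of other work
            return cmds;
        }
        if (pendingReplications > 0) {
            cmds.add("TRANSFER");
        }
        if (pendingInvalidations > 0) {
            cmds.add("INVALIDATE");
        }
        return cmds;
    }
}
```

A consequence of this ordering is that a DataNode with recovery work pending receives its replication and deletion commands only with a later heartbeat.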

4.3 blockReceivedAndDeleted: incremental block reports

NameNodeRpcServer.blockReceivedAndDeleted wraps the processing of each incremental report in a Runnable and puts it on the BlockManager's queue:

public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
      String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
          throws IOException {
   
    final BlockManager bm = namesystem.getBlockManager();
    for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
      bm.enqueueBlockOp(new Runnable() {
        @Override
        public void run() {
          try {
            namesystem.processIncrementalBlockReport(nodeReg, r);
          } catch (Exception ex) {
            // usually because the node is unregistered/dead.  next heartbeat
            // will correct the problem
            blockStateChangeLog.error(
                "*BLOCK* NameNode.blockReceivedAndDeleted: "
                    + "failed from " + nodeReg + ": " + ex.getMessage());
          }
        }
      });
    }
  }

BlockManager has a BlockReportProcessingThread, which keeps taking these Runnables from the queue and running them asynchronously.
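
The pattern is a single consumer thread draining a queue of Runnables, so that RPC handler threads only enqueue work and return. Here is a minimal sketch of it using java.util.concurrent. BlockOpQueueSketch is a hypothetical class; the unbounded queue and the shutdown method are simplifications chosen for this example, not a description of the BlockManager:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-consumer operation queue (not the Hadoop class). */
final class BlockOpQueueSketch {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final Thread worker = new Thread(this::processQueue, "block-op-worker");

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    /** Called by producers (RPC handlers in the text above); never blocks. */
    void enqueue(Runnable op) {
        queue.offer(op);
    }

    private void processQueue() {
        while (running) {
            try {
                queue.take().run();   // operations run one at a time, in order
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A failing operation must not kill the worker thread.
            }
        }
    }

    /** Lets the queued operations finish, then stops the worker. */
    void shutdown() {
        queue.offer(() -> running = false);   // runs last, on the worker
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only one thread executes the operations, they are applied in arrival order and never run concurrently with each other.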

FSNamesystem.processIncrementalBlockReport() eventually handles each block according to its reported status:

for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
      switch (rdbi.getStatus()) {
      case DELETED_BLOCK:
        removeStoredBlock(storageInfo, rdbi.getBlock(), node);
        deleted++;
        break;
      case RECEIVED_BLOCK:
        addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
        received++;
        break;
      case RECEIVING_BLOCK:
        receiving++;
        processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
                                      ReplicaState.RBW, null);
        break;
      default:
        String msg = 
          "Unknown block status code reported by " + node +
          ": " + rdbi;
        blockLog.warn(msg);
        assert false : msg; // if assertions are enabled, throw.
        break;
      }

Take removeStoredBlock as an example: it removes the corresponding block information from blocksMap:

// Remove the block->metadata and block->datanode mappings from blocksMap.
      if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
        blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
            " removed from node {}", storedBlock, node);
        return;
      }

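4.4 processReport: full block reports

Full block reports end up in BlockManager.processReport. reportDiff identifies the blocks to add, the blocks to remove, the invalid blocks, and the corrupt blocks, which are then processed in turn.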

Collection<Block> processReport(
      final DatanodeStorageInfo storageInfo,
      final BlockListAsLongs report) throws IOException {
    // Normal case:
    // Modify the (block-->datanode) map, according to the difference
    // between the old and new block report.
    //
    Collection<BlockInfoToAdd> toAdd = new ArrayList<>();
    Collection<BlockInfo> toRemove = new HashSet<>();
    Collection<Block> toInvalidate = new ArrayList<>();
    Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
    Collection<StatefulBlockInfo> toUC = new ArrayList<>();
    // Diff the report against the stored state: record the blocks to add, remove, invalidate, and mark corrupt for later processing
    reportDiff(storageInfo, report,
                 toAdd, toRemove, toInvalidate, toCorrupt, toUC);

    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
    // Process the blocks on each queue
    // Record under-construction blocks
    for (StatefulBlockInfo b : toUC) { 
      addStoredBlockUnderConstruction(b, storageInfo);
    }
    // Remove blocks
    for (BlockInfo b : toRemove) {
      removeStoredBlock(b, node);
    }
    int numBlocksLogged = 0;
    for (BlockInfoToAdd b : toAdd) {
      // Add blocks
      addStoredBlock(b.stored, b.reported, storageInfo, null,
          numBlocksLogged < maxNumBlocksToLog);
      numBlocksLogged++;
    }
    if (numBlocksLogged > maxNumBlocksToLog) {
      blockLog.info("BLOCK* processReport: logged info for {} of {} " +
          "reported.", maxNumBlocksToLog, numBlocksLogged);
    }
    for (Block b : toInvalidate) {
      // Invalid blocks
      addToInvalidates(b, node);
    }
    for (BlockToMarkCorrupt b : toCorrupt) {
      // Corrupt blocks
      markBlockAsCorrupt(b, storageInfo, node);
    }

    return toInvalidate;
  }
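
The core idea of the diff can be shown with three sets of block IDs. This is a sketch under simplifying assumptions: ReportDiffSketch is a hypothetical class, blocks are reduced to bare IDs, and the corrupt and under-construction cases handled by the real reportDiff are left out:

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical set-based sketch of a full block report diff. */
final class ReportDiffSketch {
    final Set<Long> toAdd = new TreeSet<>();
    final Set<Long> toRemove = new TreeSet<>();
    final Set<Long> toInvalidate = new TreeSet<>();

    /**
     * @param storedOnThisStorage blocks the NameNode believes the storage holds
     * @param knownToNamespace    every block that belongs to some file
     * @param reported            blocks listed in the full block report
     */
    ReportDiffSketch(Set<Long> storedOnThisStorage, Set<Long> knownToNamespace,
            Set<Long> reported) {
        for (Long b : reported) {
            if (!knownToNamespace.contains(b)) {
                toInvalidate.add(b);   // belongs to no file: tell the DN to delete it
            } else if (!storedOnThisStorage.contains(b)) {
                toAdd.add(b);          // new replica the NameNode did not know about
            }
        }
        for (Long b : storedOnThisStorage) {
            if (!reported.contains(b)) {
                toRemove.add(b);       // replica vanished from the storage
            }
        }
    }
}
```

For example, with stored = {1, 2}, known = {1, 2, 3}, and reported = {2, 3, 4}, the sketch yields toAdd = {3}, toRemove = {1}, and toInvalidate = {4}.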