1. 背景
文件内容的变更往往意味着block信息的变更,在datanode中,变更的block会发送给namenode,namenode会更新block信息。本文将介绍datanode心跳流程和block块汇报流程。
2. DataNode心跳线程模型
在Hadoop Federation架构中,一般由一对Active/Standby NameNode为一组作为一个namespace,每个namenode有独属的block pool,block pool是该命名空间下block的集合。不同的namespace可以共用相同的datanode进行存储,在datanode中,每个namespace对于不同的目录,存储所属的block。如下所示,该集群有四个namespace,在每台dn中,就会创建四个对应的block pool目录:
对于DataNode而言,会以block pool为单位和namenode汇报datanode信息。block pool由BlockPoolManager类管理。在启动DataNode时,会通过调用BlockPoolManager.doRefreshNamenodes在每台DataNode上,创建block pool/namespace对应的对象:BPOfferService。每个namespace对应一个BPOfferService,每个BPOfferService只负责向该Active/Standby NameNode发送心跳、块汇报信息。BPOfferService创建流程如下:
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap,
Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newLinkedHashSet();
Set<String> toAdd = Sets.newLinkedHashSet();
Set<String> toRemove;
//获取dn中已经创建的nameserviceId,如果还有新的namespace没有创建,准备创建对应的BPOfferService
synchronized (this) {
// Step 1. For each of the new nameservices, figure out whether
// it's an update of the set of NNs for an existing NS,
// or an entirely new nameservice.
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
toAdd.add(nameserviceId);
}
}
//如果nameserviceId不存在了,准备datanode中删掉对应的BPOfferService
// Step 2. Any nameservices we currently have but are no longer present
// need to be removed.
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));
// Step 3. Start new nameservices
if (!toAdd.isEmpty()) {
LOG.info("Starting BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toAdd));
for (String nsToAdd : toAdd) {
Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
Map<String, InetSocketAddress> nnIdToLifelineAddr =
lifelineAddrMap.get(nsToAdd);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<InetSocketAddress> lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
}
//创建namespace对应的BPOfferService
BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
startAll();
}
创建BPOfferService对象时,会记录Active/Standby NameNode,每个NameNode创建对应的BPServiceActor对象,实际上BPServiceActor会执行真正的心跳等操作:
BPOfferService(
final String nameserviceId,
List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs,
DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
"Must pass same number of NN addresses and lifeline addresses.");
this.nameserviceId = nameserviceId;
this.dn = dn;
//每个NameNode创建一个对应的BPServiceActor用于发送心跳等信息
for (int i = 0; i < nnAddrs.size(); ++i) {
this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
lifelineNnAddrs.get(i), this));
}
}
最终,执行BlockPoolManager.startAll方法,会启动每个BPOfferService中的所有BPServiceActor线程。要注意,不管是Active NameNode还是Standby NameNode,都会接受心跳信息,但是只有Active Namenode才能向DataNode发送相应指令:
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
for (BPOfferService bpos : offerServices) {
//启动BPOfferService
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
//启动BPServiceActor线程
class BPOfferService {
void start() {
for (BPServiceActor actor : bpServices) {
actor.start();
}
}
}
3. DataNode心跳和块汇报流程
在BPServiceActor.run()线程中,会先与NameNode进行握手,再调用offerService方法进入心跳和块汇报流程:
connectToNNAndHandshake();
while (shouldRun()) {
try {
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
offerService流程如下:
- scheduler.isHeartbeatDue计算当前时间是否可以开始进行心跳。
- scheduler.isBlockReportDue判断是否需要进行全量块汇报。
- sendHeartBeat发送心跳,并申请块汇报租约。
- sendIBRs发送增量块汇报。
- blockReport发送全量块汇报。
private void offerService() throws Exception {
while (shouldRun()) {
try {
//当前时间
final long startTime = scheduler.monotonicNow();
//计算时间间隔,判断是否应该发送心跳
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
HeartbeatResponse resp = null;
if (sendHeartbeat) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
//fullBlockReportLeaseId为0表示当前BPServiceActor线程没有申请租约,如果要进行全量块汇报,就申请块汇报租约
boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
scheduler.isBlockReportDue(startTime);
if (!dn.areHeartbeatsDisabledForTests()) {
//发送心跳
resp = sendHeartBeat(requestBlockReportLease);
assert resp != null;
if (resp.getFullBlockReportLeaseId() != 0) {
if (fullBlockReportLeaseId != 0) {
LOG.warn(nnAddr + " sent back a full block report lease " +
"ID of 0x" +
Long.toHexString(resp.getFullBlockReportLeaseId()) +
", but we already have a lease ID of 0x" +
Long.toHexString(fullBlockReportLeaseId) + ". " +
"Overwriting old lease ID.");
}
//获取块汇报租约ID
fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
//
// Important that this happens before processCommand below,
// since the first heartbeat to a new active might have commands
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands = monotonicNow();
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands = monotonicNow();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
+ " commands from NN");
}
}
}
//增量块汇报
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately()|| sendHeartbeat)) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId());
}
List<DatanodeCommand> cmds = null;
boolean forceFullBr =
scheduler.forceFullBlockReport.getAndSet(false);
if (forceFullBr) {
LOG.info("Forcing a full block report to " + nnAddr);
}
//全量块汇报
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
}
if (sendHeartbeat) {
dn.getMetrics().addHeartbeatTotal(
scheduler.monotonicNow() - startTime);
}
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//阻塞,等待下一次心跳事件到,就唤醒线程
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
} //省略
} // while (shouldRun())
} // offerServicedfs.datanode.lifeline.interval.seconds
心跳时间判断:scheduler.isHeartbeatDue
读取dfs.heartbeat.interval配置,默认为3s,设置为心跳时间间隔。即没过3s,就向namenode发送心跳:
boolean isHeartbeatDue(long startTime) {
return (nextHeartbeatTime - startTime <= 0);
}
long scheduleNextHeartbeat() {
// Numerical overflow is possible here and is okay.
nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
scheduleNextLifeline(nextHeartbeatTime);
return nextHeartbeatTime;
}
全量块汇报时间判断:scheduler.isBlockReportDue
读取dfs.blockreport.intervalMsec配置,获取全量块汇报间隔,默认为6h。其中,fullBlockReportLeaseId表表是全量块汇报时的租约ID,全量块汇报时租约由HDFS-7923引入,它为了防止NameNode在处理全量块汇报时处理的请求过多导致callQueue过大,造成服务阻塞。块汇报时租约设置租约数量(默认6个)和租约时长(5min)降低NameNode压力。不过当集群块数量多时,也会导致FBR速度变慢:
- dfs.namenode.max.full.block.report.leases=6
- dfs.namenode.full.block.report.lease.length.ms=5L * 60L * 1000L
当fullBlockReportLeaseId时,表示全量块汇报还未执行完:
boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
scheduler.isBlockReportDue(startTime);
void scheduleNextBlockReport() {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
nextBlockReportTime = monotonicNow() +
ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
}
sendHeartBeat发送心跳
如下,发送datanode基础容量等信息给namenode:
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks);
sendIBRs发送增量块汇报
sendIBRs方法中,调用IncrementalBlockReportManager.generateIBRs中,从pendingIBRs中获取待汇报的block:
private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
final List<StorageReceivedDeletedBlocks> reports
= new ArrayList<>(pendingIBRs.size());
for (Map.Entry<DatanodeStorage, PerStorageIBR> entry
: pendingIBRs.entrySet()) {
final PerStorageIBR perStorage = entry.getValue();
// Send newly-received and deleted blockids to namenode
final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();
if (rdbi != null) {
reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
}
}
pendingIBRs的block信息是在datanode完成删除block操作、正在接受block、接受完成block等操作后,准备发送给namenode的:
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
String storageUuid, boolean isOnTransientStorage) {
notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
storageUuid, isOnTransientStorage);
}
void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid,
false);
}
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid,
false);
}
最终调用IncrementalBlockReportManager.addRDBI将块信息放到pendingIBRs中:
synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi,
DatanodeStorage storage) {
// Make sure another entry for the same block is first removed.
// There may only be one such entry.
for (PerStorageIBR perStorage : pendingIBRs.values()) {
if (perStorage.remove(rdbi.getBlock()) != null) {
break;
}
}
getPerStorageIBR(storage).put(rdbi);
}
它读取blockreport.incremental.intervalMsec配置,默认为10min,为增量块汇报的间隔。
blockReport发送全量块汇报
DatanodeStorage可以理解为datanode的单个磁盘,扫描每个磁盘下的block文件,读取dfs.blockreport.split.threshold配置,默认为10w。每次发送10w个block信息给namenode:
List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
StorageBlockReport reports[] =
new StorageBlockReport[perVolumeBlockLists.size()];
//
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
BlockListAsLongs blockList = kvPair.getValue();
reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
totalBlockCount += blockList.getNumberOfBlocks();
}
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId));
blockReportSizes.add(
calculateBlockReportPBSize(useBlocksBuffer, reports));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
cmds.add(cmd);
}
} else {
// Send one block report per message.
for (int r = 0; r < reports.length; r++) {
StorageBlockReport singleReport[] = { reports[r] };
//每次发送10w个block给namenode
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId,
fullBrLeaseId));
blockReportSizes.add(
calculateBlockReportPBSize(useBlocksBuffer, singleReport));
numReportsSent++;
numRPCs++;
if (cmd != null) {
cmds.add(cmd);
}
}
}
success = true;
//省略
}
另外,BPServiceActor还会启动健康检测线程,用于向namenode汇报自己的健康存活信息。它读取dfs.datanode.lifeline.interval.seconds配置,默认是3*dfs.heartbeat.interval=9s,每9s向namenode发送健康状态:
while (shouldRun()) {
try {
if (lifelineNamenode == null) {
lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
}
sendLifelineIfDue();
Thread.sleep(scheduler.getLifelineWaitTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
e);
}
}
4. NameNode处理心跳和块汇报流程
NameNode在在处理DataNode的请求时,最终调用DatanodeProtocol的registerDatanode、sendHeartbeat、blockReceivedAndDeleted、blockReport方法,它的实现是NameNodeRpcServer。
4.1 registerDatanode
registerDatanode负责将该datanode节点放入到网络拓扑中,最终执行HeartbeatManager.addDatanode方法:
/** Add a datanode. */
void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
//加到datanodeMap和 host2DatanodeMap中
synchronized(datanodeMap) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
//加到网络中
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
+ "node " + node + " is added to datanodeMap.");
}
}
4.2 sendHeartbeat
该方法最终执行DatanodeManager.handleHeartbeat()方法。心跳的具体流程如下:
- 先获取datanode的信息,判断是否允许连接(比如在exclude中),如果不允许的话,直接抛出异常。
- 判断是否注册过,如果没注册过,直接返回注册命令。
- 更新datanode的信息,主要就是更新DatanodeDescriptor中的信息,如使用空间,剩余空间等。
- 检查是否处于安全模式。
- 检查租约情况。
- 生成复制的命令。
- 生成删除的命令。
- 生成缓存相关的命令。
- 生成带宽相关的命令。
- 返回所有的命令。
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
//获取datanode的信息
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
//是否允许连接
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
//检查是否注册过
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
//更新datanode的信息,如使用空间,剩余空间等
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
//是否处于安全模式
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
if(namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
//检查租约情况
//check lease recovery
BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
.................................
return new DatanodeCommand[] { brCommand };
}
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
//生成复制命令
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
//检查无效的数据块,生成删除命令
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
//生成缓存相关的命令
boolean sendingCachingCommands = false;
long nowMs = monotonicNow();
if (shouldSendCachingCommands &&
((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
timeBetweenResendingCachingDirectivesMs)) {
DatanodeCommand pendingCacheCommand =
getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
DatanodeProtocol.DNA_CACHE, blockPoolId);
if (pendingCacheCommand != null) {
cmds.add(pendingCacheCommand);
sendingCachingCommands = true;
}
DatanodeCommand pendingUncacheCommand =
getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
DatanodeProtocol.DNA_UNCACHE, blockPoolId);
if (pendingUncacheCommand != null) {
cmds.add(pendingUncacheCommand);
sendingCachingCommands = true;
}
if (sendingCachingCommands) {
nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
}
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
//生成带宽相关的命令
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
//返回所有的命令
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
}
return new DatanodeCommand[0];
}
4.3 blockReceivedAndDeleted增量块汇报
NameNodeRpcServer.blockReceivedAndDeleted方法将块增量处理操作封装成为线程放入BlockManager的queue中:
public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
throws IOException {
final BlockManager bm = namesystem.getBlockManager();
for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
bm.enqueueBlockOp(new Runnable() {
@Override
public void run() {
try {
namesystem.processIncrementalBlockReport(nodeReg, r);
} catch (Exception ex) {
// usually because the node is unregistered/dead. next heartbeat
// will correct the problem
blockStateChangeLog.error(
"*BLOCK* NameNode.blockReceivedAndDeleted: "
+ "failed from " + nodeReg + ": " + ex.getMessage());
}
}
});
}
}
BlockManager中有BlockReportProcessingThread线程,该线程会不断从queue中获取线程并异步运行。
FSNamesystem.processIncrementalBlockReport()中,根据块不同的状态,NameNode进行不同的逻辑处理:
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
removeStoredBlock(storageInfo, rdbi.getBlock(), node);
deleted++;
break;
case RECEIVED_BLOCK:
addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
ReplicaState.RBW, null);
break;
default:
String msg =
"Unknown block status code reported by " + node +
": " + rdbi;
blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
以removeStoredBlock为例,它负责在blocksMap中删掉对应块信息:
// 从blocksMap中移除块->元数据,块->datanode映射的信息。
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
" removed from node {}", storedBlock, node);
return;
}
4.4 processReport全量块汇报
最终执行BlockManager.processReport处理全量块信息。通过reportDiff找到待添加的块,待删除的块,无效块,坏块,后续依次处理。
Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
Collection<BlockInfoToAdd> toAdd = new ArrayList<>();
Collection<BlockInfo> toRemove = new HashSet<>();
Collection<Block> toInvalidate = new ArrayList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
Collection<StatefulBlockInfo> toUC = new ArrayList<>();
//对比,记录待添加的块,待删除的块,无效块,坏块,后续进行相应处理
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
//UnderConstruction块记录
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
}
//删除块
for (BlockInfo b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
for (BlockInfoToAdd b : toAdd) {
//增加块
addStoredBlock(b.stored, b.reported, storageInfo, null,
numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
blockLog.info("BLOCK* processReport: logged info for {} of {} " +
"reported.", maxNumBlocksToLog, numBlocksLogged);
}
for (Block b : toInvalidate) {
//无效块
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
//损坏块
markBlockAsCorrupt(b, storageInfo, node);
}
return toInvalidate;
}