JournalNode元数据处理过程
  GQ7psP7UJw7k 2023年11月05日 52 0

1. 背景

在Hadoop2.x之前,只有一台NameNode负责对外提供服务,另外一台secondary NameNode只用于合并fsimage,不提供对外元数据服务。因此NameNode和secondary NameNode都存在单点问题。

为了解决secondary NameNode单点问题,HDFS引入多个JournalNode服务存储操作日志,取代单台secondary NameNode。JournalNode引入了Quorum机制:Active NameNode写EditLog时,除了向NameNode本地磁盘写操作日志,还会向所有JournalNode发送写请求。对于2N+1台JN组成的集群,最多可以容忍N个JN节点异常。

由于NameNode允许JournalNode半数写失败,在NameNode恢复editlog时,会先调用FSEditLog.recoverUnclosedStreams()方法让JournalNode 集群中各个节点上的 EditLog 达成一致。因此NameNode的元数据遵从最终一致性,而不是强一致性。元数据恢复期间,NameNode为安全模式,不可访问,因此HDFS是CP架构。

2. JournalNode概念

  1. fsimage:fsimage文件是Hadoop系统元数据的永久性检查点,包含了系统中所有文件的目录和文件inode序列化信息。
  2. edits:edits日志文件存放hadoop所有操作的日志信息,操作首先会被记录到edits文件中,定时合并为fsimage文件,在没有开启HA的情况下是由secondary nn来进行合并操作,开启HA的情况下是JournalNode节点来进行合并以及同步。
  3. epoch:epoch是paxos协议中的一个概念,可以用于标识active NameNode。每次NameNode切换时,epoch就会加1,每个请求都会向JournalNode携带epoch。一旦epoch小于JournalNode中维护的last-promised-epoch,说明该NameNode是切换之前的NameNode,此时集群发生了脑裂,禁止此NameNode向JournalNode发送editlog。
  4. txid:每个editlog都有对应的事务txid。
  5. segment:多个连续的txid事务存储到一个editlog文件中,只会有一个segment处于正在写的状态(Inprogress),而其他的segment文件则都处于写完关闭的状态(Finalized)。每当写一个新的segment时,会比较epoch是否比editlog维护的last-writer-epoch大,如果大就写入到segment文件中。
  6. committed-txid:journalNode记录己接受,正在处理的事务id。
  7. edits_$starttxid-$endtxid:已经写完关闭的segment文件,该文件记录了从开始事务ID到结束事务ID的连续的editlog信息。
  8. edits_inprogress_$lasttxid:当前正在写的segment文件,文件名中记录了开始事务ID。
  9. VERSION:记录集群的相关信息,包括命名空间ID,集群ID,创建时间等。

dfs.namenode.name.dir配置定义了namenode中editlog日志位置:

<property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///disk1/dfs/name,file:///disk2/dfs/name</value>
  </property>

dfs.journalnode.edits.dir配置定义了journalNode中editlog日志位置:

<name>dfs.journalnode.edits.dir</name>
    <value>/home/data/jn</value>

namenode中editlog和fsimage如下所示:

Untitled.png

journalNode中editlog如下所示:

Untitled 1.png

3. NameNode操作日志写入JournalNode流程

3.1 客户端rpc请求到NameNode生成edit op操作过程

https://blog.51cto.com/u_15327484/8089923文章中,分析了客户端create请求时,NameNode会在内存的FSNamesystem文件系统树中更新对应的INodeFile对象。

NameNodeRpcServer.create方法在调用过程中,FSNamesystem.startFileInt方法先在HDFS文件系统树中创建文件INode,再通过getEditLog().logSync()记录操作日志:

private HdfsFileStatus startFileInt(String src,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions,
      String ecPolicyName, boolean logRetryCache) throws IOException {
      //开始在HDFS文件系统树中创建文件
      try {
        stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
            clientMachine, flag, createParent, replication, blockSize, feInfo,
            toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
    
    } finally {
      writeUnlock("create");
      // There might be transactions logged while trying to recover the lease.
      // They need to be sync'ed even when an exception was thrown.
      if (!skipSync) {
        //记录操作日志到editlog中
        getEditLog().logSync();
        if (toRemoveBlocks != null) {
          removeBlocks(toRemoveBlocks);
          toRemoveBlocks.clear();
        }
      }
    }

    return stat;
  }

在FSDirWriteFileOp.startFile方法中,先调用addFile方法在文件系统树中增加INodeFile,然后调用fsd.getEditLog().logOpenFile方法记录create请求对应的OpenFile类型的操作日志:

static HdfsFileStatus startFile(
      FSNamesystem fsn, INodesInPath iip,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize,
      FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
      boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
      throws IOException {
    //省略
    //在文件系统树中增加INodeFile
    if (parent != null) {
      iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
          replication, blockSize, holder, clientMachine, shouldReplicate,
          ecPolicyName);
      newNode = iip != null ? iip.getLastINode().asFile() : null;
    }
    //省略
    //在editlog中记录OpenFile类型的操作日志
    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
          src + " inode " + newNode.getId() + " " + holder);
    }
    return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
  }

FSEditLog.logOpenFile方法创建了AddOp对象,记录了操作内容,最后将该对象内容通过logEdit方法记录到输出流中:

public void logOpenFile(String path, INodeFile newNode, boolean overwrite,
      boolean toLogRpcIds) {
    Preconditions.checkArgument(newNode.isUnderConstruction());
    PermissionStatus permissions = newNode.getPermissionStatus();
    //构建操作日志
    AddOp op = AddOp.getInstance(cache.get())
      .setInodeId(newNode.getId())
      .setPath(path)
      .setReplication(newNode.getFileReplication())
      .setModificationTime(newNode.getModificationTime())
      .setAccessTime(newNode.getAccessTime())
      .setBlockSize(newNode.getPreferredBlockSize())
      .setBlocks(newNode.getBlocks())
      .setPermissionStatus(permissions)
      .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
      .setClientMachine(
          newNode.getFileUnderConstructionFeature().getClientMachine())
      .setOverwrite(overwrite)
      .setStoragePolicyId(newNode.getLocalStoragePolicyID())
      .setErasureCodingPolicyId(newNode.getErasureCodingPolicyID());

    AclFeature f = newNode.getAclFeature();
    if (f != null) {
      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
    }

    XAttrFeature x = newNode.getXAttrFeature();
    if (x != null) {
      op.setXAttrs(x.getXAttrs());
    }

    logRpcIds(op, toLogRpcIds);
    //输出
    logEdit(op);
  }

如下是logOpenFile对应的AddOp定义,当执行ClientProtocol.create和ClientProtocol.append时,会生成AddOp这种操作日志:

/**
   * {@literal @AtMostOnce} for {@link ClientProtocol#create} and
   * {@link ClientProtocol#append}
   */

FSEditLog.logEdit方法开始写入事务内容:

  1. 先调用beginTransaction开启事务,txid自增。
  2. 将AddOp操作内容写入到editLogStream输出流中。
  3. 调用logSync异步写入到磁盘和远程journalNode中。
void logEdit(final FSEditLogOp op) {
    boolean needsSync = false;
    //省略
      // check if it is time to schedule an automatic sync
      //以事务的方式写入edit
      needsSync = doEditTransaction(op);
      //省略
    // Sync the log if an automatic sync is required.
    if (needsSync) {
      logSync();
    }
  }

//开启事务
synchronized boolean doEditTransaction(final FSEditLogOp op) {
    //开启事务
    long start = beginTransaction();
    op.setTransactionId(txid);

    try {
      //将AddOp写入到editLogStream输出流中
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed, it is handled in logSync.
    } finally {
      op.reset();
    }
    endTransaction(start);
    return shouldForceSync();
  }

//开启事务时,将事务txid自增
private long beginTransaction() {
    assert Thread.holdsLock(this);
    // get a new transactionId
    txid++;

    //
    // record the transactionId when new data was written to the edits log
    //
    TransactionId id = myTransactionId.get();
    id.txid = txid;
    return monotonicNow();
  }

在editLogStream.write写入时,发现它写入的是bufCurrent:

public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
  bufCurrent.writeOp(op, logVersion);
}

在logSync方法中:

  1. 判断edit是否还未同步完,并且正在同步,就等待1s再尝试同步。
  2. 如果已经同步完,就退出。
  3. 交换currentBuffer和ReadyBuffer的执行。
  4. 将ReadyBuffer内容写出到磁盘和journalNode中。

NameNode双缓冲Buffer架构如下所示:

Untitled 2.png

代码如下:

EditLogOutputStream logStream = null;
      synchronized (this) {
        try {
          printStatistics(false);
          //判断edit是否还未同步完,并且正在同步,就等待1s再尝试同步。
          // if somebody is already syncing, then wait
          while (mytxid > synctxid && isSyncRunning) {
            try {
              wait(1000);
            } catch (InterruptedException ie) {
            }
          }
  
          //
          // If this transaction was already flushed, then nothing to do
          //如果已经同步完,就退出。
          if (mytxid <= synctxid) {
            return;
          }

          // now, this thread will do the sync.  track if other edits were
          // included in the sync - ie. batched.  if this is the only edit
          // synced then the batched count is 0
          editsBatchedInSync = txid - synctxid - 1;
          syncStart = txid;
          isSyncRunning = true;
          sync = true;

          // swap buffers
          try {
            if (journalSet.isEmpty()) {
              throw new IOException("No journals available to flush");
            }
            //交换currentBuffer和ReadyBuffer的执行
            editLogStream.setReadyToFlush();
          } catch (IOException e) {
            final String msg =
                "Could not sync enough journals to persistent storage " +
                "due to " + e.getMessage() + ". " +
                "Unsynced transactions: " + (txid - synctxid);
            LOG.error(msg, new Exception());
            synchronized(journalSetLock) {
              IOUtils.cleanupWithLogger(LOG, journalSet);
            }
            terminate(1, msg);
          }
        } finally {
          // Prevent RuntimeException from blocking other log edit write 
          doneWithAutoSyncScheduling();
        }
        //editLogStream may become null,
        //so store a local variable for flush.
        logStream = editLogStream;
      }
      
      // do the sync
      long start = monotonicNow();
      try {
        if (logStream != null) {
          //将ReadyBuffer内容写出到磁盘和journalNode中
          logStream.flush();
        }
      } catch (IOException ex) {
        synchronized (this) {
          final String msg =
              "Could not sync enough journals to persistent storage. "
              + "Unsynced transactions: " + (txid - synctxid);
          LOG.error(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanupWithLogger(LOG, journalSet);
          }
          terminate(1, msg);
        }
      }

editLogStream.setReadyToFlush交换bufCurrent和bufReady的指针,准备将bufReady的数据写出:

public void setReadyToFlush() {
    assert isFlushed() : "previous data not flushed yet";
    TxnBuffer tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

3.2 NameNode editlog落盘和写入journalnode过程

通过3.1节可以了解到,通过EditLogOutputStream.flush方法,NameNode将ReadyBuffer内容写出到磁盘和journalNode中。

EditLogOutputStream是一个抽象类,先看一下它的实现:

Untitled 3.png

其中:

  1. EditLogFileOutputStream:用于写本地磁盘。
  2. QuorumOutputStream:用于写JournalNode。
  3. JournalSetOutputStream:包装类,可以添加EditLogFileOutputStream和QuorumOutputStream对象,他会调用EditLogFileOutputStream和QuorumOutputStream的方法从而依次写本地磁盘和写JournalNode。

在NameNode调用startLogSegment按批次写入edits时,就指定editLogStream为JournalSetOutputStream:

editLogStream = journalSet.startLogSegment(segmentTxId, layoutVersion);

public EditLogOutputStream startLogSegment(final long txId,
      final int layoutVersion) throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.startLogSegment(txId, layoutVersion);
      }
    }, "starting log segment " + txId);
    return new JournalSetOutputStream();
  }

回到logSync方法,最终调用JournalSet.mapJournalsAndReportErrors方法,它最终调用每个journals对象的getCurrentStream().flushAndSync方法:

protected void flushAndSync(final boolean durable) throws IOException {
      //调用mapJournalsAndReportErrors方法,注意定义了apply方法为jas.getCurrentStream().flushAndSync
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().flushAndSync(durable);
          }
        }
      }, "flushAndSync");
    }
//执行apply方法,即jas.getCurrentStream().flushAndSync
private void mapJournalsAndReportErrors(
      JournalClosure closure, String status) throws IOException{

    List<JournalAndStream> badJAS = Lists.newLinkedList();
    for (JournalAndStream jas : journals) {
      try {
        closure.apply(jas);
      //省略
  }

再查看journals如何初始化的。在NameNode启动时,调用initJournals,会在journals对象中添加FileJournalManager和QuorumJournalManager:

private synchronized void initJournals(List<URI> dirs) {
    int minimumRedundantJournals = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);

    synchronized(journalSetLock) {
      journalSet = new JournalSet(minimumRedundantJournals);

      for (URI u : dirs) {
        boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
            .contains(u);
        if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
          StorageDirectory sd = storage.getStorageDirectory(u);
          if (sd != null) {
            journalSet.add(new FileJournalManager(conf, sd, storage),
                required, sharedEditsDirs.contains(u));
          }
        } else {
          journalSet.add(createJournal(u), required,
              sharedEditsDirs.contains(u));
        }
      }
    }
 
    if (journalSet.isEmpty()) {
      LOG.error("No edits directories configured!");
    } 
  }

FileJournalManager和QuorumJournalManager都会管理自己的stream数据流,它们都会调用startLogSegment方法分别创建EditLogFileOutputStream和QuorumOutputStream写出:

public void startLogSegment(long txId, int layoutVersion) throws IOException {
      Preconditions.checkState(stream == null);
      disabled = false;
      stream = journal.startLogSegment(txId, layoutVersion);
    }

对于本地落盘过程,直接调用EditsDoubleBuffer.flushTo将bufReady写出到磁盘:

public void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out); // write data to file
    bufReady.reset(); // erase all data in the buffer
  }

对于写入JournalNode过程,直接调用QuorumOutputStream.flushAndSync方法,通过AsyncLoggerSet发送edits:

QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
          segmentTxId, firstTxToFlush,
          numReadyTxns, data);

public QuorumCall<AsyncLogger, Void> sendEdits(
      long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
    //每个AsyncLogger都发送一遍
    for (AsyncLogger logger : loggers) {
      ListenableFuture<Void> future = 
        logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
      calls.put(logger, future);
    }
    return QuorumCall.create(calls);
  }

AsyncLoggerSet中包含多个AsyncLogger,每个AsyncLogger对应一个journalNode,它用于向对应的JournalNode发送edits。如下是AsyncLogger初始化过程:

for (InetSocketAddress addr : addrs) {
      ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
    }

AsyncLogger会启动一个线程提交该edits。线程哪构建journalNode的proxy:

ret = singleThreadExecutor.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          throwIfOutOfSync();

          long rpcSendTimeNanos = System.nanoTime();
          try {
            getProxy().journal(createReqInfo(),
                segmentTxId, firstTxnId, numTxns, data);
          } catch (IOException e) {
            QuorumJournalManager.LOG.warn(
                "Remote journal " + IPCLoggerChannel.this + " failed to " +
                "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
                ". Will try to write to this JN again after the next " +
                "log roll.", e); 
            synchronized (IPCLoggerChannel.this) {
              outOfSync = true;
            }
            throw e;
          }

最后AsyncLoggerSet根据getMajoritySize()计算最多的journalNode失败数量,超过该数量就报错:

int getMajoritySize() {
    return loggers.size() / 2 + 1;
  }

if (q.countSuccesses() < majority) {
      q.rethrowException("Got too many exceptions to achieve quorum size " +
          getMajorityString());
    }

3.3 JournalNode落盘editlog流程

NameNode通过QJournalProtocol.journal请求JournalNode写入edits。JournalNode服务端会调用

JournalNodeRpcServer.journal方法进行处理。

JournalNodeRpcServer执行Journal.journal()方法:

public void journal(RequestInfo reqInfo,
      long segmentTxId, long firstTxnId,
      int numTxns, byte[] records) throws IOException {
    jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
       //执行Journal.journal()方法
       .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
  }

在Journal.journal()方法中,EditLogFileOutputStream将edits写入curBuffer,setReadyToFlush将curBuffer和ReadyBuffer进行交换指针,ReadyBuffer写入磁盘中:

//curSegment为EditLogFileOutputStream
curSegment.writeRaw(records, 0, records.length);
    curSegment.setReadyToFlush();
    curSegment.flush(shouldFsync);

最终,JournalNode将会将edits发送给Standby Namenode,代码暂未定位到。

3.4 NameNode edits落盘与发送JouranlNode流程总结

流程图如下所示:

Untitled 4.png

4. Standby NameNode 加载最新edits过程

为了防止journalNode中存放过多的edits文件,在StandBy NameNode中,定期向jouranlNode获取edits文件,将edits文件合并成为fsimage,合并后的edits文件就可以进行删除。这个过程就是checkpoint。

NameNode启动时,都默认置为Standby状态。FSNamesystem启动startStandbyServices方法,启动EditLogTailer线程;如果要退出Standby状态,进入Active状态,就会停止EditLogTailer线程。这样就保证了只有Standby NameNode执行EditLogTailer。Standby NameNode同时会启动StandbyCheckpointer线程。

  • EditLogTailer线程:定时将最新的本地磁盘中的edits或journalnode应用到内存的fsimage中。
  • StandbyCheckpointer线程:将内存中的fsimage保存到本地磁盘中,然后发送给active NameNode。
void startStandbyServices(final Configuration conf, boolean isObserver)
      throws IOException {
    //省略
    //启动EditLogTailer
    editLogTailer = new EditLogTailer(this, conf);
    editLogTailer.start();

    //启动
    standbyCheckpointer = new StandbyCheckpointer(conf, this);
    standbyCheckpointer.start();
    //省略
  }

void stopStandbyServices() throws IOException {
   if (standbyCheckpointer != null) {
      standbyCheckpointer.stop();
    }
    if (editLogTailer != null) {
      editLogTailer.stop();
    }
  }

EditLogTailer实际上启动的是EditLogTailerThread线程。EditLogTailerThread先后调用doWork、doTailEdits方法,循环定期将edits更新内存中的fsimage:

public long doTailEdits() throws IOException, InterruptedException {
    // Write lock needs to be interruptible here because the 
    // transitionToActive RPC takes the write lock before calling
    // tailer.stop() -- so if we're not interruptible, it will
    // deadlock.
   
      //获取StandBy NameNode内存中fsimage
      FSImage image = namesystem.getFSImage();

     
      //省略
        //根据输入流加载edits
        editsLoaded = image.loadEdits(
            streams, namesystem, maxTxnsPerLock, null, null);
      //省略
    }
  }

如下,EditLogFileOutputStream和QuorumOutputStream都构建输入流。即从本地和journalNode中读取edits:

public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
    final PriorityQueue<EditLogInputStream> allStreams = 
        new PriorityQueue<EditLogInputStream>(64,
            EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (JournalAndStream jas : journals) {
        jas.getManager().selectInputStreams(allStreams, fromTxId,
            inProgressOk, onlyDurableTxns);
    }
  }

FSImage调用loadEdits方法,依次从本地和journalNode加载最新edits:

for (EditLogInputStream editIn : editStreams) {
        //省略
        try {
          remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
                  remainingReadTxns, startOpt, recovery);
        //省略
      }

FSEditLogLoader.loadEditRecords方法从输入流中解析最新的FSEditLogOp,加载到文件系统树中:

try {
          FSEditLogOp op;
          try {
            op = in.readOp();
            if (op == null) {
              break;
            }
          }//省略
            long inodeId = applyEditLogOp(op, fsDir, startOpt,
                in.getVersion(true), lastInodeId);
            if (lastInodeId < inodeId) {
              lastInodeId = inodeId;
            }

加载最新editslog流程图如下所示:

Untitled 5.png

5. Standby NameNode checkpoint过程

Standby NameNode启动时,会启动CheckpointerThread线程。线程循环定期执行doCheckpoint(sendRequest)方法开始checkpoint。doCheckpoint方法中,先将fsimage信息保存到本地磁盘中,再将本地磁盘中的fsimage文件发送给active NameNode:

//将内存中的fsimage信息保存到本地磁盘中
FSImage img = namesystem.getFSImage();
img.saveNamespace(namesystem, imageType, canceler);
//将本地磁盘中的fsimage文件发送给active NameNode
List<Future<TransferFsImage.TransferResult>> uploads =
        new ArrayList<Future<TransferFsImage.TransferResult>>();
    for (final URL activeNNAddress : activeNNAddresses) {
      Future<TransferFsImage.TransferResult> upload =
          executor.submit(new Callable<TransferFsImage.TransferResult>() {
            @Override
            public TransferFsImage.TransferResult call()
                throws IOException, InterruptedException {
              CheckpointFaultInjector.getInstance().duringUploadInProgess();
              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                  .getFSImage().getStorage(), imageType, txid, canceler);
            }
          });
      uploads.add(upload);
    }

首先看FSImage.saveNamespace方法,它保存fsimage到本地磁盘:

public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
      Canceler canceler) throws IOException {
    //省略
        saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
   //省略
  }

FSImage启动FSImageSaver线程,调用saveFSImage方法,将fsimage和md5信息写入到不同的文件中:

void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
      NameNodeFile dstType) throws IOException {
    long txid = context.getTxId();
    File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
    File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
    
    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
    FSImageCompression compression = FSImageCompression.createCompression(conf);
    //将fsimage写入文件中
    long numErrors = saver.save(newFile, compression);
    if (numErrors > 0) {
      // The image is likely corrupted.
      LOG.error("Detected " + numErrors + " errors while saving FsImage " +
          dstFile);
      exitAfterSave.set(true);
    }
    //写入md5文件中
    MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
    storage.setMostRecentCheckpointInfo(txid, Time.now());
  }

保存完fsimage后,Standby Namenode会将在异步线程中调用TransferFsImage.uploadImageFromStorage方法将fsimage文件发送给active NameNode。

private static void uploadImage(URL url, Configuration conf,
      NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
      throws IOException {

    File imageFile = storage.findImageFile(nnf, txId);
    if (imageFile == null) {
      throw new IOException("Could not find image with txid " + txId);
    }

    HttpURLConnection connection = null;
      //建立与Active NameNode的连接
      connection = (HttpURLConnection) connectionFactory.openConnection(
          urlWithParams, UserGroupInformation.isSecurityEnabled());
      // Set the request to PUT
      connection.setRequestMethod("PUT");
      connection.setDoOutput(true);

      //省略
      //将fsimage文件发送给Active NameNode
      // Write the file to output stream.
      writeFileToPutRequest(conf, connection, imageFile, canceler);

      //省略
  }

checkpoint流程如下所示:

Untitled 6.png

另外当NameNode从Standby状态切换到Active状态时,会加载本地的fsimage文件,还会增量读取journalnode中的最新editlog。它调用EditLogTailer.catchupDuringFailover方法,执行doTailEdits实现,而doTailEdits流程上述已经讲过:

public void catchupDuringFailover() throws IOException {
    Preconditions.checkState(tailerThread == null ||
        !tailerThread.isAlive(),
        "Tailer thread should not be running once failover starts");
    // Important to do tailing as the login user, in case the shared
    // edits storage is implemented by a JournalManager that depends
    // on security credentials to access the logs (eg QuorumJournalManager).
    SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          // It is already under the full name system lock and the checkpointer
          // thread is already stopped. No need to acqure any other lock.
          doTailEdits();
        } catch (InterruptedException e) {
          throw new IOException(e);
        }
        return null;
      }
    });
  }
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
GQ7psP7UJw7k
最新推荐 更多

2024-05-03