Apache Ratis在Alluxio中应用
  GQ7psP7UJw7k 2023年11月19日 14 0

1. 背景

在alluxio1.8中,alluxio master只支持单节点部署,一旦挂掉,整个集群将不可用。alluxio 2.x后,提供了高可用方案:Alluxio组件中嵌入Apache Ratis代码,由Ratis负责选举leader,Alluxio的各个master在同步edit log时,由Ratis提供edit log的一致性传输。

Ratis服务基于Raft共识算法,该算法保证分布式集群中只有一个leader master对外提供服务,其他standby master在leader master退出时再竞争成为master。

本文会先后介绍Raft算法框架,不会详解琢磨细节,同时介绍Raft在Apache Ratis中的应用,最后介绍Alluxio中使用Apache Ratis进行选举和更新edit log流程。

2. Raft算法介绍

Raft算法起始于一篇论文:https://raft.github.io/raft.pdf,它有一个比较好的中文解读:

http://arthurchiao.art/blog/raft-paper-zh/#71-snapshot。有另外一种共识算法PAXOS,它比较复杂,难以实现,因此Apache Ratis项目选择基于Raft算法进行实现。

Raft算法是一致性算法,它主要负责两大功能:

  1. 在奇数个master中选举出来一个master作为leader。
  2. leader master接受客户端op操作,将op信息发送给standby master中保存。

2.1 Raft选举流程

在实现Raft算法的集群中,每个节点会有三种状态,出现不同的时间后,状态会进行转换。这种转换过程就是状态机:

Untitled.png

上述状态机表达的意思如下所示:

  1. Follower 只会响应来自其他节点的请求;如果一个 follower 某段时间内收不到 leader 的请求,它会变成一个 candidate 然后发起一轮选举。
  2. 获得大多数选票的 candidate 将成为新的 leader,其他candidate变成follower。
  3. 通常情况下,除非发生故障,否则在任的 leader 会持续担任下去。

在选举过程中,每个master都有一个任期(item),任期大的master会获得投票,超过半数票就转为leader。重新选举时,每个master都会增大iterm,以增加选举leader的成功率。

2.2 Raft日志传输流程

在分布式系统中,每次变更都会由leader发送给follower。每个变更都以log日志的方式持久化存储到各个master机器中。在Raft算法中,为了保证系统的性能,只要半数以上的节点中log一致,同步操作才算结束。

Raft算法会将log应用到内存中,使得master切换时,内存中已经存在大部分数据,减少切换时间。状态机会定义log应用到内存的接口。

Raft日志传输流程如下所示:

  1. 客户端向Leader发送请求,变更分布式系统中存储的值。
  2. Leader的共识模块负责将变更操作持久化到本地磁盘,发送给follower机器持久化到磁盘中。
  3. 当半数以上持久化操作成功后,leader将log应用到状态机中。
  4. 将结果返回给用户。

Untitled 1.png

每个master中都会保存每个log文件,为了避免文件数不断膨胀,follower master会定期从内存中生成snapshot保存到磁盘上,snapshot中有最新的事务ID,删除log文件列表中小于该事务ID的文件。

由于生成snapshot时,状态机无法更新,leader master一般不会进行snapshot,除非手动操作。正常情况下,leader都是从其他follower机器中下载snapshot文件的。

3. Raft算法和HDFS高可用区别

  1. HDFS高可用基于zookeeper的临时节点记录,而Raft算法则是每个master间直接进行选举。
  2. follower既接收log文件,又负责checkpoint,还负责将snapshot发送给leader,它的作用类似于Journal Node+Standby NameNode。Raft中的快照等价于HDFS中的fsimage,Raft中的log等价于HDFS中的edit log。

3. Apache Ratis介绍

3.1 Ratis选举流程

Ratis项目中,定义了RaftServerProtocol,用于定义RaftServer间投票的rpc接口:

public interface RaftServerProtocol {
  enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}

  RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;

  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;

  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;

  StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
}

RaftServerImpl类实现了该接口,当RaftServer接收到startLeaderElection时,开始选举:

class RaftServerImpl implements RaftServer.Division,
    RaftServerProtocol, RaftServerAsynchronousProtocol,
    RaftClientProtocol, RaftClientAsynchronousProtocol{

  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
    final RaftRpcRequestProto r = request.getServerRequest();
    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
    final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
    final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());

    CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);

    //省略
      //开始选举
      changeToCandidate(true);
      return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
    }
  }
}

后续到RoleInfo.startLeaderElection启动LeaderElection线程:

void startLeaderElection(RaftServerImpl server, boolean force) {
    if (pauseLeaderElection.get()) {
      return;
    }
    updateAndGet(leaderElection, new LeaderElection(server, force)).start();
  }

选举时,先后调用askForVotes方法进行预投票和投票:

public void run() {
    //省略

    final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
    try {
      if (skipPreVote || askForVotes(Phase.PRE_VOTE)) {
        if (askForVotes(Phase.ELECTION)) {
          server.changeToLeader();
        }
      }
    } //省略
  }

在LeaderElection.askForVotes中,如果状态为candidate,就无限进行投票:

private boolean askForVotes(Phase phase) throws InterruptedException, IOException {
    //shouldRun表示如果当前节点正在运行,并且状态为candidate,并且存活,就不断循环遍历投票
    for(int round = 0; shouldRun(); round++) {
      final long electionTerm;
      final RaftConfigurationImpl conf;
      synchronized (server) {
        if (!shouldRun()) {
          return false;
        }
        final ConfAndTerm confAndTerm = server.getState().initElection(phase);
        electionTerm = confAndTerm.getTerm();
        conf = confAndTerm.getConf();
      }

      LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
      //开始投票
      final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
      LOG.info("{} {} round {}: result {}", this, phase, round, r);
      //省略
  }

最后,在LeaderElection.submitRequests方法中,执行requestVote rpc请求,向其他节点开始投票:

private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
      Collection<RaftPeer> others, Executor voteExecutor) {
    int submitted = 0;
    for (final RaftPeer peer : others) {
      final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
          server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
      //开始投票
      voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
      submitted++;
    }
    return submitted;
  }

3.2 Ratis日志同步流程

Ratis日志同步流程如下:

  1. 先将entry存储为本地log,发送给每个follower。
  2. 修改状态机中的数据。
  3. 没处理40w事务,就开始创建snapshot。follower可以直接创建snapshot,但是master只能通过命令手动建snapshot,否则就从follower下载snapshot。

Leader在接受客户端RPC请求后,先同步Journal到Follower中,再修改元数据。即元数据生效的延迟较高。流程图如下所示: image.png

RaftClientProtocol接口submitClientRequest方法定义客户端提交请求:

public interface RaftClientProtocol {
  RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
}

RaftServerImpl.submitClientRequest实现了该方法,它中间会调用appendTransaction开始执行append操作:

private CompletableFuture<RaftClientReply> appendTransaction(
      RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException {
    assertLifeCycleState(LifeCycle.States.RUNNING);
    CompletableFuture<RaftClientReply> reply;

    final PendingRequest pending;
    synchronized (this) {
      reply = checkLeaderState(request, cacheEntry, true);
      if (reply != null) {
        return reply;
      }

      // append the message to its local log
      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
      final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
      //省略
      try {
        //持久化到本地
        state.appendLog(context);
      } //省略

      // put the request into the pending queue
      //构建appendEntry请求
      pending = leaderState.addPendingRequest(permit, request, context);
      if (pending == null) {
        cacheEntry.failWithException(new ResourceUnavailableException(
            getMemberId() + ": Failed to add a pending write request for " + request));
        return cacheEntry.getReplyFuture();
      }
      //发送appendEntry请求给candidate
      leaderState.notifySenders();
    }
    return pending.getFuture();
  }

本地持久化

ServerState.appendLog开始进行本地持久化,最终调用SegmentedRaftLog.appendEntryImpl方法落盘:

protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
    final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
    checkLogState();
    if (LOG.isTraceEnabled()) {
      LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry));
    }
    try(AutoCloseableLock writeLock = writeLock()) {
      validateLogEntry(entry);
      final LogSegment currentOpenSegment = cache.getOpenSegment();
      if (currentOpenSegment == null) {
        //写到cache和log文件中
        cache.addOpenSegment(entry.getIndex());
        fileLogWorker.startLogSegment(entry.getIndex());
      } else if (isSegmentFull(currentOpenSegment, entry)) {
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
      } else if (currentOpenSegment.numOfEntries() > 0 &&
          currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
        // the term changes
        final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm();
        Preconditions.assertTrue(currentTerm < entry.getTerm(),
            "open segment's term %s is larger than the new entry's term %s",
            currentTerm, entry.getTerm());
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
      }

      //TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
      checkAndEvictCache();

      // If the entry has state machine data, then the entry should be inserted
      // to statemachine first and then to the cache. Not following the order
      // will leave a spurious entry in the cache.
      CompletableFuture<Long> writeFuture =
          fileLogWorker.writeLogEntry(entry).getFuture();
      if (stateMachineCachingEnabled) {
        // The stateMachineData will be cached inside the StateMachine itself.
        cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
            LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
      } else {
        cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
      }
      return writeFuture;
    } catch (Exception e) {
      LOG.error("{}: Failed to append {}", getName(), LogProtoUtils.toLogEntryString(entry), e);
      throw e;
    } finally {
      context.stop();
    }
  }

发送给candidate

本地落盘执行完后,LeaderStateImpl.addPendingRequest开始构建请求,准备发送给candidate。构建完后,执行notifySenders通知LogAppender发送给candidate:

PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
          LogProtoUtils.toLogEntryString(entry.getLogEntry()));
    }
    return pendingRequests.add(permit, request, entry);
  }

void notifySenders() {
    senders.forEach(LogAppender::notifyLogAppender);
  }

LogAppenderDefault.run方法发送entry:

public void run() throws InterruptedException, IOException {
    while (isRunning()) {
      //省略
          //发送Entry
          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
          if (r != null) {
            handleReply(r);
          }
        }
      }
      if (isRunning() && !hasAppendEntries()) {
        getEventAwaitForSignal().await(getHeartbeatWaitTimeMs(), TimeUnit.MILLISECONDS);
      }
      getLeaderState().checkHealth(getFollower());
    }
  }

sendAppendEntriesWithRetries开始发送请求:

private AppendEntriesReplyProto sendAppendEntriesWithRetries()
      throws InterruptedException, InterruptedIOException, RaftLogIOException {
    int retry = 0;
    AppendEntriesRequestProto request = null;
    while (isRunning()) { // keep retrying for IOException
      //省略
        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
        //省略
  }

它最终执行的是RaftServerProtocol接口中的appendEntries方法,用于向其他candidate发送appendEntry请求:

public interface RaftServerProtocol {
  enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}

  RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;

  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;

  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;

  StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
}

更新statemachine

candidate更新成功后,leader执行onFollowerSuccessAppendEntries准备更新状态机:

public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
    if (isAttendingVote(follower)) {
      submitUpdateCommitEvent();
    } else {
      eventQueue.submit(checkStagingEvent);
    }
  }

半数以上更新成功,开始updateCommitIndex:

private void updateCommit(long majority, long min) {
    final long oldLastCommitted = raftLog.getLastCommittedIndex();
    if (majority > oldLastCommitted) {
      // Get the headers before updating commit index since the log can be purged after a snapshot
      final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1);

      if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
        updateCommit(entriesToCommit);
      }
    }
    watchRequests.update(ReplicationLevel.ALL, min);
  }

updateCommitIndex通知stateMachineUpdater处理log:

boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) {
    if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
      stateMachineUpdater.notifyUpdater();
      return true;
    }
    return false;
  }

StateMachineUpdater线程死循环,等待signal信息,获取log中的entry并应用到statemachine中:

public void run() {
    for(; state != State.STOP; ) {
      try {
        waitForCommit();

        if (state == State.RELOAD) {
          reload();
        }
        //将log应用到状态机中
        final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog();
        checkAndTakeSnapshot(futures);

        if (shouldStop()) {
          checkAndTakeSnapshot(futures);
          stop();
        }
      } catch (Throwable t) {
        if (t instanceof InterruptedException && state == State.STOP) {
          LOG.info("{} was interrupted.  Exiting ...", this);
        } else {
          state = State.EXCEPTION;
          LOG.error(this + " caught a Throwable.", t);
          server.close();
        }
      }
    }
  }

4. Apache Ratis在Alluxio中的应用

4.1 Raft选举流程

Alluxio Master启动时,启动JournalSystem线程,实际上是RaftJournalSystem类:

public class AlluxioMasterProcess extends MasterProcess {
  protected final JournalSystem mJournalSystem;
  public void start() throws Exception {
    mJournalSystem.start();
  }
}

后续执行RaftJournalSystem.startInternal方法,joinQuorum开始选举:

private RaftServer mServer;

public synchronized void startInternal() {
    LOG.info("Initializing Raft Journal System");
    mPeerId = RaftJournalUtils.getPeerId(mLocalAddress);
    Set<RaftPeer> peers = mClusterAddresses.stream()
        .map(addr -> RaftPeer.newBuilder()
                .setId(RaftJournalUtils.getPeerId(addr))
                .setAddress(addr)
                .build()
        )
        .collect(Collectors.toSet());
    mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, peers);
    LOG.info("Starting Raft journal system. Cluster addresses: {}. Local address: {}",
        mClusterAddresses, mLocalAddress);
    try {
      initServer();
      long startTime = System.currentTimeMillis();
      mServer.start();
      LOG.info("Started Raft Journal System in {}ms", System.currentTimeMillis() - startTime);
    //省略
    //开始选举
    joinQuorum();
  }

joinQuorum方法构建选举请求并发送:

private void joinQuorum() {
    // Send a request to join the quorum.
    // If the server is already part of the quorum, this operation is a noop.
    AddQuorumServerRequest request = AddQuorumServerRequest.newBuilder()
        .setServerAddress(NetAddress.newBuilder()
            .setHost(mLocalAddress.getHostString())
            .setRpcPort(mLocalAddress.getPort()))
        .build();
    RaftClient client = createClient();
    client.async().sendReadOnly(Message.valueOf(
        UnsafeByteOperations.unsafeWrap(
            JournalQueryRequest
                .newBuilder()
                .setAddQuorumServerRequest(request)
                .build().toByteArray()
        ))).whenComplete((reply, t) -> {
          if (t != null) {
            LogUtils.warnWithException(LOG, "Exception occurred while joining quorum", t);
          }
          if (reply != null && reply.getException() != null) {
            LogUtils.warnWithException(LOG,
                "Received an error while joining quorum", reply.getException());
          }
          try {
            client.close();
          } catch (IOException e) {
            LogUtils.warnWithException(LOG, "Exception occurred closing raft client", e);
          }
        });
  }

4.2 日志更新流程

以Alluxio执行逻辑操作为例,DefaultBlockMaster.removeBlocks会讲op操作封装成entry记录到context中。Alluxio直接修改内存中,即状态机中的block元数据,再发送落盘和发送给其他follower:

public void removeBlocks(Collection<Long> blockIds, boolean delete) throws UnavailableException {
    try (JournalContext journalContext = createJournalContext()) {
      for (long blockId : blockIds) {
        Set<Long> workerIds;
        try (LockResource r = lockBlock(blockId)) {
          Optional<BlockMeta> block = mBlockMetaStore.getBlock(blockId);
          if (!block.isPresent()) {
            continue;
          }
          List<BlockLocation> locations = mBlockMetaStore.getLocations(blockId);
          workerIds = new HashSet<>(locations.size());
          for (BlockLocation loc : locations) {
            workerIds.add(loc.getWorkerId());
          }
          if (delete) {
            // Make sure blockId is removed from mLostBlocks when the block metadata is deleted.
            // Otherwise blockId in mLostBlock can be dangling index if the metadata is gone.
            //直接修改内存中的block数据
            mLostBlocks.remove(blockId);
            mBlockMetaStore.removeBlock(blockId);
            JournalEntry entry = JournalEntry.newBuilder()
                .setDeleteBlock(DeleteBlockEntry.newBuilder().setBlockId(blockId)).build();
            //将entry加入到context中
            journalContext.append(entry);
          }
        }
   //省略
    }
  }

MasterJournalContext 会讲entry放到AsyncJournalWriter的mQueue中:

public long appendEntry(JournalEntry entry) {
    // TODO(gpang): handle bounding the queue if it becomes too large.
    mCounter.incrementAndGet();
    mQueue.offer(entry);
    return mCounter.get();
  }

AsyncJournalWriter线程执行doFlush方法,调用Ratis接口发送entry:

private void doFlush() {
    // Runs the loop until ::stop() is called.
    while (!mStopFlushing) {

      while (mQueue.isEmpty() && !mStopFlushing) {
        //省略

      try {
        long startTime = System.nanoTime();

        // Write pending entries to journal.
        while (!mQueue.isEmpty()) {
          // Get, but do not remove, the head entry.
          JournalEntry entry = mQueue.peek();
          if (entry == null) {
            // No more entries in the queue. Break write session.
            break;
          }
          mJournalWriter.write(entry);
          JournalUtils.sinkAppend(mJournalSinks, entry);
          // Remove the head entry, after the entry was successfully written.
          mQueue.poll();
          mWriteCounter++;

          if (((System.nanoTime() - startTime) >= mFlushBatchTimeNs) && !mStopFlushing) {
            // This thread has been writing to the journal for enough time. Break out of the
            // infinite while-loop.
            break;
          }
        }

        // Either written new entries or previous flush had been failed.
        if (mFlushCounter.get() < mWriteCounter) {
          try (Timer.Context ctx = MetricsSystem
              .timer(MetricKey.MASTER_JOURNAL_FLUSH_TIMER.getName()).time()) {
             //调用Ratis接发送entry接口
             mJournalWriter.flush();
          }
          JournalUtils.sinkFlush(mJournalSinks);
          mFlushCounter.set(mWriteCounter);
        }

        // Notify tickets that have been served to wake up.
        Iterator<FlushTicket> ticketIterator = mTicketSet.iterator();
        while (ticketIterator.hasNext()) {
          FlushTicket ticket = ticketIterator.next();
          if (ticket.getTargetCounter() <= mFlushCounter.get()) {
            ticket.setCompleted();
            ticketIterator.remove();
          }
        }
      } //省略
    }
  }

4.3 Alluxio状态机

RaftJournalSystem中RaftServer就是服务端Ratis rpc处理类,将Alluxio自定义的状态机JournalStateMachine放入RaftServer中,后续Ratis修改的状态机就是JournalStateMachine:

private JournalStateMachine mStateMachine;
RaftServer mServer = RaftServer.newBuilder()
        .setServerId(mPeerId)
        .setGroup(mRaftGroup)
        .setStateMachine(mStateMachine)
        .setProperties(properties)
        .setParameters(parameters)
        .build();

如下所示:JournalStateMachine实现了状态机接口:

public class JournalStateMachine extends BaseStateMachine {}

状态机在更新Entry时,会判断是否是leader,如果是leader,之前已经更新过了,不需要再更新状态机了:

private void applySingleEntry(JournalEntry entry) {
    if (mClosed) {
      return;
    }
    long newSN = entry.getSequenceNumber();
    if (newSN < mNextSequenceNumberToRead) {
      // This can happen due to retried writes. For example, if flushing [3, 4] fails, we will
      // retry, and the log may end up looking like [1, 2, 3, 4, 3, 4] if the original request
      // eventually succeeds. Once we've read the first "4", we must ignore the next two entries.
      LOG.debug("Ignoring duplicate journal entry with SN {} when next SN is {}", newSN,
          mNextSequenceNumberToRead);
      return;
    }
    if (newSN > mNextSequenceNumberToRead) {
      ProcessUtils.fatalError(LOG,
          "Unexpected journal entry. The next expected SN is %s, but"
              + " encountered an entry with SN %s. Full journal entry: %s",
          mNextSequenceNumberToRead, newSN, entry);
    }

    mNextSequenceNumberToRead++;
    //leader无需再apply entry
    if (!mIgnoreApplys) {
      mJournalApplier.processJournalEntry(entry);
    }
  }

在follower更新状态机时,其实就是更新DefaultBlockMaster的BlockMetaStore:

private void applyToMaster(Journal.JournalEntry entry) {
    String masterName;
    try {
      masterName = JournalEntryAssociation.getMasterForEntry(entry);
    } catch (Exception t) {
      ProcessUtils.fatalError(LOG, t, "Unrecognized journal entry: %s", entry);
      throw new IllegalStateException();
    }
    try {
      Journaled master = mJournals.get(masterName).getStateMachine();
      LOG.trace("Applying entry to master {}: {} ", masterName, entry);
      master.processJournalEntry(entry);
      JournalUtils.sinkAppend(mJournalSinks, entry);
    } catch (Exception t) {
      JournalUtils.handleJournalReplayFailure(LOG, t,
          "Failed to apply journal entry to master %s. Entry: %s", masterName, entry);
    }
    // Store last applied sequence.
    mLastAppliedSequence = entry.getSequenceNumber();
  }

如下,DefaultBlockMaster在处理entry时,BlockMetaStore删除block数据:

if (entry.hasDeleteBlock()) {
      mBlockMetaStore.removeBlock(entry.getDeleteBlock().getBlockId());
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月19日 0

暂无评论

推荐阅读
GQ7psP7UJw7k