Alluxio架构分析
  GQ7psP7UJw7k 2023年11月19日 19 0

1. 背景

对于Hadoop集群而言,将长期没有访问的冷数据放到DataNode中的磁盘存储成本较高,可以将这部分冷数据存储到S3中。这就引入一个问题,虽然Hadoop支持s3a的方式访问s3文件系统,但是访问时需要携带aksk,一旦用户拿到aksk,他们就有随意操控整个S3数据的权限,整个S3数据就不安全了。

为了解决这个安全问题,可以将S3文件系统挂载到Alluxio文件系统中,Alluxio的客户端可以通过Alluxio统一命名空间和接口来访问多个独立的存储系统,S3就是Alluxio的UFS(Under FileSystem)。这样,只有Alluxio服务会访问S3文件,Alluxio服务配置aksk;用户则直接通过Alluxio间接访问S3,用户侧无需aksk。Alluxio文件系统支持ACL权限认证,HDFS客户端还能够通过整合SDK直接访问Alluxio。

如下,将HDFS冷数据迁移到S3中。S3路径挂载到Alluxio文件系统的命名空间中,通过访问alluxio的/Data目录,就能获取S3根目录下的所有信息:

Untitled.png

2. Alluxio架构

Alluxio组件分为Master和Worker。Master负责维护元数据信息,Worker负责缓存底层文件系统的数据。注意:Alluxio能够挂载多种文件系统,例如HDFS、S3。

Master节点内有两个进程:

  1. AlluxioMaster:存储元数据,接受RPC请求,只有一个leader Master对外服务,其他master作为leader的候选者等待被选举。
  2. AlluxioJobMaster:Alluxio内置轻量级的作业调度框架,创建Job给JobWorker执行。

Worker节点内有两个进程:

  1. AlluxioWorker:存储block,接受RPC请求。
  2. AlluxioJobWorker:执行Job。

如下所示:

Untitled 1.png

3. Alluxio选举流程

https://blog.51cto.com/u_15327484/8286269介绍了Ratis的选举流程,Alluxio直接使用这个选举流程,没有进行任何改造,不详细展开。

4. Alluxio元数据更新流程

https://blog.51cto.com/u_15327484/8286269中,也介绍了Ratis的元数据更新流程。分析发现Ratis先将entry持久化到磁盘中,再通过StateMachine修改整个内存中的元数据。Ratis框架不了解什么样的请求会修改元数据,所有请求entry都持久化到磁盘中,即使有些entry不会变更元数据。

Alluxio为了提高效率,先将客户端请求对元数据进行修改,对于修改了元数据的entry,会落盘到磁盘中,Follower再读取entry修改状态机,Leader因为提前修改了元数据,不会再修改状态机了。

由于Leader提前修改了元数据,客户端访问元数据时能够看到变更快速生效,提高效率。

Alluxio元数据变更基础流程如下所示:

Untitled 2.png

详细流程如下,相对于Ratis,Alluxio的同步流程有两处变更:

  1. Alluxio Leader先变更元数据,再同步Log到follower中。注意,Follower还是先持久化entry,在更新状态机内存。
  2. Alluxio Leader从Follwoer下载Snapshot,自己不生成Snapshot,因为生成Snapshot时,无法变更内存中的元数据。

Untitled 3.png

如下,在Alluxio Leader获取修改请求时,先更新内存元数据,再同步entry到其他follower中:

public void applyAndJournal(Supplier<JournalContext> context, AddSyncPointEntry entry) {
LOG.info("Apply startSync {}", entry.getSyncpointPath());
  try {
    //先更新元数据
    apply(entry);
    //再同步entry到follower
    context.get().append(Journal.JournalEntry.newBuilder().setAddSyncPoint(entry).build());
  } catch (Throwable t) {
    ProcessUtils.fatalError(LOG, t, "Failed to apply %s", entry);
    throw t;// fatalError will usually system.exit
}
}

Alluxio的 JournalStateMachine状态机在修改元数据时,如果发现master为leader,就不修改元数据,因为已经提前修改过了:

private void applySingleEntry(JournalEntry entry) {
    //省略
    if (!mIgnoreApplys) {
      mJournalApplier.processJournalEntry(entry);
    }
  }

//mIgnoreApplys默认为false,当master为leader时,设置为true
private boolean mIgnoreApplys = false;
public synchronized long upgrade() {
    // Resume the journal applier if was suspended.
    if (mJournalApplier.isSuspended()) {
      try {
        resume();
      } catch (IOException e) {
        ProcessUtils.fatalError(LOG, e, "State-machine failed to catch up after suspension.");
      }
    }
    //如果Alluxio为Leader,就设置true
    mIgnoreApplys = true;
    return mNextSequenceNumberToRead - 1;
  }

Alluxio中,如果Master时Leader,当应用的事务超过40w,直接从其他Follower中下载snapshot:

public long takeSnapshot() {
    long index;
    StateLockManager stateLockManager = mStateLockManagerRef.get();
    if (!mIsLeader) {
      //follower直接创建snapshot
      index = takeLocalSnapshot(false);
    } else if (stateLockManager != null) {
      // the leader has been allowed to take a local snapshot by being given a non-null
      // StateLockManager through the #allowLeaderSnapshots method
      try (LockResource stateLock = stateLockManager.lockExclusive(StateLockOptions.defaults())) {
        //通过命令强制构建snapshot
        index = takeLocalSnapshot(true);
      } catch (Exception e) {
        return RaftLog.INVALID_LOG_INDEX;
      }
    } else {
      RaftGroup group;
      try (LockResource ignored = new LockResource(mGroupLock)) {
        if (mServerClosing) {
          return RaftLog.INVALID_LOG_INDEX;
        }
        // These calls are protected by mGroupLock and mServerClosing
        // as they will access the lock in RaftServerProxy.java
        // which is also accessed during raft server shutdown which
        // can cause a deadlock as the shutdown takes the lock while
        // waiting for this thread to finish
        Preconditions.checkState(mServer.getGroups().iterator().hasNext());
        group = mServer.getGroups().iterator().next();
      } catch (IOException e) {
        SAMPLING_LOG.warn("Failed to get raft group info: {}", e.getMessage());
        return RaftLog.INVALID_LOG_INDEX;
      }
      if (group.getPeers().size() < 2) {
        SAMPLING_LOG.warn("No follower to perform delegated snapshot. Please add more masters to "
            + "the quorum or manually take snapshot using 'alluxio fsadmin journal checkpoint'");
        return RaftLog.INVALID_LOG_INDEX;
      } else {
        //从其他Follower中获取snapshot
        index = mSnapshotManager.maybeCopySnapshotFromFollower();
      }
    }
    // update metrics if took a snapshot
    if (index != RaftLog.INVALID_LOG_INDEX) {
      mSnapshotLastIndex = index;
      mLastCheckPointTime = System.currentTimeMillis();
    }
    return index;
  }

Ratis中会判断是否需要创建snapshot,每隔40w条entry就会创建一次snapshot:

private boolean shouldTakeSnapshot() {
    if (autoSnapshotThreshold == null) {
      return false;
    } else if (shouldStop()) {
      return getLastAppliedIndex() - snapshotIndex.get() > 0;
    }
    return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold;
  }

5. Alluxio元数据管理

Alluxio中,状态机在apply entry时,会先查看要更新那部分的元数据,再进行变更

private void applyToMaster(Journal.JournalEntry entry) {
    String masterName;
    try {
      //获取entry要变更的元数据管理对象
      masterName = JournalEntryAssociation.getMasterForEntry(entry);
    } catch (Exception t) {
      ProcessUtils.fatalError(LOG, t, "Unrecognized journal entry: %s", entry);
      throw new IllegalStateException();
    }
    try {
      Journaled master = mJournals.get(masterName).getStateMachine();
      LOG.trace("Applying entry to master {}: {} ", masterName, entry);
      //元数据管理对象变更
      master.processJournalEntry(entry);
      JournalUtils.sinkAppend(mJournalSinks, entry);
    } catch (Exception t) {
      JournalUtils.handleJournalReplayFailure(LOG, t,
          "Failed to apply journal entry to master %s. Entry: %s", masterName, entry);
    }
    // Store last applied sequence.
    mLastAppliedSequence = entry.getSequenceNumber();
  }

Journal状态机维护以下元数据对象:

  1. MetaMaster:维护WEBUI所需的元数据信息。
  2. FilesystemMaster:维护InodeTree信息,包含UFS元数据信息。
  3. BlockMaster:Block相关位置信息。
  4. MetricsMaster:维护指标信息。

通过DEBUG可以看到维护的对象:

Untitled 4.png

其中,FilesystemMaster维护了文件系统树的缓存:

  • InodeCache:Inode信息。
  • EdgeCache:Inode父子信息。
  • ListingCache:目录下的文件信息。

FilesystemMaster和BlockMaster元数据会存储到RocksDB中,避免元数据太多打爆内存。以FilesystemMaster为例,其元数据更新流程如下所示:

Untitled 5.png

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月19日 0

暂无评论

推荐阅读
GQ7psP7UJw7k