1. 背景
对于Hadoop集群而言,将长期没有访问的冷数据放到DataNode中的磁盘存储成本较高,可以将这部分冷数据存储到S3中。这就引入一个问题,虽然Hadoop支持s3a的方式访问s3文件系统,但是访问时需要携带aksk,一旦用户拿到aksk,他们就有随意操控整个S3数据的权限,整个S3数据就不安全了。
为了解决这个安全问题,可以将S3文件系统挂载到Alluxio文件系统中,Alluxio的客户端可以通过Alluxio统一命名空间和接口来访问多个独立的存储系统,S3就是Alluxio的UFS(Under FileSystem)。这样,只有Alluxio服务会访问S3文件,Alluxio服务配置aksk;用户则直接通过Alluxio间接访问S3,用户侧无需aksk。Alluxio文件系统支持ACL权限认证,HDFS客户端还能够通过整合SDK直接访问Alluxio。
如下,将HDFS冷数据迁移到S3中。S3路径挂载到Alluxio文件系统的命名空间中,通过访问alluxio的/Data目录,就能获取S3根目录下的所有信息:
2. Alluxio架构
Alluxio组件分为Master和Worker。Master负责维护元数据信息,Worker负责缓存底层文件系统的数据。注意:Alluxio能够挂载多种文件系统,例如HDFS、S3。
Master节点内有两个进程:
- AlluxioMaster:存储元数据,接受RPC请求,只有一个leader Master对外服务,其他master作为leader的候选者等待被选举。
- AlluxioJobMaster:Alluxio内置轻量级的作业调度框架,创建Job给JobWorker执行。
Worker节点内有两个进程:
- AlluxioWorker:存储block,接受RPC请求。
- AlluxioJobWorker:执行Job。
如下所示:
3. Alluxio选举流程
在https://blog.51cto.com/u_15327484/8286269介绍了Ratis的选举流程,Alluxio直接使用这个选举流程,没有进行任何改造,不详细展开。
4. Alluxio元数据更新流程
在https://blog.51cto.com/u_15327484/8286269中,也介绍了Ratis的元数据更新流程。分析发现Ratis先将entry持久化到磁盘中,再通过StateMachine修改整个内存中的元数据。Ratis框架不了解什么样的请求会修改元数据,所有请求entry都持久化到磁盘中,即使有些entry不会变更元数据。
Alluxio为了提高效率,先将客户端请求对元数据进行修改,对于修改了元数据的entry,会落盘到磁盘中,Follower再读取entry修改状态机,Leader因为提前修改了元数据,不会再修改状态机了。
由于Leader提前修改了元数据,客户端访问元数据时能够看到变更快速生效,提高效率。
Alluxio元数据变更基础流程如下所示:
详细流程如下,相对于Ratis,Alluxio的同步流程有两处变更:
- Alluxio Leader先变更元数据,再同步Log到follower中。注意,Follower还是先持久化entry,在更新状态机内存。
- Alluxio Leader从Follwoer下载Snapshot,自己不生成Snapshot,因为生成Snapshot时,无法变更内存中的元数据。
如下,在Alluxio Leader获取修改请求时,先更新内存元数据,再同步entry到其他follower中:
public void applyAndJournal(Supplier<JournalContext> context, AddSyncPointEntry entry) {
LOG.info("Apply startSync {}", entry.getSyncpointPath());
try {
//先更新元数据
apply(entry);
//再同步entry到follower
context.get().append(Journal.JournalEntry.newBuilder().setAddSyncPoint(entry).build());
} catch (Throwable t) {
ProcessUtils.fatalError(LOG, t, "Failed to apply %s", entry);
throw t;// fatalError will usually system.exit
}
}
Alluxio的 JournalStateMachine状态机在修改元数据时,如果发现master为leader,就不修改元数据,因为已经提前修改过了:
private void applySingleEntry(JournalEntry entry) {
//省略
if (!mIgnoreApplys) {
mJournalApplier.processJournalEntry(entry);
}
}
//mIgnoreApplys默认为false,当master为leader时,设置为true
private boolean mIgnoreApplys = false;
public synchronized long upgrade() {
// Resume the journal applier if was suspended.
if (mJournalApplier.isSuspended()) {
try {
resume();
} catch (IOException e) {
ProcessUtils.fatalError(LOG, e, "State-machine failed to catch up after suspension.");
}
}
//如果Alluxio为Leader,就设置true
mIgnoreApplys = true;
return mNextSequenceNumberToRead - 1;
}
Alluxio中,如果Master时Leader,当应用的事务超过40w,直接从其他Follower中下载snapshot:
public long takeSnapshot() {
long index;
StateLockManager stateLockManager = mStateLockManagerRef.get();
if (!mIsLeader) {
//follower直接创建snapshot
index = takeLocalSnapshot(false);
} else if (stateLockManager != null) {
// the leader has been allowed to take a local snapshot by being given a non-null
// StateLockManager through the #allowLeaderSnapshots method
try (LockResource stateLock = stateLockManager.lockExclusive(StateLockOptions.defaults())) {
//通过命令强制构建snapshot
index = takeLocalSnapshot(true);
} catch (Exception e) {
return RaftLog.INVALID_LOG_INDEX;
}
} else {
RaftGroup group;
try (LockResource ignored = new LockResource(mGroupLock)) {
if (mServerClosing) {
return RaftLog.INVALID_LOG_INDEX;
}
// These calls are protected by mGroupLock and mServerClosing
// as they will access the lock in RaftServerProxy.java
// which is also accessed during raft server shutdown which
// can cause a deadlock as the shutdown takes the lock while
// waiting for this thread to finish
Preconditions.checkState(mServer.getGroups().iterator().hasNext());
group = mServer.getGroups().iterator().next();
} catch (IOException e) {
SAMPLING_LOG.warn("Failed to get raft group info: {}", e.getMessage());
return RaftLog.INVALID_LOG_INDEX;
}
if (group.getPeers().size() < 2) {
SAMPLING_LOG.warn("No follower to perform delegated snapshot. Please add more masters to "
+ "the quorum or manually take snapshot using 'alluxio fsadmin journal checkpoint'");
return RaftLog.INVALID_LOG_INDEX;
} else {
//从其他Follower中获取snapshot
index = mSnapshotManager.maybeCopySnapshotFromFollower();
}
}
// update metrics if took a snapshot
if (index != RaftLog.INVALID_LOG_INDEX) {
mSnapshotLastIndex = index;
mLastCheckPointTime = System.currentTimeMillis();
}
return index;
}
Ratis中会判断是否需要创建snapshot,每隔40w条entry就会创建一次snapshot:
private boolean shouldTakeSnapshot() {
if (autoSnapshotThreshold == null) {
return false;
} else if (shouldStop()) {
return getLastAppliedIndex() - snapshotIndex.get() > 0;
}
return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold;
}
5. Alluxio元数据管理
Alluxio中,状态机在apply entry时,会先查看要更新那部分的元数据,再进行变更
private void applyToMaster(Journal.JournalEntry entry) {
String masterName;
try {
//获取entry要变更的元数据管理对象
masterName = JournalEntryAssociation.getMasterForEntry(entry);
} catch (Exception t) {
ProcessUtils.fatalError(LOG, t, "Unrecognized journal entry: %s", entry);
throw new IllegalStateException();
}
try {
Journaled master = mJournals.get(masterName).getStateMachine();
LOG.trace("Applying entry to master {}: {} ", masterName, entry);
//元数据管理对象变更
master.processJournalEntry(entry);
JournalUtils.sinkAppend(mJournalSinks, entry);
} catch (Exception t) {
JournalUtils.handleJournalReplayFailure(LOG, t,
"Failed to apply journal entry to master %s. Entry: %s", masterName, entry);
}
// Store last applied sequence.
mLastAppliedSequence = entry.getSequenceNumber();
}
Journal状态机维护以下元数据对象:
- MetaMaster:维护WEBUI所需的元数据信息。
- FilesystemMaster:维护InodeTree信息,包含UFS元数据信息。
- BlockMaster:Block相关位置信息。
- MetricsMaster:维护指标信息。
通过DEBUG可以看到维护的对象:
其中,FilesystemMaster维护了文件系统树的缓存:
- InodeCache:Inode信息。
- EdgeCache:Inode父子信息。
- ListingCache:目录下的文件信息。
FilesystemMaster和BlockMaster元数据会存储到RocksDB中,避免元数据太多打爆内存。以FilesystemMaster为例,其元数据更新流程如下所示: