Alluxio读写流程
  GQ7psP7UJw7k 2023年11月19日 19 0

1. 背景

https://blog.51cto.com/u_15327484文章中,介绍了Alluxio的架构。本文基于此,介绍Alluxio文件读写流程。Alluxio读写流程几乎和HDFS一致,只是Worker多了一个从UFS读写的选项,本文会省略部分流程,只介绍重点。

2. Alluxio写流程

客户端向Alluxio写数据时,可以指定是否就Alluxio中的数据写到UFS中。写UFS数据时,还可以指定是同步写入还是异步写入。Alluxio有四种写数据的模式:

  1. MUST_CACHE:数据只写到Alluxio Worker中。
  2. CACHE_THROUGH:数据写到Alluxio Woker中,并同步到UFS中。
  3. ASYNC_THTOUGH:数据写到Alluxio Worker中,并异步持久化到UFS中。
  4. THROUGH:只持久化到UFS中。

2.1 Alluxio客户端发起写入操作

业务代码中,对Alluxio写入一般先调用createFile,再开始write写入数据:

FileSystem fs = FileSystem.Factory.get();
AlluxioURI path = new AlluxioURI("/myFile");
// Create a file and get its output stream
FileOutStream out = fs.createFile(path);
// Write data
out.write(...);
// Close and complete file
out.close();

客户端执行写入的流程如下:

Untitled.png

对于createFile请求,最终会调用RetryHandlingFileSystemMasterClient.createFile,通过grpc向Leader Master发起请求,Leader会将元数据在内存中进行修改,并将entry持久化到所有master的磁盘中:

public URIStatus createFile(final AlluxioURI path, final CreateFilePOptions options)
      throws AlluxioStatusException {
    return retryRPC(
        () -> new URIStatus(GrpcUtils.fromProto(mClient.createFile(CreateFilePRequest.newBuilder()
            .setPath(getTransportPath(path)).setOptions(options).build()).getFileInfo())),
        RPC_LOG, "CreateFile", "path=%s,options=%s", path, options);
  }

FileOutStream.write方法最终会调用AlluxioFileOutStream.writeInternal,首先检查是否需要创建一个block,然后准备向Worker中写入数据:

private void writeInternal(int b) throws IOException {
    if (mShouldCacheCurrentBlock) {
      try {
        if (mCurrentBlockOutStream == null || mCurrentBlockOutStream.remaining() == 0) {
          //按需请求新增block
          getNextBlock();
        }
        //写入数据
        mCurrentBlockOutStream.write(b);
      } catch (IOException e) {
        handleCacheWriteException(e);
      }
    }

    if (mUnderStorageType.isSyncPersist()) {
      mUnderStorageOutputStream.write(b);
      Metrics.BYTES_WRITTEN_UFS.inc();
    }
    mBytesWritten++;
  }

getNextBlock方法中,最终调用AlluxioFileOutStream.getNewBlockIdForFile通过grpc请求创建一个block:

public long getNewBlockIdForFile(final AlluxioURI path)
      throws AlluxioStatusException {
    return retryRPC(
        () -> mClient.getNewBlockIdForFile(
            GetNewBlockIdForFilePRequest.newBuilder().setPath(getTransportPath(path))
                .setOptions(GetNewBlockIdForFilePOptions.newBuilder().build()).build())
            .getId(),
        RPC_LOG, "GetNewBlockIdForFile", "path=%s", path);
  }

BlockOutStream.write会调用writeInternal方法写chunk,每个chunk默认1MB:

private void writeInternal(ByteBuf b, int off, int len) throws IOException {
    if (len == 0) {
      return;
    }

    while (len > 0) {
      updateCurrentChunk(false);
      int toWrite = Math.min(len, mCurrentChunk.writableBytes());
      mCurrentChunk.writeBytes(b, off, toWrite);
      off += toWrite;
      len -= toWrite;
    }
    updateCurrentChunk(false);
  }

写满一个chunk后,执行updateCurrentChunk方法,将数据输出到worker中:

private final List<DataWriter> mDataWriters;
if (mCurrentChunk.writableBytes() == 0 || lastChunk) {
      try {
        if (mCurrentChunk.readableBytes() > 0) {
          for (DataWriter dataWriter : mDataWriters) {
            mCurrentChunk.retain();
            dataWriter.writeChunk(mCurrentChunk.duplicate());
          }
        } else {
          Preconditions.checkState(lastChunk);
        }
      } finally {
        // If the packet has bytes to read, we increment its refcount explicitly for every packet
        // writer. So we need to release here. If the packet has no bytes to read, then it has
        // to be the last packet. It needs to be released as well.
        mCurrentChunk.release();
        mCurrentChunk = null;
      }
    }

最终执行GrpcDataWriter.writeChunk写入到mStream中,可以看到mStream其实就是客户端向worker发起writeBlock请求后的输出流:

private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
//mStream通过向worker发起writeBlock请求,获得输出流
mStream = new GrpcBlockingStream<>(mClient.get()::writeBlock, writerBufferSizeMessages,
          MoreObjects.toStringHelper(this)
              .add("request", mPartialRequest)
              .add("address", address)
              .toString());

public void writeChunk(final ByteBuf buf) throws IOException {
    mPosToQueue += buf.readableBytes();
    try {
      WriteRequest request = WriteRequest.newBuilder().setCommand(mPartialRequest).setChunk(
          Chunk.newBuilder()
              .setData(UnsafeByteOperations.unsafeWrap(buf.nioBuffer()))
              .build()).build();
      if (mStream instanceof GrpcDataMessageBlockingStream) {
        //客户端通过输出流,将数据发送给worker
        ((GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>) mStream)
            .sendDataMessage(new DataMessage<>(request, new NettyDataBuffer(buf)), mDataTimeoutMs);
      } else {
        mStream.send(request, mDataTimeoutMs);
      }
    } finally {
      buf.release();
    }
  }

2.2 Worker处理写入请求

下图是block数据既写入Worker,又写入UFS流程:

Untitled 1.png

其中,重要流程是DelegationWriteHandler.createWriterHandler方法,根据用户请求,可以执行三种不同的写入逻辑:

  1. ALLUXIO_BLOCK:block数据直接入到Worker中。
  2. UFS_FILE:block数据只写入到UFS中。
  3. UFS_FALLBACK_BLOCK:block数据写入到Worker和UFS中。

DelegationWriteHandler.createWriterHandler方法如下:

private AbstractWriteHandler createWriterHandler(alluxio.grpc.WriteRequest request) {
    switch (request.getCommand().getType()) {
      case ALLUXIO_BLOCK:
        return new BlockWriteHandler(mBlockWorker, mResponseObserver,
            mUserInfo, mDomainSocketEnabled);
      case UFS_FILE:
        return new UfsFileWriteHandler(mUfsManager, mResponseObserver,
            mUserInfo);
      case UFS_FALLBACK_BLOCK:
        return new UfsFallbackBlockWriteHandler(
            mBlockWorker, mUfsManager, mResponseObserver, mUserInfo, mDomainSocketEnabled);
      default:
        throw new IllegalArgumentException(String.format("Invalid request type %s",
            request.getCommand().getType().name()));
    }
  }

UfsFallbackBlockWriteHandler.writeBuf中,先通过BlockWriteHandler.writeBuf将数据写入到worker中,再写入到UFS中:

protected void writeBuf(BlockWriteRequestContext context,
      StreamObserver<WriteResponse> responseObserver, DataBuffer buf, long pos) throws Exception {
    if (context.isWritingToLocal()) {
      long posBeforeWrite = pos - buf.readableBytes();
      try {
        //写入到worker中
        mBlockWriteHandler.writeBuf(context, responseObserver, buf, pos);
        return;
      } //省略
      // close the block writer first
      if (context.getBlockWriter() != null) {
        context.getBlockWriter().close();
      }
      // prepare the UFS block and transfer data from the temp block to UFS
      //创建Ufs的Block,如果不存在文件,就在Ufs中创建文件
      createUfsBlock(context);
      if (posBeforeWrite > 0) {
        //传输数据到Ufs中
        transferToUfsBlock(context, posBeforeWrite);
      }
      // close the original block writer and remove the temp file
      mBlockWriteHandler.cancelRequest(context);
    }
    if (context.getOutputStream() == null) {
      createUfsBlock(context);
    }
    buf.readBytes(context.getOutputStream(), buf.readableBytes());
  }

createUfsBlock方法中,就负责将worker中的数据读取出来,写入到UFS中:

private void transferToUfsBlock(BlockWriteRequestContext context, long pos) throws IOException {
    OutputStream ufsOutputStream = context.getOutputStream();
    long blockId = context.getRequest().getId();
    Optional<TempBlockMeta> block = mWorker.getBlockStore().getTempBlockMeta(blockId);
    Preconditions.checkState(block.isPresent()
        && Files.copy(Paths.get(block.get().getPath()), ufsOutputStream) == pos);
  }

3. Alluxio读流程

3.1 客户端发起读流程

业务读取Alluxio流程代码如下所示:

FileSystem fs = FileSystem.Factory.get();
AlluxioURI path = new AlluxioURI("/myFile");
// Open the file for reading
FileInStream in = fs.openFile(path);
// Read data
in.read(...);
// Close file relinquishing the lock
in.close();

openFile主要是进行初始化操作,将配置信息封装到AlluxioFileInStream对象中。随后执行AlluxioFileInStream.read方法开始读取。

AlluxioFileInStream.read方法经过一系列的调用,执行BlockInStream.readChunk一次读取每个chunk:

private void readChunk() throws IOException {
    if (mDataReader == null) {
      mDataReader = mDataReaderFactory.create(mPos, mLength - mPos);
    }

    if (mCurrentChunk != null && mCurrentChunk.readableBytes() == 0) {
      mCurrentChunk.release();
      mCurrentChunk = null;
    }
    if (mCurrentChunk == null) {
      mCurrentChunk = mDataReader.readChunk();
    }
  }

最终调用GrpcDataReader.readChunkInternal方法读取block。GrpcDataReader首先会通过执行readBlock访问worker获取输入流,在读取输入流中的数据:


private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;
//调用readBlock访问worker,构建输入流
mStream = new GrpcDataMessageBlockingStream<>(mClient.get()::readBlock,
            readerBufferSizeMessages,
            desc, null, mMarshaller);

private DataBuffer readChunkInternal() throws IOException {
    Preconditions.checkState(!mClient.get().isShutdown(),
        "Data reader is closed while reading data chunks.");
    DataBuffer buffer = null;
    ReadResponse response = null;
    //获取数据
      DataMessage<ReadResponse, DataBuffer> message =
          ((GrpcDataMessageBlockingStream<ReadRequest, ReadResponse>) mStream)
              .receiveDataMessage(mDataTimeoutMs);
    //忽略
    return buffer;
  }

3.2 worker服务端处理读流程

Worker接收读请求后,执行流程如下所示:

Untitled 2.png

Worker服务端收到readBlock请求后,BlockReadHandler.getDataBuffer方法负责获取block输入流,然后返回给客户端:

protected DataBuffer getDataBuffer(BlockReadRequestContext context, long offset, int len)
        throws Exception {
      @Nullable
      BlockReader blockReader = null;
      try {
        //构建blockReader
        openBlock(context);
        openMs = System.currentTimeMillis() - startMs;
        blockReader = context.getBlockReader();
        Preconditions.checkState(blockReader != null);
        startTransferMs = System.currentTimeMillis();
        ByteBuf buf;
        switch (mBlockStoreType) {
          case PAGE:
            if (mIsReaderBufferPooled) {
              buf = PooledDirectNioByteBuf.allocate(len);
            } else {
              buf = Unpooled.directBuffer(len, len);
            }
            try {
              //blockReader读取数据
              while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
              }
              return new NettyDataBuffer(buf.retain());
            } finally {
              buf.release();
            }
         //省略
    }

openBlock方法负责构建BlockReader,它最后调用PagedBlockStore.createBlockReader。它的逻辑如下:

  1. 如果block元数据存在,通过元数据读取worker已经存在的block。
  2. 如果block元数据不存在,就访问UFS读取block数据;并检查是否需要写入block到worker中,如果不需要worker缓存,数据会直接从UFS中通过worker传输到客户端。
public BlockReader createBlockReader(long sessionId, long blockId, long offset,
                                       boolean positionShort, Protocol.OpenUfsBlockOptions options)
      throws IOException {
    BlockLock blockLock = mLockManager.acquireBlockLock(sessionId, blockId, BlockLockType.READ);
    //查看worker中是否存在该block,存在就构建reader读worker中的block,获取读锁
    try (LockResource lock = new LockResource(mPageMetaStore.getLock().readLock())) {
      Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
      if (blockMeta.isPresent()) {
        final BlockPageEvictor evictor = blockMeta.get().getDir().getEvictor();
        evictor.addPinnedBlock(blockId);
        return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
          evictor.removePinnedBlock(blockId);
          unpinBlock(blockLock);
        });
      }
    }
    // this is a block that needs to be read from UFS
    //如果worker中不存在block,就要申请写锁从UFS读取block写入到worker中
    try (LockResource lock = new LockResource(mPageMetaStore.getLock().writeLock())) {
      // in case someone else has added this block while we wait for the lock,
      // just use the block meta; otherwise create a new one and add to the metastore
      Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
      //获取到写锁后如果worker又缓存了block,依然读取worker中的block
      if (blockMeta.isPresent()) {
        blockMeta.get().getDir().getEvictor().addPinnedBlock(blockId);
        return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
          blockMeta.get().getDir().getEvictor().removePinnedBlock(blockId);
          unpinBlock(blockLock);
        });
      }
      long blockSize = options.getBlockSize();
      PagedBlockStoreDir dir =
          (PagedBlockStoreDir) mPageMetaStore.allocate(BlockPageId.fileIdOf(blockId, blockSize),
              blockSize);
      PagedBlockMeta newBlockMeta = new PagedBlockMeta(blockId, blockSize, dir);
      //如果用户设置了不缓存block到worker,就直接读取UFS
      if (options.getNoCache()) {
        // block does not need to be cached in Alluxio, no need to add and commit it
        unpinBlock(blockLock);
        final UfsBlockReadOptions readOptions;
        try {
          readOptions = UfsBlockReadOptions.fromProto(options);
        } catch (IllegalArgumentException e) {
          throw new AlluxioRuntimeException(Status.INTERNAL,
              String.format("Block %d may need to be read from UFS, but key UFS read options "
                  + "is missing in client request", blockId), e, ErrorType.Internal, false);
        }
        //构建一个UFSBlockReader
        return new PagedUfsBlockReader(mUfsManager, mUfsInStreamCache, newBlockMeta,
            offset, readOptions, mPageSize);
      }
      //新构建一个block
      mPageMetaStore.addBlock(newBlockMeta);
      dir.getEvictor().addPinnedBlock(blockId);
      //默认情况下,将block信息从UFS中写入到worker中
      return new DelegatingBlockReader(getBlockReader(newBlockMeta, offset, options), () -> {
        //先
        commitBlockToMaster(newBlockMeta);
        newBlockMeta.getDir().getEvictor().removePinnedBlock(blockId);
        unpinBlock(blockLock);
      });
    }
  }

4. HDFS客户端访问Alluxio过程

HDFS的配置文件core-site.xml中增加Alluxio文件系统配置:

<property>
        <name>fs.alluxio.impl</name>
        <value>alluxio.hadoop.FileSystem</value>
  </property>
  <property>
        <name>fs.AbstractFileSystem.alluxio.impl</name>
        <value>alluxio.hadoop.AlluxioFileSystem</value>
        <description>The Alluxio AbstractFileSystem (Hadoop 2.x)</description>
  </property>

在hadoop的安装包hadoop/share/hadoop/common中,增加alluxio SDK包alluxio-client.jar。

随后,业务方在代码中设置alluxio前缀的访问:

public void create() throws URISyntaxException, IOException, InterruptedException {
        // 配置文件
        Configuration conf = new Configuration();
        // 获取文件系统
        FileSystem fs = FileSystem.get(new URI("alluxio://{alluxio leader}/path"), conf, 访问的用户);
        // 创建文件并写入数据
        FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));
        out.write("Hello, HDFS".getBytes());
        out.flush();
        // 关闭流
        fs.close();
    }

FileSystem解析到alluixo前缀,从而获取fs.alluxio.impl配置alluxio.hadoop.FileSystem,通过alluxio.hadoop.FileSystem创建Alluxio的文件系统客户端对象:

private static FileSystem createFileSystem(URI uri, Configuration conf)
      throws IOException {
    Tracer tracer = FsTracer.get(conf);
    try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
      scope.addKVAnnotation("scheme", uri.getScheme());
      //查询core-site.xml中fs.alluxio.impl的配置,为alluxio.hadoop.FileSystem
      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
      //通过反射创建FileSystem子类alluxio.hadoop.FileSystemm对象
      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
      //alluxio.hadoop.FileSystem初始化操作
      fs.initialize(uri, conf);
      return fs;
    }
  }
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月19日 0

暂无评论

推荐阅读
GQ7psP7UJw7k