HDFS Write Flow Analysis: How the DataNode Receives Client Data

1. Background

The two previous articles, https://blog.51cto.com/u_15327484/8023493 and https://blog.51cto.com/u_15327484/8089923, covered in detail how the client and the NameNode handle the HDFS write flow. This article describes how a DataNode receives data; details of the interaction with the NameNode are omitted so we can focus on how data is sent and received.

2. DataNode Event-Handling Thread Model

When a DataNode starts, it creates a TcpPeerServer (a SocketServer) and hands it to a DataXceiverServer for processing:

public class DataNode extends ReconfigurableBase
    implements InterDatanodeProtocol, ClientDatanodeProtocol,
        TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {

  DataXceiverServer xserver = null;

  // When the DataNode service starts, it initializes the DataXceiverServer
  void startDataNode(List<StorageLocation> dataDirectories,
                       SecureResources resources
                       ) throws IOException {
    // initialization
    initDataXceiver();
  }

  private void initDataXceiver() throws IOException {
    // find free port or use privileged port provided
    TcpPeerServer tcpPeerServer;
    if (secureResources != null) {
      tcpPeerServer = new TcpPeerServer(secureResources);
    } else {
      // backlog of connection requests that cannot be handled immediately; defaults to 128
      int backlogLength = getConf().getInt(
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(getConf()), backlogLength);
    }
    if (dnConf.getTransferSocketRecvBufferSize() > 0) {
      tcpPeerServer.setReceiveBufferSize(
          dnConf.getTransferSocketRecvBufferSize());
    }
    streamingAddr = tcpPeerServer.getStreamingAddr();
    LOG.info("Opened streaming server at {}", streamingAddr);
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    // create the DataXceiverServer service
    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    this.threadGroup.setDaemon(true); // auto destroy when empty

    // omitted
  }
}

DataXceiverServer is a thread. Its while loop blocks on the accept() method of the TcpPeerServer (i.e., the ServerSocket) until a connection request arrives from a client or from another DataNode, and then it:

  1. Obtains a peer, which is a wrapper around the Socket.
  2. Checks whether the number of DataXceiver threads on the current DataNode exceeds the threshold; if it does, throws an IOException, closes the peer with IOUtils' cleanup() method, and continues the loop; otherwise proceeds to step 3.
  3. Creates a daemon DataXceiver thread, adds it to the DataNode's thread group threadGroup, and starts it to serve the read or write request.
public void run() {
    Peer peer = null;
    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
        peer = peerServer.accept();
        // Make sure the xceiver count is not exceeded; if the number of
        // threads serving DataNode requests exceeds maxXceiverCount, fail
        int curXceiverCount = datanode.getXceiverCount();
        if (curXceiverCount > maxXceiverCount) {
          throw new IOException("Xceiver count " + curXceiverCount
              + " exceeds the limit of concurrent xcievers: "
              + maxXceiverCount);
        }
        // start a new daemon thread, DataXceiver, to serve the user request
        new Daemon(datanode.threadGroup,
            DataXceiver.create(peer, datanode, this))
            .start();
      // omitted
  }

The DataNode thread model is shown below. It does not use the Reactor-based Hadoop RPC model; instead it uses the most basic socket plus per-connection thread model:

[Figure: DataNode thread model]
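
For contrast with the Reactor model, here is a minimal, self-contained sketch of this accept-loop, thread-per-connection pattern. Everything in it is illustrative, not Hadoop code; the class name, port 9866, and backlog 128 are assumptions:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Minimal sketch of the accept-loop, thread-per-connection model described
// above. All names here are illustrative, not Hadoop code.
public class ThreadPerConnectionServer {
  public static void main(String[] args) throws IOException {
    ServerSocket server = new ServerSocket(9866, 128); // streaming port + backlog
    while (true) {
      Socket peer = server.accept(); // block until a client or datanode connects
      Thread worker = new Thread(() -> handle(peer));
      worker.setDaemon(true); // like the Daemon threads in the DataNode's thread group
      worker.start();         // one worker thread per connection
    }
  }

  private static void handle(Socket peer) {
    try (Socket s = peer) {
      // read the opcode and serve the request, as DataXceiver.run() does
    } catch (IOException ignored) {
      // connection closed
    }
  }
}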

3. The DataXceiver Thread Handles writeBlock Requests

The DataXceiver thread builds an input stream over the request socket, reads the leading opcode byte from the stream to determine the requested op, and then executes it. When the socket is closed, op processing ends and the thread exits:

public void run() {
    int opsProcessed = 0;
    Op op = null;

    try {
      synchronized(this) {
        xceiver = Thread.currentThread();
      }
      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
      InputStream input = socketIn;
      try {
        IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
        // input stream
        input = new BufferedInputStream(saslStreams.in,
            smallBufferSize);
        socketOut = saslStreams.out;
      }
      // omitted
      do {
        // read which op the request wants to execute from the input stream
        op = readOp();
        // omitted
        // execute the op
        processOp(op);
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
      // omitted
      if (peer != null) {
        dataXceiverServer.closePeer(peer);
        IOUtils.closeStream(in);
      }
    }
  }

The Op opcodes are defined as follows:

public enum Op {
  WRITE_BLOCK((byte)80),
  READ_BLOCK((byte)81),
  READ_METADATA((byte)82),
  REPLACE_BLOCK((byte)83),
  COPY_BLOCK((byte)84),
  BLOCK_CHECKSUM((byte)85),
  TRANSFER_BLOCK((byte)86),
  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
  REQUEST_SHORT_CIRCUIT_SHM((byte)89);

  // omitted
}
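
For reference, each request on the wire begins with a two-byte protocol version followed by the one-byte opcode above; readOp() checks the version and maps the byte back to an Op. Below is a minimal sketch of that framing, assuming a version constant (the real value comes from DataTransferProtocol.DATA_TRANSFER_VERSION and varies by release):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the framing Sender writes and Receiver.readOp() parses:
// a 2-byte protocol version, then a 1-byte opcode. The version value
// below is an assumption for illustration.
class OpFraming {
  static final short DATA_TRANSFER_VERSION = 28; // assumed constant

  static void writeOp(DataOutputStream out, byte opcode) throws IOException {
    out.writeShort(DATA_TRANSFER_VERSION); // version first
    out.writeByte(opcode);                 // e.g. (byte) 80 for WRITE_BLOCK
    out.flush();
  }

  static byte readOp(DataInputStream in) throws IOException {
    short version = in.readShort();
    if (version != DATA_TRANSFER_VERSION) {
      throw new IOException("Version mismatch, got: " + version);
    }
    return in.readByte(); // caller maps this back to the Op enum
  }
}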

DataXceiver.processOp defines the dispatch logic for handling each op:

/** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }

The DataXceiver.writeBlock method obtains the socket address of the downstream DataNode and forwards the writeBlock request to it; the current datanode then calls blockReceiver.receiveBlock() to receive the block data:

public void writeBlock(final ExtendedBlock block,
      final StorageType storageType, 
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      boolean allowLazyPersist,
      final boolean pinning,
      final boolean[] targetPinnings,
      final String storageId,
      final String[] targetStorageIds) throws IOException {

    DataOutputStream mirrorOut = null;  // stream to the next target
    DataInputStream mirrorIn = null;    // reply from the next target
    Socket mirrorSock = null;           // socket to the next target
    String mirrorNode = null;           // the name:port of the next target
    String firstBadLink = "";           // first datanode that failed in connection setup

    // open a block receiver
        blockReceiver = new BlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist, pinning);

          // omitted
          // build the downstream output stream mirrorOut, used to write data downstream,
          // and the downstream input stream mirrorIn, used to receive replies from downstream
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          // Just like the client sending data to the first datanode, construct a Sender
          // that wraps the parameters needed for the write and, via writeBlock with
          // opcode 80, forwards the request to the next datanode
          // omitted
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              false, false, targetPinnings);
          }

          // flush the stream
          mirrorOut.flush();

      // start receiving data
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets, false);

        // send close-ack for transfer-RBW/Finalized
        // for a block transfer (replication), set the status to SUCCESS directly
        // and reply to the upstream node
        if (isTransfer) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("TRANSFER: send close-ack");
          }
          writeResponse(SUCCESS, null, replyOut);
        }
      }

BlockReceiver.receiveBlock calls receivePacket() to receive packets. If the write comes from a client (rather than a datanode-to-datanode transfer), it additionally creates a PacketResponder thread to collect packet acks:

if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup, 
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start(); // start thread to process responses
      }

      while (receivePacket() >= 0) { /* Receive until the last packet */ }
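
Conceptually, the responder thread reads the ack for each packet from the downstream mirror and relays the status upstream. The sketch below shows only that shape; the (seqno, status) ack format is invented for illustration, while the real PacketResponder exchanges PipelineAck protobuf messages and tracks packet sequence numbers:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified shape of the PacketResponder loop: read each downstream ack
// and forward it upstream. The wire format here is illustrative only.
class AckRelay implements Runnable {
  private final DataInputStream mirrorIn;   // acks from the downstream datanode
  private final DataOutputStream replyOut;  // acks to the upstream node or client

  AckRelay(DataInputStream mirrorIn, DataOutputStream replyOut) {
    this.mirrorIn = mirrorIn;
    this.replyOut = replyOut;
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        long seqno = mirrorIn.readLong();  // which packet this ack is for
        byte status = mirrorIn.readByte(); // downstream result
        replyOut.writeLong(seqno);         // relay upstream
        replyOut.writeByte(status);
        replyOut.flush();
      }
    } catch (IOException e) {
      // stream closed: the pipeline is shutting down
    }
  }
}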

BlockReceiver.receivePacket first forwards the data to the next datanode, then persists it to local disk:

/**
   * Receives and processes a packet. It can contain many chunks
   * (512-byte chunks of data).
   * Returns the number of data bytes that the packet has.
   */
  private int receivePacket() throws IOException {
          // read the next packet from the input stream
            packetReceiver.receiveNextPacket(in);

            // get the packet header and validate it
            PacketHeader header = packetReceiver.getHeader();
            // first forward the data to the next datanode
            packetReceiver.mirrorPacketTo(mirrorOut);
            // omitted
            // read the data and its checksums into buffers
            ByteBuffer dataBuf = packetReceiver.getDataSlice();
            ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
           // omitted
           // Write data to disk.
           // after all the validation above, write the data to disk
          long begin = Time.monotonicNow();
          streams.writeDataToDisk(dataBuf.array(), startByteToDisk, numBytesToDisk);
          // omitted
          // flush entire packet, sync if requested
          flushOrSync(syncBlock);
          // omitted

  }

Finally, after the flush, the data portion of the block is stored on the datanode in the blk_xxx file, and the checksums are stored in the blk_xxx_xxx.meta file:

[Figure: blk_xxx data file and blk_xxx_xxx.meta checksum file on disk]
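
The meta file holds one checksum per data chunk. Below is a minimal sketch of producing checksums in that per-chunk layout, assuming 512-byte chunks; Hadoop's DataChecksum typically uses CRC32C, while plain java.util.zip.CRC32 is used here only to keep the sketch self-contained:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Sketch: one 4-byte CRC per 512-byte chunk, the layout stored in the
// blk_xxx_xxx.meta file. CRC32 here stands in for Hadoop's DataChecksum.
class ChunkChecksums {
  static byte[] checksums(byte[] data, int bytesPerChecksum) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    for (int off = 0; off < data.length; off += bytesPerChecksum) {
      int len = Math.min(bytesPerChecksum, data.length - off);
      CRC32 crc = new CRC32();
      crc.update(data, off, len);
      out.writeInt((int) crc.getValue()); // 4 bytes per chunk
    }
    return buf.toByteArray();
  }
}

For a typical 64 KB packet this yields 128 four-byte checksums, which is the kind of content carried in the checksumBuf slice seen in receivePacket() above.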

The overall DataNode processing flow is summarized below:

[Figure: DataNode write-path processing flow]

4. Summary

  1. The client sends the write request to the DataNode via new Sender().writeBlock(). Inside the DataNode, a DataXceiverServer object handles incoming socket requests; for each accepted connection it creates a DataXceiver thread to serve the client's socket. DataXceiver implements Runnable, so its run() method is the entry point. After a request arrives it first calls op = readOp() to read the request type from the socket; a client write issued through Sender.writeBlock() carries the WRITE_BLOCK op, which is handled by opWriteBlock(in).
  2. The writeBlock method performs the block write. It first checks read/write permission by validating the client's AccessToken, then instantiates a BlockReceiver via setCurrentBlockReceiver(getBlockReceiver()); the BlockReceiver object is responsible for reading and writing the block.
  3. The BlockReceiver constructor creates the block's meta file and block file and prepares to receive DFSPackets from the client. While creating the meta and block files, it notifies the NameNode of the block's location via datanode.notifyNamenodeReceivingBlock().
  4. Once the BlockReceiver is instantiated, the DataNode forwards the write request to the next DataNode via new Sender(mirrorOut).writeBlock(), and the next DataNode repeats the same steps. The downstream DataNode returns a connection-success reply via writeResponse; the first DataNode then notifies the client that the pipeline was set up successfully with a BlockOpResponseProto, and starts receiving the block by calling blockReceiver.receiveBlock().
  5. Inside BlockReceiver, receivePacket() loops to receive the DFSPackets from the client. Each packet received is forwarded to the next node via packetReceiver.mirrorPacketTo(mirrorOut), and a PacketResponder thread waits for each packet's ack and relays it to the upstream node.
  6. After all DFSPackets have been received, the node returns a block-write-success reply upstream via writeResponse(SUCCESS, null, replyOut), then reports the finished block to the NameNode via datanode.closeBlock(), and the NameNode updates its block information. This completes the reception of one block.