1. Background
The two articles https://blog.51cto.com/u_15327484/8023493 and https://blog.51cto.com/u_15327484/8089923 covered, in detail, the client-side and NameNode-side logic of the HDFS write path. This article describes how the DataNode receives data. Interactions with the NameNode are omitted; the focus is on how data is sent and received.
2. DataNode event-handling thread model
When the DataNode starts, it creates a TcpPeerServer (its SocketServer) and hands it to a DataXceiverServer for processing:
public class DataNode extends ReconfigurableBase
    implements InterDatanodeProtocol, ClientDatanodeProtocol,
        TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
  DataXceiverServer xserver = null;

  // When the DataNode service starts, it initializes the DataXceiverServer
  void startDataNode(List<StorageLocation> dataDirectories,
                     SecureResources resources
                     ) throws IOException {
    // initialization
    initDataXceiver();
  }

  private void initDataXceiver() throws IOException {
    // find free port or use privileged port provided
    TcpPeerServer tcpPeerServer;
    if (secureResources != null) {
      tcpPeerServer = new TcpPeerServer(secureResources);
    } else {
      // backlog for connection requests that cannot be handled immediately, default length 128
      int backlogLength = getConf().getInt(
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(getConf()), backlogLength);
    }
    if (dnConf.getTransferSocketRecvBufferSize() > 0) {
      tcpPeerServer.setReceiveBufferSize(
          dnConf.getTransferSocketRecvBufferSize());
    }
    streamingAddr = tcpPeerServer.getStreamingAddr();
    LOG.info("Opened streaming server at {}", streamingAddr);
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    // create the DataXceiverServer
    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    this.threadGroup.setDaemon(true); // auto destroy when empty
    // omitted
  }
}
DataXceiverServer is a thread. In a while loop it blocks in the accept() method of the TcpPeerServer (i.e., the ServerSocket) until a connection request arrives from a client or another DataNode, and then it:
- obtains a peer, the wrapper around the accepted Socket;
- checks whether the number of DataXceiver threads on this DataNode exceeds the threshold (the dfs.datanode.max.transfer.threads limit); if it does, it throws an IOException, closes the peer with IOUtils' cleanup() method and continues the loop, otherwise it proceeds to step 3;
- creates a background DataXceiver thread, adds it to the DataNode's thread group threadGroup, and starts it to serve the read/write request.
public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      peer = peerServer.accept();
      // if the number of threads serving DataNode requests exceeds maxXceiverCount, fail this request
      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }
      // start a new background DataXceiver thread to handle the request
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
        .start();
      // omitted
}
The DataNode thread model is shown below. It does not use the Reactor-based Hadoop RPC model; instead it uses the most basic socket plus asynchronous-thread model:
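To make this model concrete, here is a minimal, self-contained sketch of the same thread-per-connection pattern using plain java.net classes (the class and method names below are illustrative, not the actual HDFS classes): an acceptor thread blocks in accept() and hands each accepted connection to a freshly started daemon thread, just as DataXceiverServer hands each peer to a new DataXceiver:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch of the DataXceiverServer-style thread model:
// one acceptor thread, one daemon worker thread per connection.
public class ThreadPerConnectionServer implements Runnable {
  private final ServerSocket serverSocket;
  private volatile boolean shouldRun = true;

  public ThreadPerConnectionServer(int port) throws IOException {
    // the backlog plays the same role as ipc.server.listen.queue.size above
    this.serverSocket = new ServerSocket(port, 128);
  }

  @Override
  public void run() {
    while (shouldRun) {
      try {
        Socket peer = serverSocket.accept();            // block until a client/DataNode connects
        Thread worker = new Thread(() -> handle(peer)); // analogous to a DataXceiver
        worker.setDaemon(true);
        worker.start();
      } catch (IOException e) {
        // log and keep accepting, as DataXceiverServer does
      }
    }
  }

  private void handle(Socket peer) {
    try (Socket s = peer) {
      // read the op code and process the request here
    } catch (IOException ignored) {
    }
  }
}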
3. The DataXceiver thread handles the writeBlock request
The DataXceiver thread creates an input stream over the request socket, reads the op code from the head of the stream, resolves the corresponding op, and then executes it. When the socket is closed, op processing ends and the thread exits:
public void run() {
  int opsProcessed = 0;
  Op op = null;
  try {
    synchronized(this) {
      xceiver = Thread.currentThread();
    }
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
      // input stream
      input = new BufferedInputStream(saslStreams.in,
          smallBufferSize);
      socketOut = saslStreams.out;
    }
    // omitted
    do {
      // read from the input stream which op the request wants to execute
      op = readOp();
      // omitted
      // execute the op
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
    // omitted
    if (peer != null) {
      dataXceiverServer.closePeer(peer);
      IOUtils.closeStream(in);
    }
  }
}
The op codes are defined as follows:
public enum Op {
  WRITE_BLOCK((byte)80),
  READ_BLOCK((byte)81),
  READ_METADATA((byte)82),
  REPLACE_BLOCK((byte)83),
  COPY_BLOCK((byte)84),
  BLOCK_CHECKSUM((byte)85),
  TRANSFER_BLOCK((byte)86),
  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
  REQUEST_SHORT_CIRCUIT_SHM((byte)89);
  // omitted
}
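The op code is a single byte at the head of the request stream (in the real data transfer protocol it is preceded by a protocol version field, which readOp() verifies). A simplified, hypothetical sketch of mapping such a byte back to an enum value, not the actual Op class:
import java.io.DataInput;
import java.io.IOException;

// Simplified sketch of resolving the one-byte op code to an enum value.
enum SimpleOp {
  WRITE_BLOCK((byte) 80),
  READ_BLOCK((byte) 81);

  private final byte code;

  SimpleOp(byte code) {
    this.code = code;
  }

  // Read one byte from the stream and resolve it to an op.
  static SimpleOp read(DataInput in) throws IOException {
    byte code = in.readByte();
    for (SimpleOp op : values()) {
      if (op.code == code) {
        return op;
      }
    }
    throw new IOException("Unknown op code " + code);
  }
}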
DataXceiver.processOp defines the dispatch logic for each op:
/** Process op by the corresponding method. */
protected final void processOp(Op op) throws IOException {
  switch(op) {
  case READ_BLOCK:
    opReadBlock();
    break;
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  case REPLACE_BLOCK:
    opReplaceBlock(in);
    break;
  case COPY_BLOCK:
    opCopyBlock(in);
    break;
  case BLOCK_CHECKSUM:
    opBlockChecksum(in);
    break;
  case TRANSFER_BLOCK:
    opTransferBlock(in);
    break;
  case REQUEST_SHORT_CIRCUIT_FDS:
    opRequestShortCircuitFds(in);
    break;
  case RELEASE_SHORT_CIRCUIT_FDS:
    opReleaseShortCircuitFds(in);
    break;
  case REQUEST_SHORT_CIRCUIT_SHM:
    opRequestShortCircuitShm(in);
    break;
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}
DataXceiver.writeBlock obtains the socket address of the downstream DataNode and forwards the writeBlock request to it; the current DataNode then calls blockReceiver.receiveBlock() to start receiving block data:
public void writeBlock(final ExtendedBlock block,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientname,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo srcDataNode,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    boolean allowLazyPersist,
    final boolean pinning,
    final boolean[] targetPinnings,
    final String storageId,
    final String[] targetStorageIds) throws IOException {
  DataOutputStream mirrorOut = null;  // stream to next target
  DataInputStream mirrorIn = null;    // reply from next target
  Socket mirrorSock = null;           // socket to next target
  String mirrorNode = null;           // the name:port of next target
  String firstBadLink = "";           // first datanode that failed in connection setup
  // open a block receiver
  blockReceiver = new BlockReceiver(block, storageType, in,
      peer.getRemoteAddressString(),
      peer.getLocalAddressString(),
      stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
      clientname, srcDataNode, datanode, requestedChecksum,
      cachingStrategy, allowLazyPersist, pinning);
  // omitted
  // build the output stream mirrorOut to the downstream node, used to forward data
  // build the input stream mirrorIn from the downstream node, used to receive its responses
  mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
      HdfsConstants.SMALL_BUFFER_SIZE));
  mirrorIn = new DataInputStream(unbufMirrorIn);
  // Just like the client writing to the first DataNode, a Sender is constructed to wrap the
  // write parameters; its writeBlock method prefixes op code 80 and sends the request to
  // the next DataNode.
  // omitted
  new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
      blockToken, clientname, targets, targetStorageTypes, srcDataNode,
      stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
      latestGenerationStamp, requestedChecksum, cachingStrategy,
      false, false, targetPinnings);
  // flush the mirror stream
  mirrorOut.flush();
  // start receiving data
  if (blockReceiver != null) {
    String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
    blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
        mirrorAddr, null, targets, false);
    // send close-ack for transfer-RBW/Finalized
    // for a block transfer (replication), set the status to SUCCESS directly and reply upstream
    if (isTransfer) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("TRANSFER: send close-ack");
      }
      writeResponse(SUCCESS, null, replyOut);
    }
  }
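The setup of mirrorSock, mirrorOut and mirrorIn is elided above. Conceptually it is just a TCP connection to the first entry in targets with buffered streams wrapped around the socket. A hedged sketch of that idea using plain java.net (the real code additionally goes through Hadoop's socket helpers, SASL negotiation and configurable timeouts; MirrorConnectionSketch and the hard-coded address are illustrative assumptions):
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch: connect to the next DataNode in the pipeline and wrap the
// socket in the buffered streams later used to mirror packets downstream.
public final class MirrorConnectionSketch {
  static void connectToMirror(String mirrorNode) throws IOException {
    String[] parts = mirrorNode.split(":");        // assumed "host:port" of targets[0]
    try (Socket mirrorSock = new Socket()) {
      mirrorSock.connect(
          new InetSocketAddress(parts[0], Integer.parseInt(parts[1])), 60_000);
      DataOutputStream mirrorOut = new DataOutputStream(
          new BufferedOutputStream(mirrorSock.getOutputStream()));
      DataInputStream mirrorIn = new DataInputStream(mirrorSock.getInputStream());
      // a real DataNode would now call new Sender(mirrorOut).writeBlock(...)
      // and later read the downstream connect-ack from mirrorIn
      mirrorOut.flush();
    }
  }
}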
BlockReceiver.receiveBlock calls receivePacket() in a loop to receive packets. If the writer is a client (rather than another DataNode doing a transfer), it additionally starts a PacketResponder thread to handle packet acks:
if (isClient && !isTransfer) {
  responder = new Daemon(datanode.threadGroup,
      new PacketResponder(replyOut, mirrIn, downstreams));
  responder.start(); // start thread to processes responses
}
while (receivePacket() >= 0) { /* Receive until the last packet */ }
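The idea behind PacketResponder can be sketched independently of HDFS: every packet that has been forwarded downstream is queued, and a separate thread waits for the downstream ack of each queued packet before acking upstream. A minimal, hypothetical sketch of that pattern follows (AckRelaySketch and its helper methods are invented names; the real PacketResponder also matches sequence numbers, handles out-of-band responses and finalizes the block):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the ack-relay pattern behind PacketResponder:
// the receive loop enqueues each forwarded packet's seqno, and the responder
// thread waits for the downstream ack, then acks upstream.
public class AckRelaySketch {
  private final BlockingQueue<Long> pendingSeqnos = new LinkedBlockingQueue<>();

  // called by the receive loop after a packet has been mirrored downstream
  public void enqueue(long seqno) {
    pendingSeqnos.add(seqno);
  }

  public Thread startResponder() {
    Thread responder = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          long seqno = pendingSeqnos.take(); // oldest packet still awaiting its ack
          waitForDownstreamAck();            // blocks on mirrorIn in the real code
          sendUpstreamAck(seqno);            // written to replyOut in the real code
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    responder.setDaemon(true);
    responder.start();
    return responder;
  }

  private void waitForDownstreamAck() { }      // placeholder
  private void sendUpstreamAck(long seqno) { } // placeholder
}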
BlockReceiver.receivePacket first forwards the packet to the next DataNode and then writes it to the local disk:
/**
 * Receives and processes a packet. It can contain many chunks
 * (512-byte chunks).
 * Returns the number of data bytes that the packet has.
 */
private int receivePacket() throws IOException {
  // read the next packet from the input stream
  packetReceiver.receiveNextPacket(in);
  // get the packet header and validate it
  PacketHeader header = packetReceiver.getHeader();
  // forward the packet to the next datanode first
  packetReceiver.mirrorPacketTo(mirrorOut);
  // omitted
  // obtain the packet data and its checksums as buffer slices
  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
  // omitted
  // Write data to disk.
  // after all the validation above, write the data to disk
  long begin = Time.monotonicNow();
  streams.writeDataToDisk(dataBuf.array(), startByteToDisk, numBytesToDisk);
  // omitted
  // flush entire packet, sync if requested
  flushOrSync(syncBlock);
  // omitted
}
Finally, after the flush, the data portion is persisted to the blk_xxx file on the DataNode, and the checksums are persisted to the blk_xxx_xxx.meta file:
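As a concrete illustration of that data/checksum split, the hypothetical sketch below appends a buffer to a blk_xxx-style data file and one CRC32 checksum per 512-byte chunk to a companion .meta file (the real meta file also starts with a header describing the checksum type and bytes-per-checksum, and HDFS typically uses CRC32C; BlockAndMetaWriterSketch and the file names are invented for illustration):
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Hypothetical sketch: persist packet data to a block file and a
// per-512-byte-chunk CRC32 checksum to a companion meta file.
public final class BlockAndMetaWriterSketch {
  private static final int CHUNK_SIZE = 512;

  public static void write(byte[] data, String blockFile, String metaFile) throws IOException {
    try (FileOutputStream blockOut = new FileOutputStream(blockFile, true);
         DataOutputStream metaOut = new DataOutputStream(new FileOutputStream(metaFile, true))) {
      blockOut.write(data);                       // data bytes -> blk_xxx
      CRC32 crc = new CRC32();
      for (int off = 0; off < data.length; off += CHUNK_SIZE) {
        int len = Math.min(CHUNK_SIZE, data.length - off);
        crc.reset();
        crc.update(data, off, len);
        metaOut.writeInt((int) crc.getValue());   // one checksum per chunk -> blk_xxx_xxx.meta
      }
    }
  }

  public static void main(String[] args) throws IOException {
    write("hello hdfs".getBytes(), "blk_1073741825", "blk_1073741825_1001.meta");
  }
}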
The DataNode processing flow is summarized below:
4. Summary
- The client sends the block write request to the DataNode via new Sender(...).writeBlock(). Inside the DataNode, a DataXceiverServer object handles incoming socket requests; each time a connection is accepted, it creates a DataXceiver thread to handle that socket request. DataXceiver implements Runnable, so its run() method is the entry point. After the request arrives, it first calls op = readOp() to read the operation from the socket and determine the request type; a write issued by the client through Sender.writeBlock carries the WRITE_BLOCK op, which is handled by calling opWriteBlock(in).
- opWriteBlock calls writeBlock to perform the write. It first checks read/write permission by validating the client's AccessToken, and then instantiates a BlockReceiver via setCurrentBlockReceiver(getBlockReceiver(...)); the BlockReceiver is responsible for receiving and writing the block.
- The BlockReceiver constructor creates the block's meta file and the block file and prepares to receive DFSPackets from the client; while creating them, datanode.notifyNamenodeReceivingBlock() is called to tell the NameNode where the block is being received.
- Once the BlockReceiver is instantiated, the DataNode forwards the write request to the next DataNode via new Sender(mirrorOut).writeBlock(), and that DataNode performs the same steps. The downstream DataNode returns a connection-success response via writeResponse; the first DataNode then notifies the client via a BlockOpResponseProto that the pipeline has been set up, and blockReceiver.receiveBlock is called to start receiving the block.
- Inside BlockReceiver, receivePacket() is called in a loop to receive the DFSPackets sent by the client. Each received packet is forwarded to the next node via packetReceiver.mirrorPacketTo(mirrorOut), and a PacketResponder thread waits for the ack of each packet and relays it to the upstream node.
- After all DFSPackets have been received, writeResponse(SUCCESS, null, replyOut) returns a write-success response to the upstream node. datanode.closeBlock then reports to the NameNode that the block has been received, the NameNode updates its block information, and the reception of this block is complete.