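1. Background
The previous article, https://blog.51cto.com/u_15327484/7790888, covered the main steps from the ResourceManager accepting a user's application submission to the ResourceManager sending a startContainer request to the NodeManager to launch the ApplicationMaster.
This article continues from there and looks at how the NodeManager starts a Container.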
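2. Container definition
Before starting a Container, we need to be clear about what a Container is. In Yarn, Container is an abstract class; its concrete state is represented by the implementation class ContainerPBImpl.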
public class ContainerPBImpl extends Container {
  // ContainerId is the unique identifier of the Container
  private ContainerId containerId = null;
  // NodeId is the ID of the node that hosts the Container
  private NodeId nodeId = null;
  // Amount of resources this Container represents, e.g. how much memory and how many vcores
  private Resource resource = null;
  // Container priority; a lower value is served first
  private Priority priority = null;
  // Token used for security authentication
  private Token containerToken = null;
}
Resource is represented by ResourcePBImpl, which reads the memory and vcore values from the protobuf-generated class:
public class ResourcePBImpl extends Resource {
  ResourceProto proto = ResourceProto.getDefaultInstance();
  public int getMemory() {
    ResourceProtoOrBuilder p = viaProto ? proto : builder;
    return (p.getMemory());
  }
  @Override
  public int getVirtualCores() {
    ResourceProtoOrBuilder p = viaProto ? proto : builder;
    return (p.getVirtualCores());
  }
}
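As you can see, a Container is just a wrapper around a resource allocation. When a container with a given amount of memory and vcores is requested, the ResourceManager checks how many resources each NodeManager has left and looks for a NodeManager that can still provide the requested memory and vcores. If no NM can satisfy the request, the ResourceManager cannot allocate the container: the request stays pending, or is rejected with an error when it exceeds the configured maximum allocation.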
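To make the matching step concrete, here is a minimal sketch of a first-fit search over the free resources of each node. It is only an illustration: `FreeResource` and `FirstFitMatcher` are hypothetical names, and the real Yarn schedulers also take queues, locality and priorities into account.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified view of the free resources on one node. */
final class FreeResource {
    final int memoryMb;
    final int vcores;

    FreeResource(int memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }

    /** True when this node can host a container of the requested size. */
    boolean fits(int requestedMemoryMb, int requestedVcores) {
        return memoryMb >= requestedMemoryMb && vcores >= requestedVcores;
    }
}

final class FirstFitMatcher {
    /** Returns the first node (in iteration order) whose free resources cover the request. */
    static Optional<String> findNode(Map<String, FreeResource> freeByNode,
                                     int requestedMemoryMb, int requestedVcores) {
        for (Map.Entry<String, FreeResource> e : freeByNode.entrySet()) {
            if (e.getValue().fits(requestedMemoryMb, requestedVcores)) {
                return Optional.of(e.getKey());
            }
        }
        return Optional.empty(); // no node can satisfy the request right now
    }
}
```

An empty result corresponds to the case where no NodeManager currently has enough room for the container.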
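A Container is not itself a process. What we call starting a container actually means reserving that share of resources on the designated NM and using ContainerExecutor to run a task such as the AppMaster, a MapTask or a ReduceTask within it. The process can equally belong to another compute framework.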
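3. Overview of how the NodeManager starts a container
When the NodeManager starts a container, it goes through the following steps:
- Create the Application object for the container and start the log aggregation thread, which aggregates logs periodically.
- Download the three types of resources the task needs: public, private and application.
- Start the task process.
The corresponding Yarn events are: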
- An ApplicationEvent of type initApplication.
- A LogHandlerEvent of type ApplicationStarted.
- An ApplicationEvent of type application_log_handling_inited.
- A LocalizationEvent of type init_application_resources.
- ResourceLocalizationService is the event handler for LocalizationEvent. When it handles an init_application_resources event, it records the user-to-LocalResourcesTracker mapping in the privateRsrc map and the appId-to-LocalResourcesTracker mapping in the appRsrc map.
- An ApplicationEvent of type applicationInited.
- A ContainerEvent of type initContainer.
- A LocalizationEvent of type localize_container_resources.
- ResourceLocalizationService is again the handler. When it handles a localize_container_resources event, it casts the LocalizationEvent to ContainerLocalizationRequestEvent and, based on the visibility, user and appId of each requested resource, fetches the matching LocalResourcesTracker from the privateRsrc and appRsrc maps. The LocalResourcesTracker then raises a ResourceEvent of type request.
- A ResourceEvent of type request. LocalResourcesTrackerImpl is an event handler for ResourceEvent. When it handles a request event, it creates a LocalizedResource and records the request-to-LocalizedResource mapping in a map. LocalizedResource is also a ResourceEvent handler: the event is passed on to it, and it raises a LocalizerEvent of type request_resource_localization.
- A LocalizerEvent of type request_resource_localization. LocalizerTracker is the event handler for LocalizerEvent and localizes each resource according to its type. There are three resource types: public, private and application. Public resources are localized by PublicLocalizer, which downloads them asynchronously through a thread pool. Private and application resources are localized by a separate ContainerLocalizer process that is started to do the download.
- A ResourceEvent of type localized.
- A ContainerEvent of type resource_localized.
- A ContainerSchedulerEvent of type schedule_container.
- A ContainersLauncherEvent of type LaunchContainer.
As you can see, Yarn goes through many transitions to start a container. This article covers only the key steps.
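The chain above follows one pattern: a handler is looked up by event type, does its work, and emits the next event. The sketch below shows that pattern with a single-threaded dispatcher. It is a simplified illustration, not Yarn's dispatcher: `NmEventType` and `MiniDispatcher` are hypothetical names and only a handful of the events listed above are modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical, heavily reduced subset of the events listed above. */
enum NmEventType {
    INIT_APPLICATION, APPLICATION_INITED, INIT_CONTAINER, RESOURCE_LOCALIZED, LAUNCH_CONTAINER
}

/** Single-threaded stand-in for an event dispatcher: handlers are looked up by event type. */
final class MiniDispatcher {
    private final Map<NmEventType, Consumer<MiniDispatcher>> handlers =
            new EnumMap<>(NmEventType.class);
    private final Queue<NmEventType> queue = new ArrayDeque<>();
    final List<NmEventType> trace = new ArrayList<>(); // events in the order they were handled

    void register(NmEventType type, Consumer<MiniDispatcher> handler) {
        handlers.put(type, handler);
    }

    void emit(NmEventType type) {
        queue.add(type);
    }

    /** Drains the queue; a handler may emit follow-up events while it runs. */
    void drain() {
        NmEventType next;
        while ((next = queue.poll()) != null) {
            trace.add(next);
            Consumer<MiniDispatcher> handler = handlers.get(next);
            if (handler != null) {
                handler.accept(this);
            }
        }
    }

    /** Wires each step so that finishing it triggers the next one. */
    static MiniDispatcher containerStartChain() {
        MiniDispatcher d = new MiniDispatcher();
        d.register(NmEventType.INIT_APPLICATION, x -> x.emit(NmEventType.APPLICATION_INITED));
        d.register(NmEventType.APPLICATION_INITED, x -> x.emit(NmEventType.INIT_CONTAINER));
        d.register(NmEventType.INIT_CONTAINER, x -> x.emit(NmEventType.RESOURCE_LOCALIZED));
        d.register(NmEventType.RESOURCE_LOCALIZED, x -> x.emit(NmEventType.LAUNCH_CONTAINER));
        return d;
    }
}
```

Emitting INIT_APPLICATION on the dispatcher returned by `containerStartChain()` and calling `drain()` walks through all five events in order, which is the shape of the real flow with most intermediate steps left out.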
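4. Log aggregation and the application_log_handling_inited event
LogAggregationService.handle processes the ApplicationStarted LogHandlerEvent by calling initApp. That method calls initAppAggregator to set up log aggregation and, on success, emits the application_log_handling_inited event: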
private void initApp(final ApplicationId appId, String user,
    Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    Map<ApplicationAccessType, String> appAcls,
    LogAggregationContext logAggregationContext) {
  ApplicationEvent eventResponse;
  try {
    long start = System.currentTimeMillis();
    verifyAndCreateRemoteLogDir(getConfig());
    // Initialize the log aggregator for this application
    initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
        logAggregationContext);
    long els = System.currentTimeMillis() - start;
    LOG.info("AppId = " + appId + ", initApp take time " + els + " ms");
    if (metrics != null) {
      this.metrics.addAppLogDirInitDuration(els);
    }
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
  } catch (YarnRuntimeException e) {
    LOG.warn("Application failed to init aggregation", e);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
  }
  this.dispatcher.getEventHandler().handle(eventResponse);
}
initAppAggregator creates an AppLogAggregatorImpl and hands it to a thread pool. Its doAppLogAggregation loop then uploads logs at a fixed interval:
private void doAppLogAggregation() {
  while (!this.appFinishing.get() && !this.aborted.get()) {
    synchronized(this) {
      try {
        if (this.rollingMonitorInterval > 0) {
          wait(this.rollingMonitorInterval * 1000);
          if (this.appFinishing.get() || this.aborted.get()) {
            break;
          }
          uploadLogsForContainers(false);
        } else {
          wait(THREAD_SLEEP_TIME);
        }
      } catch (InterruptedException e) {
        LOG.warn("PendingContainers queue is interrupted");
        this.appFinishing.set(true);
      }
    }
  }
  // remainder omitted
}
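The loop above combines a timed wait() with a finishing flag, so the thread can be woken early when the application ends. A minimal, self-contained sketch of that pattern follows. `PeriodicUploader` is a hypothetical class and the "upload" is just a counter, so this shows the wait/notify mechanics only, not the real aggregation logic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a per-application log aggregator. */
final class PeriodicUploader implements Runnable {
    private final long intervalMillis;
    private final AtomicBoolean finishing = new AtomicBoolean(false);
    final AtomicInteger uploads = new AtomicInteger();

    PeriodicUploader(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive"); // wait(0) would block forever
        }
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        while (!finishing.get()) {
            synchronized (this) {
                try {
                    // Sleep for one interval, or until finish() calls notifyAll().
                    wait(intervalMillis);
                    if (finishing.get()) {
                        break;
                    }
                    uploads.incrementAndGet(); // placeholder for the real upload
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    finishing.set(true);
                }
            }
        }
    }

    /** Asks the loop to stop and wakes it up so it does not wait out the interval. */
    synchronized void finish() {
        finishing.set(true);
        notifyAll();
    }

    /** Runs the loop on a thread until at least minUploads rounds happened, then stops it. */
    static int demo(int minUploads) {
        PeriodicUploader uploader = new PeriodicUploader(10);
        Thread worker = new Thread(uploader);
        worker.start();
        try {
            while (uploader.uploads.get() < minUploads) {
                Thread.sleep(5);
            }
            uploader.finish();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return uploader.uploads.get();
    }
}
```

Checking the flag again right after wait() returns is what keeps the loop from doing one more upload after the application has been marked as finishing.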
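5. Resource localization and the request_resource_localization event
Resource localization means downloading the resources the container needs from HDFS to the node.
Yarn handles this event type in LocalizerTracker. PUBLIC resources are put on PublicLocalizer's queue and downloaded asynchronously. For PRIVATE and APPLICATION resources, a LocalizerRunner thread is started per localizer ID to download them: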
private final PublicLocalizer publicLocalizer;
@Override
public void handle(LocalizerEvent event) {
  String locId = event.getLocalizerId();
  switch (event.getType()) {
  case REQUEST_RESOURCE_LOCALIZATION:
    // 0) find running localizer or start new thread
    LocalizerResourceRequestEvent req =
        (LocalizerResourceRequestEvent)event;
    switch (req.getVisibility()) {
    case PUBLIC:
      // Add the request to publicLocalizer's internal queue; the thread pool then downloads the resource asynchronously
      publicLocalizer.addResource(req);
      break;
    case PRIVATE:
    case APPLICATION:
      synchronized (privLocalizers) {
        LocalizerRunner localizer = privLocalizers.get(locId);
        if (localizer != null && localizer.killContainerLocalizer.get()) {
          // Old localizer thread has been stopped, remove it and creates
          // a new localizer thread.
          LOG.info("New " + event.getType() + " localize request for "
              + locId + ", remove old private localizer.");
          cleanupPrivLocalizers(locId);
          localizer = null;
        }
        if (null == localizer) {
          LOG.info("Created localizer for " + locId);
          localizer = new LocalizerRunner(req.getContext(), locId);
          privLocalizers.put(locId, localizer);
          // Start the LocalizerRunner thread; while running, it launches a ContainerLocalizer process to download the resources
          localizer.start();
        }
        // 1) propagate event
        localizer.addResource(req);
      }
      break;
    }
    break;
  }
}
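The routing rule in handle can be reduced to a small model: PUBLIC requests share one queue, while PRIVATE and APPLICATION requests go to a localizer that is created once per localizer ID and reused afterwards. The sketch below captures just that rule. `LocalizeRequest` and `LocalizerRouter` are hypothetical names, and a counter stands in for starting a real thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum Visibility { PUBLIC, PRIVATE, APPLICATION }

/** Hypothetical request: which localizer asked for which resource. */
final class LocalizeRequest {
    final String localizerId;
    final String resource;
    final Visibility visibility;

    LocalizeRequest(String localizerId, String resource, Visibility visibility) {
        this.localizerId = localizerId;
        this.resource = resource;
        this.visibility = visibility;
    }
}

final class LocalizerRouter {
    final List<String> publicQueue = new ArrayList<>();
    final Map<String, List<String>> privateLocalizers = new HashMap<>();
    int localizersStarted = 0;

    synchronized void handle(LocalizeRequest req) {
        switch (req.visibility) {
            case PUBLIC:
                // All public resources share one queue.
                publicQueue.add(req.resource);
                break;
            case PRIVATE:
            case APPLICATION:
                List<String> localizer = privateLocalizers.get(req.localizerId);
                if (localizer == null) {
                    // First request for this ID: create the localizer once.
                    localizer = new ArrayList<>();
                    privateLocalizers.put(req.localizerId, localizer);
                    localizersStarted++; // the real code would start a thread here
                }
                localizer.add(req.resource);
                break;
        }
    }
}
```

Two non-public requests with the same localizer ID end up in the same list and only bump `localizersStarted` once, which mirrors the lookup in privLocalizers before a new LocalizerRunner is created.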
PublicLocalizer.addResource wraps each download in an FSDownload task and submits it to the thread pool:
class PublicLocalizer extends Thread {
final FileContext lfs;
final Configuration conf;
final ExecutorService threadPool;
final CompletionService<Path> queue;
// Its shared between public localizer and dispatcher thread.
// Maps each Future to the LocalizerResourceRequestEvent it belongs to
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
PublicLocalizer(Configuration conf) {
super("Public Localizer");
this.lfs = getLocalFileContext(conf);
this.conf = conf;
this.pending = Collections.synchronizedMap(
new HashMap<Future<Path>, LocalizerResourceRequestEvent>());
this.threadPool = createLocalizerExecutor(conf);
this.queue = new ExecutorCompletionService<Path>(threadPool);
}
public void addResource(LocalizerResourceRequestEvent request) {
LocalizedResource rsrc = request.getResource();
LocalResourceRequest key = rsrc.getRequest();
if (rsrc.tryAcquire()) {
if (rsrc.getState() == ResourceState.DOWNLOADING) {
LocalResource resource = request.getResource().getRequest();
try {
Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path publicDirDestPath =
publicRsrc.getPathForLocalization(key, publicRootPath,
delService);
if (publicDirDestPath == null) {
return;
}
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
createParentDirs(publicDirDestPath, publicRootPath);
if (diskValidator != null) {
diskValidator.checkStatus(
new File(publicDirDestPath.toUri().getPath()));
} else {
throw new DiskChecker.DiskErrorException(
"Disk Validator is null!");
}
}
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
// Submit the download for this LocalizerResourceRequestEvent to the thread pool, where it runs asynchronously
pending.put(queue.submit(new FSDownload(lfs, null, conf,
publicDirDestPath, resource, request.getContext().getStatCache())),
request);
}
}
// remainder omitted
}
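The combination of a CompletionService and a pending map is what lets the localizer pick up downloads in completion order and still know which request each result belongs to. Here is a minimal sketch of that idea using only the JDK. `PendingDownloads` is a hypothetical class and the "download" simply returns a made-up local path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PendingDownloads {
    /**
     * Submits one fake download per request, then collects results in completion order,
     * using the pending map to find out which request each finished Future belongs to.
     */
    static List<String> downloadAll(List<String> requests) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> queue = new ExecutorCompletionService<>(pool);
        Map<Future<String>, String> pending = Collections.synchronizedMap(new HashMap<>());
        List<String> finished = new ArrayList<>();
        try {
            for (String request : requests) {
                // Hold the lock across submit + put so a fast task cannot be
                // looked up before its Future has been recorded.
                synchronized (pending) {
                    pending.put(queue.submit(() -> "/local/filecache/" + request), request);
                }
            }
            for (int i = 0; i < requests.size(); i++) {
                Future<String> done = queue.take(); // blocks until some download completes
                String request;
                synchronized (pending) {
                    request = pending.remove(done);
                }
                finished.add(request + " -> " + done.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("download failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return finished;
    }
}
```

The returned list is in completion order, so with more than one worker thread it may differ from the submission order.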
6. Starting the container on the LaunchContainer event
ContainersLauncher handles ContainersLauncherEvent. For LAUNCH_CONTAINER it builds a ContainerLaunch task and submits it to a thread pool:
public ExecutorService containerLauncher =
    HadoopExecutors.newCachedThreadPool(
        new ThreadFactoryBuilder()
            .setNameFormat("ContainersLauncher #%d")
            .build());
public void handle(ContainersLauncherEvent event) {
  // TODO: ContainersLauncher launches containers one by one!!
  Container container = event.getContainer();
  ContainerId containerId = container.getContainerId();
  switch (event.getType()) {
  case LAUNCH_CONTAINER:
    Application app =
        context.getApplications().get(
            containerId.getApplicationAttemptId().getApplicationId());
    ContainerLaunch launch =
        new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
            event.getContainer(), dirsHandler, containerManager);
    containerLauncher.submit(launch);
    running.put(containerId, launch);
    break;
  // other event types omitted
  }
}
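The launch step boils down to submitting a Callable that returns an exit code to a cached thread pool and remembering it under the container ID. The following sketch shows that shape with plain JDK classes. `MiniLauncher` is a hypothetical class, and unlike the code above it stores the Future rather than the launch object, purely to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class MiniLauncher {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Map<String, Future<Integer>> running = new ConcurrentHashMap<>();

    /** Submits the launch task and remembers it under the container id. */
    void launch(String containerId, Callable<Integer> launchTask) {
        running.put(containerId, pool.submit(launchTask));
    }

    /** Blocks until the task for containerId finishes and returns its exit code. */
    int awaitExitCode(String containerId) {
        Future<Integer> future = running.remove(containerId);
        if (future == null) {
            throw new IllegalArgumentException("unknown container " + containerId);
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1; // the launch task itself threw
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

A cached thread pool creates threads on demand, so each submitted launch can block for the lifetime of its container without holding up the others.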
ContainerLaunch.call prepares everything the container needs and then uses ContainerExecutor to start the process:
protected final ContainerExecutor exec;
public Integer call() {
if (!validateContainerState()) {
return 0;
}
final ContainerLaunchContext launchContext = container.getLaunchContext();
ContainerId containerID = container.getContainerId();
String containerIdStr = containerID.toString();
final List<String> command = launchContext.getCommands();
int ret = -1;
Path containerLogDir;
try {
Map<Path, List<String>> localResources = getLocalizedResources();
final String user = container.getUser();
// /// Variable expansion
// Before the container script gets written out.
List<String> newCmds = new ArrayList<String>(command.size());
String appIdStr = app.getAppId().toString();
String relativeContainerLogDir = ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr);
containerLogDir =
dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
recordContainerLogDir(containerID, containerLogDir.toString());
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(expandEnvironment(str, containerLogDir));
}
launchContext.setCommands(newCmds);
Map<String, String> environment = expandAllEnvironmentVars(
launchContext, containerLogDir);
// /// End of variable expansion
// Use this to track variables that are added to the environment by nm.
LinkedHashSet<String> nmEnvVars = new LinkedHashSet<String>();
FileContext lfs = FileContext.getLocalFSFileContext();
Path nmPrivateContainerScriptPath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ CONTAINER_SCRIPT);
Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr));
Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr));
// Select the working directory for the container
Path containerWorkDir = deriveContainerWorkDir();
recordContainerWorkDir(containerID, containerWorkDir.toString());
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> localDirsForRead = dirsHandler.getLocalDirsForRead();
List<String> logDirs = dirsHandler.getLogDirs();
List<String> filecacheDirs = getNMFilecacheDirs(localDirsForRead);
List<String> userLocalDirs = getUserLocalDirs(localDirs);
List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
List<String> containerLogDirs = getContainerLogDirs(logDirs);
List<String> userFilecacheDirs = getUserFilecacheDirs(localDirsForRead);
List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
appIdStr);
if (!dirsHandler.areDisksHealthy()) {
ret = ContainerExitStatus.DISKS_FAILED;
throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport(false));
}
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, appIdStr));
}
// Set the token location too.
addToEnvMap(environment, nmEnvVars,
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
new Path(containerWorkDir,
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
// /// Write out the container-script in the nmPrivate space.
try (DataOutputStream containerScriptOutStream =
lfs.create(nmPrivateContainerScriptPath,
EnumSet.of(CREATE, OVERWRITE))) {
// Sanitize the container's environment
sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
containerLogDirs, localResources, nmPrivateClasspathJarDir,
nmEnvVars);
prepareContainer(localResources, containerLocalDirs);
// Write out the environment
exec.writeLaunchEnv(containerScriptOutStream, environment,
localResources, launchContext.getCommands(),
containerLogDir, user, nmEnvVars);
}
// /// End of writing out container-script
// /// Write out the container-tokens in the nmPrivate space.
try (DataOutputStream tokensOutStream =
lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
Credentials creds = container.getCredentials();
creds.writeTokenStorageToStream(tokensOutStream);
}
// /// End of writing out container-tokens
// Launch the container
exec.activateContainer(containerID, pidFilePath);
ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir,
localDirs, logDirs);
} catch (ConfigurationException e) {
LOG.error("Failed to launch container due to configuration error.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
// Mark the node as unhealthy
context.getNodeStatusUpdater().reportException(e);
return ret;
} catch (Throwable e) {
LOG.warn("Failed to launch container.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
return ret;
} finally {
setContainerCompletedStatus(ret);
}
handleContainerExitCode(ret, containerLogDir);
return ret;
}
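Two of the preparation steps in call() are easy to show in isolation: expanding the log directory placeholder in the commands, and writing a script that exports the environment before running the command. The sketch below is a simplified illustration of both. `LaunchScriptSketch` is a hypothetical class, the script layout is only loosely modeled on a launch script, and no shell escaping is done, so it is not safe for values that contain quotes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class LaunchScriptSketch {
    /** Placeholder used in commands; replaced by the container's real log directory. */
    static final String LOG_DIR_VAR = "<LOG_DIR>";

    static String expand(String text, String containerLogDir) {
        return text.replace(LOG_DIR_VAR, containerLogDir);
    }

    /** Renders a minimal bash script: exported environment first, then the command. */
    static String render(Map<String, String> env, List<String> commands,
                         String containerLogDir) {
        StringBuilder sb = new StringBuilder("#!/bin/bash\n");
        for (Map.Entry<String, String> e : env.entrySet()) {
            sb.append("export ").append(e.getKey()).append("=\"")
              .append(expand(e.getValue(), containerLogDir)).append("\"\n");
        }
        List<String> expanded = new ArrayList<>();
        for (String command : commands) {
            expanded.add(expand(command, containerLogDir));
        }
        sb.append("exec /bin/bash -c \"").append(String.join(" ", expanded)).append("\"\n");
        return sb.toString();
    }
}
```

With the command parts `java -Xmx512m MyTask 1><LOG_DIR>/stdout` and the log directory `/var/log/c1`, the last script line redirects stdout to `/var/log/c1/stdout`.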
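In practice, starting a container means executing the script launch_container.sh. The commands in that script run tasks such as the AppMaster, a MapTask or a ReduceTask.
There are three ways to execute the shell script. ContainerExecutor is an abstract class with three implementations:
- DefaultContainerExecutor: builds the shell command that runs launch_container.sh directly and executes it from Java.
- LinuxContainerExecutor: runs launch_container.sh through the container-executor binary shipped with Hadoop. It supports cgroups and can switch the user that runs the command.
- DockerContainerExecutor: runs launch_container.sh with docker.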
6.1 Starting a container with DefaultContainerExecutor
DefaultContainerExecutor runs the launch_container.sh script directly, much like running the command sh launch_container.sh:
shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
containerIdStr, user, pidFile,
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment());
if (isContainerActive(containerId)) {
shExec.execute();
}
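For readers who have not run a script from Java before, the sketch below shows the general technique with ProcessBuilder: set the working directory and environment, start the process, and wait for its exit code. `ShellLaunchSketch` is a hypothetical class. It does not use Hadoop's shell helpers, and it assumes a `bash` binary is on the PATH.

```java
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class ShellLaunchSketch {
    /** Builds the argv for running a script through bash. */
    static List<String> buildCommand(String scriptPath) {
        return Arrays.asList("bash", scriptPath);
    }

    /** Runs the script in workDir with the given extra environment and returns its exit code. */
    static int run(String scriptPath, File workDir, Map<String, String> env)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(buildCommand(scriptPath));
        pb.directory(workDir);
        pb.environment().putAll(env);
        pb.inheritIO(); // let the child write to this process's stdout/stderr
        Process process = pb.start();
        return process.waitFor(); // blocks until the script exits
    }
}
```

Because `waitFor()` blocks for as long as the script runs, a caller would normally invoke `run` from a worker thread, which fits the thread-pool launch described in section 6.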
6.2 Starting a container with LinuxContainerExecutor
When building the launch command, LinuxContainerExecutor uses the container-executor binary from the Hadoop installation:
protected String getContainerExecutorExecutablePath(Configuration conf) {
String yarnHomeEnvVar =
System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
File hadoopBin = new File(yarnHomeEnvVar, "bin");
String defaultPath =
new File(hadoopBin, "container-executor").getAbsolutePath();
return null == conf
? defaultPath
: conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
}
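The lookup above follows a common pattern: compute a default from an environment-derived home directory, then let a configuration entry override it. The sketch below reproduces that pattern with a plain Map in place of Hadoop's Configuration. `ExecutorPathSketch` is a hypothetical class, and the key string is my assumption about the value behind NM_LINUX_CONTAINER_EXECUTOR_PATH.

```java
import java.io.File;
import java.util.Map;

final class ExecutorPathSketch {
    /** Assumed configuration key; treat it as illustrative. */
    static final String PATH_KEY = "yarn.nodemanager.linux-container-executor.path";

    /** Returns the configured path, or <yarnHome>/bin/container-executor when none is set. */
    static String resolve(Map<String, String> conf, String yarnHome) {
        String defaultPath =
                new File(new File(yarnHome, "bin"), "container-executor").getAbsolutePath();
        if (conf == null) {
            return defaultPath;
        }
        return conf.getOrDefault(PATH_KEY, defaultPath);
    }
}
```

Passing a null map or a map without the key both fall back to the default under the given home directory.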
This binary is compiled from hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c.
container-executor creates a child process with fork and runs the launch_container.sh script with execlp:
int launch_container_as_user(const char *user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
const char* pid_file, char* const* local_dirs,
char* const* log_dirs, const char *resources_key,
char* const* resources_values) {
int exit_code = -1;
char *script_file_dest = NULL;
char *cred_file_dest = NULL;
char *exit_code_file = NULL;
exit_code_file = get_exit_code_file(pid_file);
if (NULL == exit_code_file) {
exit_code = OUT_OF_MEMORY;
goto cleanup;
}
int container_file_source =-1;
int cred_file_source = -1;
exit_code = create_script_paths(
work_dir, script_name, cred_file, &script_file_dest, &cred_file_dest,
&container_file_source, &cred_file_source);
if (exit_code != 0) {
fprintf(ERRORFILE, "Could not create local files and directories");
fflush(ERRORFILE);
goto cleanup;
}
pid_t child_pid = fork();
// omitted
// cgroups-based resource enforcement
if (resources_key != NULL && ! strcmp(resources_key, "cgroups")) {
// write pid to cgroups
char* const* cgroup_ptr;
for (cgroup_ptr = resources_values; cgroup_ptr != NULL &&
*cgroup_ptr != NULL; ++cgroup_ptr) {
if (strcmp(*cgroup_ptr, "none") != 0 &&
write_pid_to_cgroup_as_root(*cgroup_ptr, pid) != 0) {
exit_code = WRITE_CGROUP_FAILED;
goto cleanup;
}
}
}
// omitted
if (execlp(script_file_dest, script_file_dest, NULL) != 0) {
fprintf(LOGFILE, "Couldn't execute the container launch file %s - %s",
script_file_dest, strerror(errno));
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
exit_code = 0;
// omitted
}
It can change the user that runs the launch command:
int change_user(uid_t user, gid_t group) {
if (user == getuid() && user == geteuid() &&
group == getgid() && group == getegid()) {
return 0;
}
if (seteuid(0) != 0) {
fprintf(LOGFILE, "unable to reacquire root - %s\n", strerror(errno));
fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
getuid(), getgid(), geteuid(), getegid());
return SETUID_OPER_FAILED;
}
if (setgid(group) != 0) {
fprintf(LOGFILE, "unable to set group to %d - %s\n", group,
strerror(errno));
fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
getuid(), getgid(), geteuid(), getegid());
return SETUID_OPER_FAILED;
}
if (setuid(user) != 0) {
fprintf(LOGFILE, "unable to set user to %d - %s\n", user, strerror(errno));
fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
getuid(), getgid(), geteuid(), getegid());
return SETUID_OPER_FAILED;
}
return 0;
}
It also supports cgroups for resource isolation:
static int write_pid_to_cgroup_as_root(const char* cgroup_file, pid_t pid) {
uid_t user = geteuid();
gid_t group = getegid();
if (change_effective_user(0, 0) != 0) {
return -1;
}
// open
int cgroup_fd = open(cgroup_file, O_WRONLY | O_APPEND, 0);
if (cgroup_fd == -1) {
fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", cgroup_file,
strerror(errno));
return -1;
}
// write pid
char pid_buf[21];
snprintf(pid_buf, sizeof(pid_buf), "%" PRId64, (int64_t)pid);
ssize_t written = write(cgroup_fd, pid_buf, strlen(pid_buf));
close(cgroup_fd);
if (written == -1) {
fprintf(LOGFILE, "Failed to write pid to file %s - %s\n",
cgroup_file, strerror(errno));
return -1;
}
// Revert back to the calling user.
if (change_effective_user(user, group)) {
return -1;
}
return 0;
}
6.3 Starting a container with DockerContainerExecutor
As shown below, DockerContainerExecutor builds a docker run command directly and uses it to execute the launch_container.sh script:
public static final String NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
"/usr/bin/docker";
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
StringBuilder commands = new StringBuilder();
String commandStr = commands.append(dockerExecutor)
.append(" ")
.append("run")
.append(" ")
.append("--rm --net=host")
.append(" ")
.append(" --name " + containerIdStr)
.append(localDirMount)
.append(logDirMount)
.append(containerWorkDirMount)
.append(" ")
.append(containerImageName)
.toString();
String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`";
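The excerpt does not show how localDirMount, logDirMount and containerWorkDirMount are built. One plausible reading is that each directory becomes a `-v dir:dir` bind mount, and the sketch below assembles a complete command under that assumption. `DockerCommandSketch` and its `toMount` helper are hypothetical, and the image name and paths in the usage note are made up.

```java
import java.util.Arrays;
import java.util.List;

final class DockerCommandSketch {
    /** Turns each directory into a " -v dir:dir" bind mount flag (assumed format). */
    static String toMount(List<String> dirs) {
        StringBuilder sb = new StringBuilder();
        for (String dir : dirs) {
            sb.append(" -v ").append(dir).append(':').append(dir);
        }
        return sb.toString();
    }

    /** Assembles a docker run command in the same order as the excerpt above. */
    static String runCommand(String dockerBinary, String containerId, String image,
                             List<String> localDirs, List<String> logDirs, String workDir) {
        return dockerBinary + " run --rm --net=host --name " + containerId
                + toMount(localDirs)
                + toMount(logDirs)
                + toMount(Arrays.asList(workDir))
                + " " + image;
    }
}
```

For example, with one local directory `/data/nm-local`, one log directory `/data/nm-logs` and the work directory `/data/nm-local/work`, the command mounts all three paths at the same location inside the container, so the paths written into launch_container.sh stay valid there.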