Persisting Presto Event Logs and Lineage Information Asynchronously
  GQ7psP7UJw7k · November 30, 2023

1. Background

Presto keeps event logs only in memory, so historical SQL query information becomes unavailable through the web UI in two situations:

  1. When the Presto coordinator restarts, memory is cleared and historical SQL execution information can no longer be queried through the web UI.
  2. Presto retains only the 100 most recent event logs in memory; SQL execution records older than that are evicted.

This makes it much harder for business teams to inspect how their SQL executed. To address this, Presto's event logs need to be persisted.

2. The Persistence Entry Point in Presto

When PrestoServer starts, it calls PluginManager's loadPlugins method:

injector.getInstance(PluginManager.class).loadPlugins();

loadPlugins loads the predefined Plugin classes and calls installPlugin on each of them:

private void loadPlugin(PluginClassLoader pluginClassLoader)
    {
        ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
        List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
        checkState(!plugins.isEmpty(), "No service providers of type %s", Plugin.class.getName());
        for (Plugin plugin : plugins) {
            log.info("Installing %s", plugin.getClass().getName());
            installPlugin(plugin, pluginClassLoader::duplicate);
        }
    }

installPlugin ultimately runs installPluginInternal, which registers each EventListenerFactory exposed by the plugin with the EventListenerManager:

private void installPluginInternal(Plugin plugin, Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
    {
        // newly added: register each EventListenerFactory exposed by the plugin
        for (EventListenerFactory eventListenerFactory : plugin.getEventListenerFactories()) {
            log.info("Registering event listener %s", eventListenerFactory.getName());
            eventListenerManager.addEventListenerFactory(eventListenerFactory);
        }
    }

Then, still during PrestoServer startup, EventListenerManager.loadEventListeners() loads the configured event listeners:

            injector.getInstance(SessionPropertyDefaults.class).loadConfigurationManager();
            injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();
            injector.getInstance(AccessControlManager.class).loadSystemAccessControl();
            injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator();
            injector.getInstance(EventListenerManager.class).loadEventListeners();
            injector.getInstance(GroupProviderManager.class).loadConfiguredGroupProvider();

Finally, an EventListener object is created through the EventListenerFactory:

private EventListener createEventListener(File configFile)
    {
        log.info("-- Loading event listener %s --", configFile);
        configFile = configFile.getAbsoluteFile();
        Map<String, String> properties = loadEventListenerProperties(configFile);
        String name = properties.remove(EVENT_LISTENER_NAME_PROPERTY);
        checkArgument(!isNullOrEmpty(name), "EventListener plugin configuration for %s does not contain %s", configFile, EVENT_LISTENER_NAME_PROPERTY);
        EventListenerFactory eventListenerFactory = eventListenerFactories.get(name);
        EventListener eventListener = eventListenerFactory.create(properties);
        log.info("-- Loaded event listener %s --", configFile);
        return eventListener;
    }
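createEventListener reads a properties file (conventionally etc/event-listener.properties in a Presto deployment) whose event-listener.name key selects the factory by its registered name. A minimal sketch, assuming the factory here registers as event-log and that event-log.enabled and event-log.dir are the keys this implementation defines (the HDFS URI is a placeholder):

```properties
event-listener.name=event-log
event-log.enabled=true
event-log.dir=hdfs://namenode:8020/presto/event-log
```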

As SQL execution progresses on the Presto coordinator (a query is created, a split completes, a query finishes), the coordinator calls the EventListenerManager's queryCreated, splitCompleted, and queryCompleted methods, respectively.

Taking queryCompleted as an example, EventListenerManager ultimately calls the queryCompleted method of each registered EventListener:

public void queryCompleted(String queryInfoJson, String queryId)
    {
        for (EventListener listener : configuredEventListeners.get()) {
            try {
                listener.queryCompleted(queryInfoJson, queryId);
            }
            catch (Throwable e) {
                log.warn(e, "Failed to publish QueryCompletedEvent for query %s", queryId);
            }
        }
    }

Presto's EventListener interface, in turn, declares the corresponding hooks as default methods:

public interface EventListener
{
    default void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }

    default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
    }

    default void queryCompleted(String queryInfoJson, String queryId)
    {
    }

    default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
}

Therefore, to persist event logs and lineage information, we define an additional Plugin that creates two EventListenerFactory instances, one for lineage persistence and one for event log persistence, with the corresponding write-to-disk logic defined in each factory's listener.

3. Creating the Plugin

As shown below, EventLogPlugin exposes two factory classes, EventLogListenerFactory and AuditLogListenerFactory:

public class EventLogPlugin
        implements Plugin
{
    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories()
    {
        return ImmutableList.of(new EventLogListenerFactory(), new AuditLogListenerFactory());
    }
}
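For the ServiceLoader.load(Plugin.class) call shown earlier to discover EventLogPlugin, the plugin jar must also carry a provider-configuration file named after the Plugin interface. Assuming the SPI package io.prestosql.spi (it differs across Presto forks) and a hypothetical com.example.eventlog package for the plugin:

```
# META-INF/services/io.prestosql.spi.Plugin
com.example.eventlog.EventLogPlugin
```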

4. Persisting the Event Log

4.1 Event log implementation

As shown below, EventLogListenerFactory is responsible for creating the EventListener:

@Override
    public EventListener create(Map<String, String> config)
    {
        Bootstrap app = new Bootstrap(
                new HttpServerModule(),
                new NodeModule(),
                new JsonModule(),
                new JaxrsModule(),
                new EventModule(),
                new EventLogModule());
        Injector injector = app
                .strictConfig()
                .doNotInitializeLogging()
                .setRequiredConfigurationProperties(config)
                .initialize();
        return new EventLogListener(injector.getInstance(EventLogProcessor.class));
    }

The EventListener implements the queryCompleted hook:

@Override
    public void queryCompleted(String queryInfoJson, String queryId)
    {
        if (eventLogProcessor.isEnableEventLog()) {
            eventLogProcessor.putEventLog(queryInfoJson, queryId);
        }
    }

EventLogProcessor holds a BlockingQueue of pending write requests and a thread pool that flushes them to storage asynchronously:

private final BlockingQueue<Pair<String, String>> eventLogQueue = new ArrayBlockingQueue<>(100);
private static final ExecutorService WRITE_EVENTLOG_THREAD_POOL = Executors.newFixedThreadPool(3);

As shown below, EventLogProcessor.putEventLog enqueues each write request into the BlockingQueue:

public void putEventLog(String eventLog, String queryId)
    {
        try {
            eventLogQueue.put(new Pair<>(queryId, eventLog));
        }
        catch (InterruptedException e) {
            logger.error(e, e.getMessage());
        }
    }

Worker threads in EventLogProcessor then drain the queue and write entries concurrently:

WRITE_EVENTLOG_THREAD_POOL.execute(() -> {
    while (true) {
        try {
            writeEventLog(eventLogQueue.take());
        }
        catch (InterruptedException e) {
            logger.error(e, e.getMessage());
        }
    }
});
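The queue-plus-worker-pool pattern above can be condensed into a self-contained demo. AsyncLogSink is a hypothetical stand-in for EventLogProcessor: it records entries in an in-memory list where the real code calls writeEventLog.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncLogSink
{
    // Bounded queue absorbs bursts; put() blocks once 100 entries are pending
    private final BlockingQueue<Map.Entry<String, String>> queue = new ArrayBlockingQueue<>(100);
    private final ExecutorService workers = Executors.newFixedThreadPool(3);
    private final List<String> written = new CopyOnWriteArrayList<>();

    public AsyncLogSink()
    {
        for (int i = 0; i < 3; i++) {
            workers.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // take() blocks until an entry arrives, like eventLogQueue.take()
                        Map.Entry<String, String> entry = queue.take();
                        written.add(entry.getKey()); // stand-in for writeEventLog(entry)
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // exit the loop on shutdown
                    }
                }
            });
        }
    }

    public void put(String queryId, String payload)
            throws InterruptedException
    {
        queue.put(new SimpleEntry<>(queryId, payload));
    }

    public int writtenCount()
    {
        return written.size();
    }

    public void shutdown()
    {
        workers.shutdownNow();
    }
}
```

A bounded queue is a deliberate design choice here: if the writers fall behind, producers block on put() instead of letting pending event logs exhaust coordinator memory.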

When writing, the configuration determines whether the target is the local filesystem or HDFS:

        conf = new Configuration(false);
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        if (this.enableEventLog) {
            requireNonNull(this.dir, "event-log.dir not set");
            try {
                // default scheme: hdfs
                if (dir.startsWith("file")) {
                    initLocalFileSystem();
                }
                else {
                    initHDFSFileSystem();
                }
            }
            catch (IOException e) {
                logger.error(e, e.getMessage());
            }
        }

The created FileSystem then writes each entry through an output stream:

public void writeEventLog(Pair<String, String> pair)
    {
        Path path = new Path(dir + "/" + pair.getKey().substring(0, 8) + "/" + pair.getKey());
        // try-with-resources ensures the stream is closed even when write() fails
        try (FSDataOutputStream outputStream = fileSystem.create(path)) {
            outputStream.write(pair.getValue().getBytes(StandardCharsets.UTF_8));
        }
        catch (IOException e) {
            logger.error(e, e.getMessage());
        }
    }
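The substring(0, 8) call works because a Presto query ID begins with a yyyyMMdd timestamp (for example, 20231130_061502_00012_abcde), so event logs are partitioned into one directory per day. A minimal sketch with a hypothetical EventLogPath helper:

```java
public class EventLogPath
{
    // Mirrors the path construction in writeEventLog: the first 8 characters
    // of the query ID are its yyyyMMdd date, yielding a per-day directory
    // that the daily cleanup job in section 4.2 can delete wholesale.
    public static String pathFor(String dir, String queryId)
    {
        return dir + "/" + queryId.substring(0, 8) + "/" + queryId;
    }
}
```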

4.2 Scheduled cleanup of historical data

To avoid accumulating too many small files, a scheduled script runs daily at 5 AM and deletes the data from exactly 30 days earlier (for example, on October 31 it deletes the data for October 1):

0 5 * * * sh daily_clear_presto_eventlog.sh $(date -d '30 days ago' '+%Y%m%d') >/dev/null 2>&1

This completes the event log persistence scheme.

5. Persisting Lineage Information

Event logs are large, so they are stored in HDFS. Lineage information, by contrast, is written to local disk and then collected into Kafka; since the local files need not be retained for long, there is no need to upload them to HDFS.

We create an AuditLogEventListener together with its AuditLogEventListenerFactory. AuditLogEventListener parses the metadata carried by QueryCompletedEvent:

    private static final String LINEAGE_INFO = "LINEAGE_LOG";
    private Logger ROOT_LOG = LoggerFactory.getLogger("rootLogger");
    private Logger LINEAGE_INFO_LOG = LoggerFactory.getLogger(LINEAGE_INFO);

@Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
        try {
            PrestoSchema prestoSchema = getLineageInfo(queryCompletedEvent);
            if (prestoSchema != null) {
                writeAuditLog(lineageInfoToJson(prestoSchema), LINEAGE_INFO_LOG);
            }
        }
        catch (Throwable e) {
            ROOT_LOG.error("", e);
        }
    }

writeAuditLog simply hands the lineage record to a logger, relying on the logging framework's appender to rotate the files:

    private void writeAuditLog(String log, Logger logger)
    {
        if (log == null || log.isEmpty()) {
            return;
        }
        logger.info(log);
    }
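The article does not show the logging configuration; assuming Logback as the backend, rotation for the LINEAGE_LOG logger could be set up with a time-based rolling appender along these lines (file paths and retention are placeholders):

```xml
<appender name="LINEAGE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>/data/presto/lineage/lineage.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <fileNamePattern>/data/presto/lineage/lineage.%d{yyyy-MM-dd}.log</fileNamePattern>
        <maxHistory>7</maxHistory>
    </rollingPolicy>
    <encoder>
        <pattern>%msg%n</pattern>
    </encoder>
</appender>
<logger name="LINEAGE_LOG" level="INFO" additivity="false">
    <appender-ref ref="LINEAGE"/>
</logger>
```

Keeping only a few days of local history is sufficient here because the files are tailed into Kafka shortly after being written.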

The getLineageInfo method parses the Presto event into lineage fields:

private PrestoSchema getLineageInfo(QueryCompletedEvent queryCompletedEvent)
    {
        if (queryCompletedEvent == null) {
            return null;
        }

        QueryContext queryContext = queryCompletedEvent.getContext();
        QueryMetadata queryMetadata = queryCompletedEvent.getMetadata();
        Optional<QueryFailureInfo> queryFailureInfoOptional = queryCompletedEvent.getFailureInfo();
        QueryStatistics queryStatistics = queryCompletedEvent.getStatistics();

        PrestoSchema prestoSchema = new PrestoSchema();

        // QueryTag
        prestoSchema.setQueryTag(PrestoSchema.SQL);

        // QueryId
        prestoSchema.setQueryId(queryMetadata.getQueryId());

        // QueryEngine
        prestoSchema.setQueryEngine(PrestoSchema.PRESTO_ENGINE);

        // Query
        prestoSchema.setQuerySubmitMethod(PrestoSchema.Method.PrestoServer.name());

        // QueryEngineInstance => Catalog
        if (queryContext.getCatalog().isPresent()) {
            prestoSchema.setQueryEngineInstance(queryContext.getCatalog().get());
        }

        // QueryUser
        String sessionUser = queryContext.getUser();
        Optional<String> kinitUserOptional = queryContext.getPrincipal();
        String kinitUser = "";
        if (kinitUserOptional.isPresent()) {
            kinitUser = kinitUserOptional.get();
            if (kinitUser.indexOf("/") > 0) {
                kinitUser = kinitUser.substring(0, kinitUser.indexOf("/"));
            }
            if (kinitUser.indexOf("@") > 0) {
                kinitUser = kinitUser.substring(0, kinitUser.indexOf("@"));
            }
        }
        prestoSchema.setQueryUser(kinitUser + "," + sessionUser);

        // QueryCurrentDb
        if (queryContext.getSchema().isPresent()) {
            prestoSchema.setCurrentDB(queryContext.getSchema().get());
        }

        // QuerySql
        prestoSchema.setQuerySQL(queryMetadata.getQuery());

        // QueryExceptionLog QueryTaskExceptionLog
        if (queryFailureInfoOptional.isPresent()) {
            ErrorCode errorCode = queryFailureInfoOptional.get().getErrorCode();
            StringBuilder exceptionLogBuilder = new StringBuilder();
            exceptionLogBuilder.append(errorCode.getType().toString()).append(",").append(errorCode.getName())
                    .append(",").append(queryFailureInfoOptional.get().getFailureType().get()).append(",")
                    .append(queryFailureInfoOptional.get().getFailureMessage().get());
            prestoSchema.setQueryExceptionLog(exceptionLogBuilder.toString());
            if (queryFailureInfoOptional.get().getFailureTask().isPresent()) {
                prestoSchema.setQueryTaskExceptionLog(queryFailureInfoOptional.get().getFailureTask().get());
            }
        }

        // QueryStatus
        if (queryFailureInfoOptional.isPresent()) {
            ErrorCode errorCode = queryFailureInfoOptional.get().getErrorCode();
            if (errorCode.getCode() == StandardErrorCode.USER_CANCELED.toErrorCode().getCode()) {
                prestoSchema.setQueryStatus(PrestoSchema.QUERY_CNACELED);
            }
            else {
                prestoSchema.setQueryStatus(PrestoSchema.QUERY_STATUS_FAILED);
            }
        }
        else {
            prestoSchema.setQueryStatus(PrestoSchema.QUERY_STATUS_SUCCESS);
        }

        // QueryStartTime QueryFinishTime
        prestoSchema.setQueryStartTime(queryCompletedEvent.getCreateTime().toEpochMilli());
        prestoSchema.setQueryFinishTime(queryCompletedEvent.getEndTime().toEpochMilli());

        // QueryDuration
        prestoSchema.setQueryDuration((prestoSchema.getQueryFinishTime() - prestoSchema.getQueryStartTime()) / 1000d);

        // QueryInputSize
        prestoSchema.setQueryInputSize(queryStatistics.getTotalBytes());

        // QueryOutputSize
        prestoSchema.setQueryOutputSize(queryStatistics.getOutputBytes() + queryStatistics.getWrittenBytes());

        // QueryCpuTime
        prestoSchema.setQueryCpuTime(queryStatistics.getCpuTime().getSeconds() + queryStatistics.getCpuTime().getNano() / 1000000000d);

        // QueryMemory
        prestoSchema.setQueryMemory(queryStatistics.getCumulativeMemory() / (1024 * 1024 * 1000));

        return prestoSchema;
    }
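The principal handling inside getLineageInfo can be isolated for clarity: a Kerberos principal of the form primary/instance@REALM is trimmed down to its primary component. PrincipalParser is a hypothetical name for this sketch:

```java
public class PrincipalParser
{
    // Strip the instance ("/host") and realm ("@REALM") parts,
    // matching the logic applied to queryContext.getPrincipal() above.
    public static String shortName(String principal)
    {
        String name = principal;
        if (name.indexOf('/') > 0) {
            name = name.substring(0, name.indexOf('/'));
        }
        if (name.indexOf('@') > 0) {
            name = name.substring(0, name.indexOf('@'));
        }
        return name;
    }
}
```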

This completes the lineage persistence scheme.
