1. Background
Presto stores its event log in memory. In the following two situations, historical SQL query information cannot be retrieved through the web UI:
- After a restart of the Presto coordinator, memory is cleared, so historical SQL execution information can no longer be queried through the web UI.
- Presto keeps only the 100 most recent event log entries in memory; SQL execution records older than that are evicted.
This significantly raises the bar for business teams checking how their SQL ran. To address this, Presto's event log needs to be persisted.
2. Persistence entry point in Presto
When PrestoServer starts, it calls PluginManager's loadPlugins method:
injector.getInstance(PluginManager.class).loadPlugins();
loadPlugins loads the predefined Plugin implementations and calls installPlugin on each of them:
private void loadPlugin(PluginClassLoader pluginClassLoader)
{
    ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
    List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
    checkState(!plugins.isEmpty(), "No service providers of type %s", Plugin.class.getName());
    for (Plugin plugin : plugins) {
        log.info("Installing %s", plugin.getClass().getName());
        installPlugin(plugin, pluginClassLoader::duplicate);
    }
}
installPlugin eventually runs installPluginInternal, which adds the plugin's EventListenerFactory objects to the EventListenerManager:
private void installPluginInternal(Plugin plugin, Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
{
    // register the plugin's event listener factories (newly added)
    for (EventListenerFactory eventListenerFactory : plugin.getEventListenerFactories()) {
        log.info("Registering event listener %s", eventListenerFactory.getName());
        eventListenerManager.addEventListenerFactory(eventListenerFactory);
    }
}
Also during PrestoServer startup, the event listeners are loaded from the registered EventListenerFactory instances:
injector.getInstance(SessionPropertyDefaults.class).loadConfigurationManager();
injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();
injector.getInstance(AccessControlManager.class).loadSystemAccessControl();
injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator();
injector.getInstance(EventListenerManager.class).loadEventListeners();
injector.getInstance(GroupProviderManager.class).loadConfiguredGroupProvider();
Finally, the EventListener objects are created through the EventListenerFactory:
private EventListener createEventListener(File configFile)
{
    log.info("-- Loading event listener %s --", configFile);
    configFile = configFile.getAbsoluteFile();
    Map<String, String> properties = loadEventListenerProperties(configFile);
    String name = properties.remove(EVENT_LISTENER_NAME_PROPERTY);
    checkArgument(!isNullOrEmpty(name), "EventListener plugin configuration for %s does not contain %s", configFile, EVENT_LISTENER_NAME_PROPERTY);
    EventListenerFactory eventListenerFactory = eventListenerFactories.get(name);
    EventListener eventListener = eventListenerFactory.create(properties);
    log.info("-- Loaded event listener %s --", configFile);
    return eventListener;
}
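For reference, EventListenerManager discovers listeners from configuration files (for example etc/event-listener.properties), where the event-listener.name property selects the factory by the name it was registered under. A minimal configuration sketch for the event-log listener built below; the factory name "event-log" and the event-log.* keys are assumptions for this custom plugin, not fixed Presto properties:

```properties
# selects the EventListenerFactory by its registered name (assumed here)
event-listener.name=event-log
# hypothetical properties consumed by EventLogProcessor
event-log.enabled=true
event-log.dir=hdfs:///presto/event-log
```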
As the Presto coordinator moves a SQL statement through its lifecycle (query created, split completed, query finished), it calls the EventListenerManager methods queryCreated, splitCompleted, and queryCompleted respectively.
Taking queryCompleted as an example, EventListenerManager ultimately calls the queryCompleted method on every registered EventListener:
public void queryCompleted(String queryInfoJson, String queryId)
{
    for (EventListener listener : configuredEventListeners.get()) {
        try {
            listener.queryCompleted(queryInfoJson, queryId);
        }
        catch (Throwable e) {
            log.warn(e, "Failed to publish QueryCompletedEvent for query %s", queryId);
        }
    }
}
Presto's EventListener interface provides the corresponding hook methods (the String-based queryCompleted overload is the custom addition used above):
public interface EventListener
{
    default void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }

    default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
    }

    default void queryCompleted(String queryInfoJson, String queryId)
    {
    }

    default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
}
Therefore, to persist both the event log and lineage information, an extra Plugin is needed. Inside it, two EventListenerFactory implementations are created, one for flushing lineage to disk and one for flushing the event log to disk, each defining its own persistence logic.
3. Creating the Plugin
As shown below, EventLogPlugin provides the two factory classes EventLogListenerFactory and AuditLogListenerFactory:
public class EventLogPlugin
        implements Plugin
{
    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories()
    {
        return ImmutableList.of(new EventLogListenerFactory(), new AuditLogListenerFactory());
    }
}
4. Persisting the event log
4.1 Event log implementation
As shown below, EventLogListenerFactory is responsible for creating the EventListener:
@Override
public EventListener create(Map<String, String> config)
{
    Bootstrap app = new Bootstrap(
            new HttpServerModule(),
            new NodeModule(),
            new JsonModule(),
            new JaxrsModule(),
            new EventModule(),
            new EventLogModule());
    Injector injector = app
            .strictConfig()
            .doNotInitializeLogging()
            .setRequiredConfigurationProperties(config)
            .initialize();
    return new EventLogListener(injector.getInstance(EventLogProcessor.class));
}
The EventListener defines the queryCompleted handler:
@Override
public void queryCompleted(String queryInfoJson, String queryId)
{
    if (eventLogProcessor.isEnableEventLog()) {
        eventLogProcessor.putEventLog(queryInfoJson, queryId);
    }
}
EventLogProcessor holds a BlockingQueue for pending write requests and a thread pool for asynchronous flushing:
private final BlockingQueue<Pair<String, String>> eventLogQueue = new ArrayBlockingQueue<>(100);
private static final ExecutorService WRITE_EVENTLOG_THREAD_POOL = Executors.newFixedThreadPool(3);
As shown below, EventLogProcessor.putEventLog puts a write request onto the BlockingQueue:
public void putEventLog(String eventLog, String queryId)
{
    try {
        eventLogQueue.put(new Pair<>(queryId, eventLog));
    }
    catch (InterruptedException e) {
        logger.error(e, e.getMessage());
        Thread.currentThread().interrupt();  // restore the interrupt flag
    }
}
In EventLogProcessor, worker threads from the pool drain the queue and write concurrently:
WRITE_EVENTLOG_THREAD_POOL.execute(() -> {
    while (true) {
        try {
            writeEventLog(eventLogQueue.take());
        }
        catch (InterruptedException e) {
            logger.error(e, e.getMessage());
            Thread.currentThread().interrupt();  // restore the flag and exit
            return;
        }
    }
});
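The queue-plus-worker pattern above can be sketched as a self-contained example. The class and method names here are illustrative, not the original EventLogProcessor; the sketch shows how a bounded BlockingQueue decouples the event callback from the blocking disk write, and how the consumer restores the interrupt flag on shutdown:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Illustrative sketch of the async write pattern (not Presto code).
public class AsyncSink {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
    private final List<String> written = new ArrayList<>();
    private final CountDownLatch done;

    AsyncSink(int expected) {
        done = new CountDownLatch(expected);
        Thread writer = new Thread(() -> {
            while (true) {
                try {
                    // take() blocks until an event is available
                    written.add(queue.take());
                    done.countDown();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // restore flag and exit
                    return;
                }
            }
        });
        writer.setDaemon(true);
        writer.start();
    }

    void put(String eventLog) throws InterruptedException {
        queue.put(eventLog);  // blocks (back-pressure) when the queue is full
    }

    static List<String> demo() {
        try {
            AsyncSink sink = new AsyncSink(2);
            sink.put("query-1");
            sink.put("query-2");
            sink.done.await();  // happens-before: written is safe to read here
            return sink.written;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A bounded queue also applies back-pressure: if the writers fall behind, put() blocks the caller instead of exhausting memory, which is a deliberate trade-off inside the listener's callback path.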
At write time, the configuration determines whether to write to the local filesystem or to HDFS:
conf = new Configuration(false);
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
if (this.enableEventLog) {
    requireNonNull(this.dir, "event-log.dir not set");
    try {
        // default scheme: hdfs
        if (dir.startsWith("file")) {
            initLocalFileSystem();
        }
        else {
            initHDFSFileSystem();
        }
    }
    catch (IOException e) {
        logger.error(e, e.getMessage());
    }
}
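The branch above keys off the directory prefix, defaulting to HDFS when no scheme is given. A tiny helper sketch (schemeOf is not part of the original code) that makes the rule explicit by parsing the URI scheme:

```java
import java.net.URI;

// Illustrative helper: resolve the event-log directory to a filesystem
// scheme, defaulting to hdfs when none is given.
public class StorageScheme {
    static String schemeOf(String dir) {
        String scheme = URI.create(dir).getScheme();
        return scheme == null ? "hdfs" : scheme;
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("file:///tmp/event-log"));
        System.out.println(schemeOf("/presto/event-log"));
    }
}
```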
Writes then go through an output stream on the FileSystem that was created:
public void writeEventLog(Pair<String, String> pair)
{
    try {
        // partition by day: the first 8 characters of the query id are the date
        Path path = new Path(dir + "/" + pair.getKey().substring(0, 8) + "/" + pair.getKey());
        try (FSDataOutputStream outputStream = fileSystem.create(path)) {
            outputStream.write(pair.getValue().getBytes(StandardCharsets.UTF_8));
        }
    }
    catch (IOException e) {
        logger.error(e, e.getMessage());
    }
}
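Presto query ids begin with the submission date (e.g. 20211031_101530_00012_abcde), so substring(0, 8) yields a natural per-day partition directory. The path layout can be expressed as a pure function (pathFor is illustrative, not in the original code):

```java
// Illustrative sketch of the event-log path layout.
public class EventLogPath {
    // Query ids start with yyyyMMdd, giving a daily partition directory.
    static String pathFor(String dir, String queryId) {
        return dir + "/" + queryId.substring(0, 8) + "/" + queryId;
    }

    public static void main(String[] args) {
        System.out.println(pathFor("hdfs:///presto/event-log", "20211031_101530_00012_abcde"));
    }
}
```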
4.2 Scheduled cleanup of historical data
To avoid accumulating too many small files, a scheduled job runs at 5:00 every day and deletes the data of the day 30 days earlier (for example, on October 31 it deletes the data of October 1):
0 5 * * * sh daily_clear_presto_eventlog.sh $(date -d '30 days ago' '+%Y%m%d') >/dev/null 2>&1
This completes the event log persistence part of the scheme.
5. Persisting lineage
The event log is relatively large, so it is stored in HDFS. Lineage information, on the other hand, is written to local disk and then collected into Kafka; since the local copy does not need to be retained for long, there is no need to upload it to HDFS.
Create AuditLogEventListener and its factory AuditLogEventListenerFactory. AuditLogEventListener parses the metadata carried by QueryCompletedEvent:
private static final String LINEAGE_INFO = "LINEAGE_LOG";

private Logger ROOT_LOG = LoggerFactory.getLogger("rootLogger");
private Logger LINEAGE_INFO_LOG = LoggerFactory.getLogger(LINEAGE_INFO);

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
    try {
        PrestoSchema prestoSchema = getLineageInfo(queryCompletedEvent);
        if (prestoSchema != null) {
            writeAuditLog(lineageInfoToJson(prestoSchema), LINEAGE_INFO_LOG);
        }
    }
    catch (Throwable e) {
        ROOT_LOG.error("", e);
    }
}
writeAuditLog simply emits the lineage record through a logger; log rotation is left to the logging framework:
private void writeAuditLog(String log, Logger logger)
{
    if (log == null || log.isEmpty()) {
        return;
    }
    logger.info(log);
}
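Since the code logs through SLF4J, rotation is configured in the backing framework. A sketch of what the LINEAGE_LOG logger might look like with Logback as the backend; file paths, pattern, and retention are illustrative assumptions:

```xml
<appender name="LINEAGE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>/var/log/presto/lineage.log</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>/var/log/presto/lineage.%d{yyyy-MM-dd}.log</fileNamePattern>
    <maxHistory>7</maxHistory>
  </rollingPolicy>
  <encoder>
    <pattern>%msg%n</pattern>
  </encoder>
</appender>

<logger name="LINEAGE_LOG" level="INFO" additivity="false">
  <appender-ref ref="LINEAGE"/>
</logger>
```

With a short maxHistory, the local copy stays small while the Kafka collector ships each rotated file.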
The getLineageInfo method extracts the Presto query information:
private PrestoSchema getLineageInfo(QueryCompletedEvent queryCompletedEvent)
{
    if (queryCompletedEvent == null) {
        return null;
    }
    QueryContext queryContext = queryCompletedEvent.getContext();
    QueryMetadata queryMetadata = queryCompletedEvent.getMetadata();
    Optional<QueryFailureInfo> queryFailureInfoOptional = queryCompletedEvent.getFailureInfo();
    QueryStatistics queryStatistics = queryCompletedEvent.getStatistics();
    PrestoSchema prestoSchema = new PrestoSchema();
    // QueryTag
    prestoSchema.setQueryTag(PrestoSchema.SQL);
    // QueryId
    prestoSchema.setQueryId(queryMetadata.getQueryId());
    // QueryEngine
    prestoSchema.setQueryEngine(PrestoSchema.PRESTO_ENGINE);
    // QuerySubmitMethod
    prestoSchema.setQuerySubmitMethod(PrestoSchema.Method.PrestoServer.name());
    // QueryEngineInstance => Catalog
    if (queryContext.getCatalog().isPresent()) {
        prestoSchema.setQueryEngineInstance(queryContext.getCatalog().get());
    }
    // QueryUser: combine the Kerberos short name and the session user
    String sessionUser = queryContext.getUser();
    Optional<String> kinitUserOptional = queryContext.getPrincipal();
    String kinitUser = "";
    if (kinitUserOptional.isPresent()) {
        kinitUser = kinitUserOptional.get();
        if (kinitUser.indexOf("/") > 0) {
            kinitUser = kinitUser.substring(0, kinitUser.indexOf("/"));
        }
        if (kinitUser.indexOf("@") > 0) {
            kinitUser = kinitUser.substring(0, kinitUser.indexOf("@"));
        }
    }
    prestoSchema.setQueryUser(kinitUser + "," + sessionUser);
    // QueryCurrentDb
    if (queryContext.getSchema().isPresent()) {
        prestoSchema.setCurrentDB(queryContext.getSchema().get());
    }
    // QuerySql
    prestoSchema.setQuerySQL(queryMetadata.getQuery());
    // QueryExceptionLog, QueryTaskExceptionLog
    if (queryFailureInfoOptional.isPresent()) {
        ErrorCode errorCode = queryFailureInfoOptional.get().getErrorCode();
        StringBuilder exceptionLogBuilder = new StringBuilder();
        exceptionLogBuilder.append(errorCode.getType().toString()).append(",").append(errorCode.getName())
                .append(",").append(queryFailureInfoOptional.get().getFailureType().get()).append(",")
                .append(queryFailureInfoOptional.get().getFailureMessage().get());
        prestoSchema.setQueryExceptionLog(exceptionLogBuilder.toString());
        if (queryFailureInfoOptional.get().getFailureTask().isPresent()) {
            prestoSchema.setQueryTaskExceptionLog(queryFailureInfoOptional.get().getFailureTask().get());
        }
    }
    // QueryStatus
    if (queryFailureInfoOptional.isPresent()) {
        ErrorCode errorCode = queryFailureInfoOptional.get().getErrorCode();
        if (errorCode.getCode() == StandardErrorCode.USER_CANCELED.toErrorCode().getCode()) {
            prestoSchema.setQueryStatus(PrestoSchema.QUERY_CNACELED);
        }
        else {
            prestoSchema.setQueryStatus(PrestoSchema.QUERY_STATUS_FAILED);
        }
    }
    else {
        prestoSchema.setQueryStatus(PrestoSchema.QUERY_STATUS_SUCCESS);
    }
    // QueryStartTime, QueryFinishTime
    prestoSchema.setQueryStartTime(queryCompletedEvent.getCreateTime().toEpochMilli());
    prestoSchema.setQueryFinishTime(queryCompletedEvent.getEndTime().toEpochMilli());
    // QueryDuration (seconds)
    prestoSchema.setQueryDuration((prestoSchema.getQueryFinishTime() - prestoSchema.getQueryStartTime()) / 1000d);
    // QueryInputSize
    prestoSchema.setQueryInputSize(queryStatistics.getTotalBytes());
    // QueryOutputSize
    prestoSchema.setQueryOutputSize(queryStatistics.getOutputBytes() + queryStatistics.getWrittenBytes());
    // QueryCpuTime (seconds)
    prestoSchema.setQueryCpuTime(queryStatistics.getCpuTime().getSeconds() + queryStatistics.getCpuTime().getNano() / 1000000000d);
    // QueryMemory
    prestoSchema.setQueryMemory(queryStatistics.getCumulativeMemory() / (1024 * 1024 * 1000));
    return prestoSchema;
}
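The QueryUser normalization above (stripping the host part and realm from a Kerberos principal) can be isolated as a pure helper; shortName is illustrative and not part of the original code:

```java
// Illustrative sketch of the Kerberos principal normalization used above:
// "alice/host1.example.com@EXAMPLE.COM" -> "alice".
public class PrincipalNames {
    static String shortName(String principal) {
        String name = principal;
        int slash = name.indexOf('/');
        if (slash > 0) {
            name = name.substring(0, slash);  // drop the host part
        }
        int at = name.indexOf('@');
        if (at > 0) {
            name = name.substring(0, at);  // drop the realm
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(shortName("alice/host1.example.com@EXAMPLE.COM"));
    }
}
```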
This completes the lineage persistence scheme.