Apache Druid 系列文章
1、Druid(Imply-3.0.4)介绍及部署(centos6.10)、验证 2、Druid的入门示例(使用三种不同的方式摄入数据和提交任务) 3、Druid的load data 示例(实时kafka数据和离线-本地或hdfs数据) 4、java操作druid api 5、Druid配置文件详细介绍以及示例 6、Druid的Roll up详细介绍及示例
(文章目录)
本文主要介绍了Druid的配置文件内容,以及以示例说明配置文件的使用。 本文依赖是Druid和kafka环境具备。 本文分为2个部分,即配置文件说明和数据查询示例。
一、摄取配置文件结构说明
1、整体结构
摄取配置文件主要由以下几个部分组成
{
# 表示上传的方式,本地文件上传使用index,hdfs文件上传使用index_hadoop
"type":"index_hadoop",
# 用于设置数据的具体配置以及转换方式
"spec"{},
# (可选):用于配置一些运行参数,比如可以设置上传csv时候是否包含表头行
"context":{}
}
-
type:文件上传方式(index、index_hadoop、kafka)
-
spec dataSchema:数据解析模式 ioConfig:数据源 turningConfig:优化配置(分区规则、分区大小)
-
示例
{
// 文件上传方式
"type": "index",
"spec": {
// 数据解析模式
"dataSchema": {...},
// 摄取数据源
"ioConfig": {...},
// 摄取过程优化配置(可选)
"tuningConfig": {...}
}
}
2、dataSchema-数据解析模式
数据解析模式,主要为针对数据文件,定义了一系列规则。如:
- 数据源
- 数据解析规则
- 数据粒度规则配置
- 定义如何进行指标计算
1)、配置项说明
// 1、dataSource:数据源名称,用于设置上传数据之后的表名称。
"dataSource"="tableName"
// 2、parser:用于指定数据怎么被转化,转化为什么格式
// type:指定常规数据的格式,默认为string;如果保存hdfs上那么指定为hadoopyString
// parseSpec:用于指定数据转换格式
// format:指定上传文件格式,可以为csv,json,tsv,javascript、timeAndDims等
// timestampSpec:指定时间戳序列,包含column以及format两个参数,column必选,用于指定时间戳对应的列。format用于指定时间格式,可以使用iso、millis、posix、zuto,默认为auto。示例:
"timestampSpec":{"format":"auto","column":"start_time"}
// columns(可选):csv格式特有,用于指定源数据中的列名。用于配置源文件中应该包含的所有列的列名。示例:
"columns":["columns","column2","column3"]
// dimensionsSpec:配置维度数据,也就是将要在druid数据表中展现的列。dimensionExclusion可选,用于指定不需要的数据,默认为空;spatialDimensions可选,用于指定列的空间限制,默认为空。示例:
"dimensionsSpec":{
"dimensions":["page","language",{"type":"long","name":"countryNum" } ]
}
// 如果字段为String,那么直接列出,如果为其他类型如long/double等,需要使用{"type":"long","name":"字段名称" }单独列出。配置时间戳的列不需要在dimensions中列出。
//3、granularitySpec:指定如何划分segment以及数据的时间范围
// type:用来指定粒度类型的使用,默认为type=uniform,建议设置为uniform或arbitrary(尝试创建大小相等的字段)
// segmentGranularity(可选):指定每个segment包含的时间戳的范围。默认为day,用来确定每个segment包含的时间戳的范围,可以为"SECOND"、"MINUTE" 、"HOUR"、"DAY"、"DOW"、"DOY"、"WEEK"、"MONTH"、"QUARTER"、"YEAR"、"EPOCH"、"DECADE"、"CENTURY"、"MILLENNIUM"等。
// queryGranularity(可选):默认为None,允许查询的时间粒度,单位与segmentGranularity相同,如果为None那么允许以任意时间粒度进行查询。
// rollup(可选):是否使用预计算算法,默认为true,推荐true,比较快。
// intervals:用于指定上传时间限制时间段。只有时间段内的数据可以上传。批量数据导入需要设置/流式导入无需设置。示例:
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : { "type" : "none"},
"rollup" : "true",
"intervals" : [ "2017-11-15T00:00:00.000Z/2017-11-18T00:00:00.000Z" ]
}
//4、metricsSpec:包含了一系列的aggregators转换
//type可以为:count、longSum、doubleSum、doubleMin\doubleMax、longMin\longMax、doubleFirst\doubleLast、longFirst\longLast
//除count外其他都需要指定name和fieldName两个参数,name表示最后输出的,也就是在表中体现的名称,而fieldName则代表源数据中的列名。
//更多说明参考:http://druid.io/docs/0.10.1/querying/aggregations.html
//示例:
"metricsSpec":
[
{"type":"count","name":"count"},
{"type":"doubleSum","name":"added","fieldName":"added"},
{"type":"doubleSum","name":"deleted","fieldName":"deleted"},
{"type":"doubleSum","name":"delta","fieldName":"delta"}
]
2)、示例
//数据摄取模式
"dataSchema": {
// 数据源(表),数据源名称,用于设置上传数据之后的表名称
"dataSource": "testdata",
// 解析器,用于指定数据怎么被转化,转化为什么格式
"parser": {
// 解析字符串文本
"type": "String",
"parseSpec": {
// 字符串文本格式为JSON
"format": "json",
// 指定维度列名,维度与时间一致,导入时聚合
"dimensionsSpec": {
"dimensions": [
"city",
"platform"
]
},
// 指定时间戳的列,以及时间戳格式化方式
"timestampSpec": {
"format": "auto",
"column": "timestamp"
}
}
},
// 指标计算规则,指定如何划分segment以及数据的时间范围
"metricsSpec": [
{
//name表示列名
"name": "count",
"type": "count"
},
{
// 聚合计算后指标的列名
"name": "click",
// 聚合函数:count、longSum、doubleSum、longMin、doubleMin、doubleMax
"type": "longSum",
"fieldName": "click"
}
]
// 粒度规则,指定如何划分segment以及数据的时间范围
"granularitySpec": {
"type": "uniform",
// 按天来生成 segment (每天生成一个segment)
"segmentGranularity": "day",
// 查询的最小粒度(最小粒度为小时)
"queryGranularity": "hour",
// 加载原始数据的时间范围,批量数据导入需要设置/流式导入无需设置
"intervals": [
"2022-12-01/2022-12-31"
]
},
}
3、ioConfig配置-摄取数据源-
用于指定数据的来源以及存储位置,数据源配置主要指定
- type:用于指定源数据获取方式
- inputSpec:指定源数据路径
"ioConfig": {
"type": "index",
"inputSpec": {
// 本地文件 local/ HDFS使用 hadoop,当设置type为“granularity”时候,则需要根据时间戳使用路径格式将数据导入目录中。
"type": "local",
// 路径,支持批量上传数据,路径指定到文件夹即可
"baseDir": "/root/data/",
// 只过滤出来哪个/哪些文件
"filter": "test.json"
}
}
// 示例1- HDFS:
"ioConfig" : {
"type" : "hadoop",
"inputSpec" :
{
"type" : "static",
"paths" : "hdfs://master:9000/user/root/.."
}
}
// 示例2- granularity :
"ioConfig" :
{
"type" : "hadoop",
"inputSpec" :
{
"type" : "granularity",
...
}
}
4、tuningConfig- 优化配置(可选)
用于指定如何协调各种不同的参数,通常在优化配置中可以指定一些优化选项
// type:指定数据存放方式
// paritionSpec:用于指定数据的segment的分区方式以及大小,默认为hashed。Segment会给予时间戳进行分区,并根据其他类型进一步分区,druid支持hashed(基于每行所有维度的哈希值)和dimension(基于单个维度的范围)来进行分区。为了让druid在重查询负载下运行良好,对于段文件大小的推荐在300Mb到700mb之间,可以使用partitionsSpec来调整大小。
// hashed分区。hashed分区首先会选择多个Segment,然后根据每行数据所有列的哈希值对这些Segment进行分区,Segment的数量是输入数据集的基数以及目标分区大小自动确定的。
//type:要分区的类型,hashed
//targetPartitionSize:包含在分区中的目标行数,应该在500M-1G
//maxPartitionSize:分区中包括的最大行数,默认为比targetPartitionSize大50%。
//numShards:直接指定分区的数量而不是分区的大小,可以跳过自动选择多个分区的必要步骤,录入数据将会更快。此项与targetPartitionSize只需要填入一项即可。
//partitionDemensions(可选),只能配合numShards一起使用,指定需要分区的维度,为空则选择所有维度列,当targetPartitionSize设置时候将自动忽略。
//示例:
"partitionsSpec":
{
"type":"hashed",
"targetPartitionSize":5000000,
"maxPartitionSize":7500000,
"assumeGrouped":false,
"numShards": -1,
"partitionDimensions": [ ]
}
// Only-dimension单维度分区。选择作为分区指标的维度列,然后将该维度分隔成连续的不同的分区,每个分区都会包含该维度值在该范围内的所有行。默认情况下使用的维度都是自动指定的。
//type:要分区的类型,dimension
//targetPartitionSize(必须):包含在分区中的目标行数,应该在500M-1G
//maxPartitionSize(可选):分区中包括的最大行数,默认为比targetPartitionSize大50%。
//partitionDemension(可选),要分区的维度,为空时自动选择
//assumeGrouped(可选):如果数据源已经按照时间和维度分组了,该选项将会提高加载数据的速度,但是如果没有那么会选择次优分区。
// jobProperties:在添加mapreduce作业时候的一些配置,key:value表示。我们的集群因为一些问题需要对虚拟机进行一些调参。
//示例1:
"tuningConfig" :
{
"type" : "hadoop",
"partitionsSpec" :
{
"type" : "hashed",
"targetPartitionSize" : 5000000
},
"jobProperties" :
{
"mapreduce.map.log.level" : "DEBUG",
"mapreduce.reduce.log.level" : "DEBUG",
"mapreduce.reduce.java.opts" : "-Xmx1024m",
"mapreduce.map.java.opts" : "-Xmx1024m"
}
}
//示例2:
"tuningConfig":
{
"type": "hadoop",
"partitionsSpec": {
"type": "hashed",
"targetPartitionSize": 5000000
},
"jobProperties": {
"fs.default.name": "hdfs://HadoopHAcluster/druid",
"fs.defaultFS": "hdfs://HadoopHAcluster/druid",
"dfs.datanode.address": "server2",
"dfs.client.use.datanode.hostname": "true",
"dfs.datanode.use.datanode.hostname": "true",
"yarn.resourcemanager.hostname": "server1",
"yarn.nodemanager.vmem-check-enabled": "false",
"mapreduce.map.java.opts": "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
"mapreduce.job.user.classpath.first": "true",
"mapreduce.reduce.java.opts": "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
"mapreduce.map.memory.mb": 1024,
"mapreduce.reduce.memory.mb": 1024
}
}
//示例3:
"tuningConfig":
{
"type": "index",
// 分区类型
"partitionsSpec": {
"type": "hashed",
// 每个分区的目标行数(这里配置每个分区500W行)
"targetPartitionSize": 5000000
}
}
// workingPath:用于指示数据中间落地的路径(mapreduce中间结果),默认为'/tmp/druid-indexing'
// version:创建更加详细的版本,这将忽略hadoopindextask,除非将useExplicitVersion设置为true,默认为日期时间的索引为开始
// maxRowsInMemory:指定聚合之后的数据行数,默认75000
// leaveInermediate:作业完成后是否留下workingPath的中间文件,默认false。
// cleanupOnFailure:作业失败时是否清除中间文件,只有在leavelnermediate为true时候生效默认true。
// overwriteFiles:在执行index导入数据时候是否覆盖已经存在的文件。默认false
// ignoreInvalidRows:忽略发现有问题的行,默认false
// combineText:是否在CombineTextInputFormat阶段将多个文件合并到一个文件split中,这可以在处理大量小文件时候加速hadoop作业。
// useCombiner:如果可能的话,是否在mapper阶段中合并行,默认false
// indexSpec:调整数据的索引方式
// buildV9Directly:是否直接构建V9索引,而不是先构建V8 index再转换为V9 index。
// numBackgroundPersistThreads:是否增加后台持久化线程的数量,这会导致内存以及cpu的负荷增加,提高效率,默认为0(使用当前的持久化线程),建议用1
// forceExtendableShardSpecs:强制使用可扩展的ShardSpecs,该功能可以与kafka index扩展服务一起使用,默认false
// useExplicitVersion:是否强制使用HasoopIndexTask版本,默认false
二、Druid数据查询
下面以 metrics-kafka 为例,演示在Druid中使用不同方式来进行数据查询、分析。
1、JSON API方式
1)、JSON查询语法结构
Druid最早提供JSON API的方式查询数据,通过JSON格式来定义各种查询组件实现数据查询。
2)、示例
- 将JSON数据提交请求到: http://server3:8082/druid/v2?pretty
{
"queryType":"search",
// 指定要查询的数据源
"dataSource":"metrics-kafka",
// 聚合器,描述如何进行聚合,对哪个指标字段进行聚合、进行哪种聚合、指定聚合后的列名
"aggregations":[
{
"type": "count",
"name": "views"
},
{
"name": "latencyMs",
"type": "doubleSum",
"fieldName": "latencyMs"
}
],
// 指定查询的时间范围,前闭后开
"intervals":["2022-07-23/2019-07-24"]
}
- 使用Postman来测试JSON API查询
- 复制用于查询的JSON数据
{
"queryType":"search",
"dataSource":"metrics-kafka",
"aggregations":[
{
"type": "count",
"name": "views"
},
{
"name": "latencyMs",
"type": "doubleSum",
"fieldName": "latencyMs"
}
],
"intervals":["2022-07-23/2019-07-24"]
}
- 发送请求到 http://server3:8082/druid/v2?pretty
2、SQL 方式
使用Druid SQL查询,可以使用SQL查询来代替Druid原生基于JSON的查询方式。Druid SQL将SQL语句解析为原生JSON API方式,再执行查询。
1)、Druid SQL可视化界面
Druid 提供了一个图形界面SQL查询接口
2)、查询语法
在Druid中,每一个数据源在Druid中都对应一张表,可以直接通过SELECT语句查询表中的数据。
- 语法结构 Druid SQL支持的SELECT查询语法结构
[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ UNION ALL <another query> ]
- EXPLAIN PLAN FOR 在SELECT语句前面添加EXPLAIN PLAN FOR,可以查看到Druid SQL是如何解释为Druid JSON API查询的。SELECT语句并没有真正地执行。
- WITH tableName 定义一个SQL片断,该SQL片断会被整个SQL语句所用到
WITH cr1 AS
(SELECT city, SUM(click) as click from ad_event GROUP BY 1)
select * from cr1 where city = 'beijing'
- GROUP BY GROUP BY 语句可以使用 1、2、...位置来替代
SELECT
city,
SUM(click) as click
from
ad_event
GROUP BY 1
ORDER BY 也支持类似GROUP BY 的语法
- UNION ALL UNION ALL操作符表示将多个SELECT语句放在一起(并集),每个SELECT语句都会一个接一个单独执行(并不是并行执行),Druid当前并不支持 UNION(不支持去重)
3、Druid SQL不支持的功能
- JOIN语句
- DDL/DML语句
4、聚合函数
Druid SQL中的聚合函数可以使用以下语法:
AGG(expr) FILTER(WHERE whereExpr)
这样聚合函数只会聚合符合条件的行数据
SELECT url, sum("views") filter(where url != '/') FROM "metrics-kafka" GROUP BY url;
SELECT url, sum("views") FROM "metrics-kafka" GROUP BY url;