5、Druid配置文件详细介绍以及示例
  nNPyvzOmRTFq 2023年11月02日 37 0

Apache Druid 系列文章

1、Druid(Imply-3.0.4)介绍及部署(centos6.10)、验证 2、Druid的入门示例(使用三种不同的方式摄入数据和提交任务) 3、Druid的load data 示例(实时kafka数据和离线-本地或hdfs数据) 4、java操作druid api 5、Druid配置文件详细介绍以及示例 6、Druid的Roll up详细介绍及示例


(文章目录)


本文主要介绍了Druid的配置文件内容,以及以示例说明配置文件的使用。 本文依赖是Druid和kafka环境具备。 本文分为2个部分,即配置文件说明和数据查询示例。

一、摄取配置文件结构说明

1、整体结构

摄取配置文件主要由以下几个部分组成

{
# 表示上传的方式,本地文件上传使用index,hdfs文件上传使用index_hadoop
"type":"index_hadoop",

# 用于设置数据的具体配置以及转换方式
"spec"{},

# (可选):用于配置一些运行参数,比如可以设置上传csv时候是否包含表头行
"context":{}
}
  • type:文件上传方式(index、index_hadoop、kafka)

  • spec dataSchema:数据解析模式 ioConfig:数据源 turningConfig:优化配置(分区规则、分区大小)

  • 示例

{
    // 文件上传方式
    "type": "index",
    "spec": {
            // 数据解析模式
            "dataSchema": {...},
            // 摄取数据源
            "ioConfig": {...},
            // 摄取过程优化配置(可选)
            "tuningConfig": {...}
    }
}

2、dataSchema-数据解析模式

数据解析模式,主要为针对数据文件,定义了一系列规则。如:

  • 数据源
  • 数据解析规则
  • 数据粒度规则配置
  • 定义如何进行指标计算

1)、配置项说明

// 1、dataSource:数据源名称,用于设置上传数据之后的表名称。
 "dataSource"="tableName"
 
// 2、parser:用于指定数据怎么被转化,转化为什么格式
// type:指定常规数据的格式,默认为string;如果保存hdfs上那么指定为hadoopyString
// parseSpec:用于指定数据转换格式
//  format:指定上传文件格式,可以为csv,json,tsv,javascript、timeAndDims等
//  timestampSpec:指定时间戳序列,包含column以及format两个参数,column必选,用于指定时间戳对应的列。format用于指定时间格式,可以使用iso、millis、posix、zuto,默认为auto。示例:
        "timestampSpec":{"format":"auto","column":"start_time"}
//  columns(可选):csv格式特有,用于指定源数据中的列名。用于配置源文件中应该包含的所有列的列名。示例:
        "columns":["columns","column2","column3"]
//  dimensionsSpec:配置维度数据,也就是将要在druid数据表中展现的列。dimensionExclusion可选,用于指定不需要的数据,默认为空;spatialDimensions可选,用于指定列的空间限制,默认为空。示例:
        "dimensionsSpec":{
            "dimensions":["page","language",{"type":"long","name":"countryNum" } ]   
            }
      // 如果字段为String,那么直接列出,如果为其他类型如long/double等,需要使用{"type":"long","name":"字段名称" }单独列出。配置时间戳的列不需要在dimensions中列出。

//3、granularitySpec:指定如何划分segment以及数据的时间范围
// type:用来指定粒度类型的使用,默认为type=uniform,建议设置为uniform或arbitrary(尝试创建大小相等的字段)
// segmentGranularity(可选):指定每个segment包含的时间戳的范围。默认为day,用来确定每个segment包含的时间戳的范围,可以为"SECOND"、"MINUTE" 、"HOUR"、"DAY"、"DOW"、"DOY"、"WEEK"、"MONTH"、"QUARTER"、"YEAR"、"EPOCH"、"DECADE"、"CENTURY"、"MILLENNIUM"等。
// queryGranularity(可选):默认为None,允许查询的时间粒度,单位与segmentGranularity相同,如果为None那么允许以任意时间粒度进行查询。
// rollup(可选):是否使用预计算算法,默认为true,推荐true,比较快。
// intervals:用于指定上传时间限制时间段。只有时间段内的数据可以上传。批量数据导入需要设置/流式导入无需设置。示例:
        "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : {  "type" : "none"},
            "rollup" : "true",
            "intervals" : [ "2017-11-15T00:00:00.000Z/2017-11-18T00:00:00.000Z" ]
            }
            
//4、metricsSpec:包含了一系列的aggregators转换
    //type可以为:count、longSum、doubleSum、doubleMin\doubleMax、longMin\longMax、doubleFirst\doubleLast、longFirst\longLast
    //除count外其他都需要指定name和fieldName两个参数,name表示最后输出的,也就是在表中体现的名称,而fieldName则代表源数据中的列名。
    //更多说明参考:http://druid.io/docs/0.10.1/querying/aggregations.html
    //示例:
    "metricsSpec":
        [
            {"type":"count","name":"count"},
            {"type":"doubleSum","name":"added","fieldName":"added"},
            {"type":"doubleSum","name":"deleted","fieldName":"deleted"},
            {"type":"doubleSum","name":"delta","fieldName":"delta"}
        ]

2)、示例

//数据摄取模式
"dataSchema": {
    // 数据源(表),数据源名称,用于设置上传数据之后的表名称
    "dataSource": "testdata",
    // 解析器,用于指定数据怎么被转化,转化为什么格式
    "parser": {
        // 解析字符串文本
        "type": "String",
        "parseSpec": {
            // 字符串文本格式为JSON
            "format": "json",
            // 指定维度列名,维度与时间一致,导入时聚合
            "dimensionsSpec": {
                "dimensions": [
                    "city",
                    "platform"
                ]
            },
            // 指定时间戳的列,以及时间戳格式化方式
            "timestampSpec": {
                "format": "auto",
                "column": "timestamp"
            }
        }
    },
    // 指标计算规则,指定如何划分segment以及数据的时间范围
    "metricsSpec": [
        {
            //name表示列名
            "name": "count",
            "type": "count"
        },
        {
            // 聚合计算后指标的列名
            "name": "click",
            // 聚合函数:count、longSum、doubleSum、longMin、doubleMin、doubleMax
            "type": "longSum",
            "fieldName": "click"
        }
    ]
    // 粒度规则,指定如何划分segment以及数据的时间范围
    "granularitySpec": {
        "type": "uniform",
        // 按天来生成 segment (每天生成一个segment)
        "segmentGranularity": "day",
        // 查询的最小粒度(最小粒度为小时)
        "queryGranularity": "hour",
        // 加载原始数据的时间范围,批量数据导入需要设置/流式导入无需设置
        "intervals": [
            "2022-12-01/2022-12-31"
        ]
    },
    
}

3、ioConfig配置-摄取数据源-

用于指定数据的来源以及存储位置,数据源配置主要指定

  • type:用于指定源数据获取方式
  • inputSpec:指定源数据路径
"ioConfig": {
    "type": "index",
    "inputSpec": {
        // 本地文件 local/ HDFS使用 hadoop,当设置type为“granularity”时候,则需要根据时间戳使用路径格式将数据导入目录中。
        "type": "local",
        // 路径,支持批量上传数据,路径指定到文件夹即可
        "baseDir": "/root/data/",
        // 只过滤出来哪个/哪些文件
        "filter": "test.json"
    }
}

// 示例1- HDFS:
"ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : 
        {
            "type" : "static",
            "paths" : "hdfs://master:9000/user/root/.."
        }
   }

// 示例2- granularity :
"ioConfig" : 
    {
        "type" : "hadoop",
        "inputSpec" : 
            {
                "type" : "granularity",
                ...
            }
    }

4、tuningConfig- 优化配置(可选)

用于指定如何协调各种不同的参数,通常在优化配置中可以指定一些优化选项

// type:指定数据存放方式
// paritionSpec:用于指定数据的segment的分区方式以及大小,默认为hashed。Segment会给予时间戳进行分区,并根据其他类型进一步分区,druid支持hashed(基于每行所有维度的哈希值)和dimension(基于单个维度的范围)来进行分区。为了让druid在重查询负载下运行良好,对于段文件大小的推荐在300Mb到700mb之间,可以使用partitionsSpec来调整大小。
//	hashed分区。hashed分区首先会选择多个Segment,然后根据每行数据所有列的哈希值对这些Segment进行分区,Segment的数量是输入数据集的基数以及目标分区大小自动确定的。
            //type:要分区的类型,hashed
            //targetPartitionSize:包含在分区中的目标行数,应该在500M-1G
            //maxPartitionSize:分区中包括的最大行数,默认为比targetPartitionSize大50%。
            //numShards:直接指定分区的数量而不是分区的大小,可以跳过自动选择多个分区的必要步骤,录入数据将会更快。此项与targetPartitionSize只需要填入一项即可。
            //partitionDemensions(可选),只能配合numShards一起使用,指定需要分区的维度,为空则选择所有维度列,当targetPartitionSize设置时候将自动忽略。
            //示例:
            "partitionsSpec": 
                {
                    "type":"hashed",
                    "targetPartitionSize":5000000,
                    "maxPartitionSize":7500000,
                    "assumeGrouped":false,
                    "numShards": -1,
                    "partitionDimensions": [ ]
                }
//	Only-dimension单维度分区。选择作为分区指标的维度列,然后将该维度分隔成连续的不同的分区,每个分区都会包含该维度值在该范围内的所有行。默认情况下使用的维度都是自动指定的。
        //type:要分区的类型,dimension
        //targetPartitionSize(必须):包含在分区中的目标行数,应该在500M-1G
        //maxPartitionSize(可选):分区中包括的最大行数,默认为比targetPartitionSize大50%。
        //partitionDemension(可选),要分区的维度,为空时自动选择
        //assumeGrouped(可选):如果数据源已经按照时间和维度分组了,该选项将会提高加载数据的速度,但是如果没有那么会选择次优分区。

// jobProperties:在添加mapreduce作业时候的一些配置,key:value表示。我们的集群因为一些问题需要对虚拟机进行一些调参。
    
    //示例1:
    "tuningConfig" : 
        {
            "type" : "hadoop",
            "partitionsSpec" : 
                {
                    "type" : "hashed",
                    "targetPartitionSize" : 5000000
                },
            "jobProperties" : 
                {
                    "mapreduce.map.log.level" : "DEBUG",
                    "mapreduce.reduce.log.level" : "DEBUG",
                    "mapreduce.reduce.java.opts" : "-Xmx1024m",
                    "mapreduce.map.java.opts" : "-Xmx1024m"
                }
            }
    
    //示例2:
        "tuningConfig": 
        {
            "type": "hadoop",
            "partitionsSpec": {
                "type": "hashed",
                "targetPartitionSize": 5000000
            },
            "jobProperties": {
                "fs.default.name": "hdfs://HadoopHAcluster/druid",
                "fs.defaultFS": "hdfs://HadoopHAcluster/druid",
                "dfs.datanode.address": "server2",
                "dfs.client.use.datanode.hostname": "true",
                "dfs.datanode.use.datanode.hostname": "true",
                "yarn.resourcemanager.hostname": "server1",
                "yarn.nodemanager.vmem-check-enabled": "false",
                "mapreduce.map.java.opts": "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
                "mapreduce.job.user.classpath.first": "true",
                "mapreduce.reduce.java.opts": "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
                "mapreduce.map.memory.mb": 1024,
                "mapreduce.reduce.memory.mb": 1024
            }
        }
     
     //示例3:
     "tuningConfig": 
     {
        "type": "index",
        // 分区类型
        "partitionsSpec": {
            "type": "hashed",
            // 每个分区的目标行数(这里配置每个分区500W行)
            "targetPartitionSize": 5000000
        }
    }
    
// workingPath:用于指示数据中间落地的路径(mapreduce中间结果),默认为'/tmp/druid-indexing'
// version:创建更加详细的版本,这将忽略hadoopindextask,除非将useExplicitVersion设置为true,默认为日期时间的索引为开始
// maxRowsInMemory:指定聚合之后的数据行数,默认75000
// leaveInermediate:作业完成后是否留下workingPath的中间文件,默认false。
// cleanupOnFailure:作业失败时是否清除中间文件,只有在leavelnermediate为true时候生效默认true。
// overwriteFiles:在执行index导入数据时候是否覆盖已经存在的文件。默认false
// ignoreInvalidRows:忽略发现有问题的行,默认false
// combineText:是否在CombineTextInputFormat阶段将多个文件合并到一个文件split中,这可以在处理大量小文件时候加速hadoop作业。
// useCombiner:如果可能的话,是否在mapper阶段中合并行,默认false
// indexSpec:调整数据的索引方式
// buildV9Directly:是否直接构建V9索引,而不是先构建V8 index再转换为V9 index。
// numBackgroundPersistThreads:是否增加后台持久化线程的数量,这会导致内存以及cpu的负荷增加,提高效率,默认为0(使用当前的持久化线程),建议用1
// forceExtendableShardSpecs:强制使用可扩展的ShardSpecs,该功能可以与kafka index扩展服务一起使用,默认false
// useExplicitVersion:是否强制使用HasoopIndexTask版本,默认false

二、Druid数据查询

下面以 metrics-kafka 为例,演示在Druid中使用不同方式来进行数据查询、分析。

1、JSON API方式

1)、JSON查询语法结构

Druid最早提供JSON API的方式查询数据,通过JSON格式来定义各种查询组件实现数据查询。

2)、示例

{
    "queryType":"search",
    // 指定要查询的数据源
    "dataSource":"metrics-kafka",
    // 聚合器,描述如何进行聚合,对哪个指标字段进行聚合、进行哪种聚合、指定聚合后的列名
    "aggregations":[
        {
            "type": "count",
            "name": "views"
        },
        {
             "name": "latencyMs",
             "type": "doubleSum",
             "fieldName": "latencyMs"
        }
    ],
    // 指定查询的时间范围,前闭后开
    "intervals":["2022-07-23/2019-07-24"]
}
  • 使用Postman来测试JSON API查询
  • 复制用于查询的JSON数据
{
    "queryType":"search",
    "dataSource":"metrics-kafka",
    "aggregations":[
        {
            "type": "count",
            "name": "views"
        },
        {
             "name": "latencyMs",
             "type": "doubleSum",
             "fieldName": "latencyMs"
        }
    ],
    "intervals":["2022-07-23/2019-07-24"]
}
  • 发送请求到 http://server3:8082/druid/v2?pretty 在这里插入图片描述

2、SQL 方式

使用Druid SQL查询,可以使用SQL查询来代替Druid原生基于JSON的查询方式。Druid SQL将SQL语句解析为原生JSON API方式,再执行查询。

1)、Druid SQL可视化界面

Druid 提供了一个图形界面SQL查询接口 在这里插入图片描述

2)、查询语法

在Druid中,每一个数据源在Druid中都对应一张表,可以直接通过SELECT语句查询表中的数据。

  • 语法结构 Druid SQL支持的SELECT查询语法结构
[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ UNION ALL <another query> ]
  • EXPLAIN PLAN FOR 在SELECT语句前面添加EXPLAIN PLAN FOR,可以查看到Druid SQL是如何解释为Druid JSON API查询的。SELECT语句并没有真正地执行。
  • WITH tableName 定义一个SQL片断,该SQL片断会被整个SQL语句所用到
WITH cr1 AS
(SELECT city, SUM(click) as click from ad_event GROUP BY 1)
select * from cr1 where city = 'beijing'
  • GROUP BY GROUP BY 语句可以使用 1、2、...位置来替代
SELECT 
    city, 
    SUM(click) as click 
from 
    ad_event 
GROUP BY 1

ORDER BY 也支持类似GROUP BY 的语法

  • UNION ALL UNION ALL操作符表示将多个SELECT语句放在一起(并集),每个SELECT语句都会一个接一个单独执行(并不是并行执行),Druid当前并不支持 UNION(不支持去重)

3、Druid SQL不支持的功能

  • JOIN语句
  • DDL/DML语句

4、聚合函数

Druid SQL中的聚合函数可以使用以下语法:

AGG(expr) FILTER(WHERE whereExpr)

这样聚合函数只会聚合符合条件的行数据

SELECT url, sum("views") filter(where url != '/') FROM "metrics-kafka" GROUP BY url;
SELECT url, sum("views")   FROM "metrics-kafka" GROUP BY url;

在这里插入图片描述 在这里插入图片描述

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
nNPyvzOmRTFq
最新推荐 更多

2024-05-31