Flink之SQL查询操作
  GLcPk8bYyq2p 2023年11月22日 17 0

基本SELECT查询

生成测试数据

使用DataGen SQL连接器生成测试数据

CREATE TABLE datagen (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
--指定要使用的连接器--
 'connector' = 'datagen',
-- 每秒生成的行数,用以控制数据发出速率。--	
 'rows-per-second'='5',
--指定字段的生成器。可以是 'sequence' 或 'random',默认random--
 'fields.f_sequence.kind'='sequence',
 --序列生成器的起始值--
 'fields.f_sequence.start'='1',
 'fields.f_sequence.end'='1000',
 --随机生成器的最小值,适用于数字类型--
 'fields.f_random.min'='1',
 'fields.f_random.max'='1000',
--随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string-- 
'fields.f_random_str.length'='10'
);

在这里插入图片描述

WITH

WITH子句提供了一种用于更大查询而编写辅助语句的方法。这些编写的语句通常被称为公用表表达式,表达式可以理解为仅针对某个查询而存在的临时视图。

语法如下:

WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;

<with_item_defintion>:
    with_item_name (column_name[, ...n]) AS ( <select_query> )

定义一个公用表表达式orders_with_total,并在一个GROUP BY查询中使用它

WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

在SQL客户端直接输入:

WITH datagen_with_total AS (
    SELECT f_sequence, f_random + 10 AS total
    FROM datagen
)

SELECT f_sequence, SUM(total)
FROM datagen_with_total
GROUP BY f_sequence;

WHERE

语法格式如下:

SELECT select_list FROM table_expression [ WHERE boolean_expression ]
SELECT * FROM datagen;

SELECT f_sequence,f_random,f_random_str  FROM datagen;

SELECT f_sequence,f_random,f_random_str  FROM datagen WHERE f_sequence >10;

# 在VALUES子句中使用内联数据。每一个元组对应一行,另外可以通过设置别名来为每一列指定名称。
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

DISTINCT

所有的复制行都会从结果集(每个分组只会保留一行)中被删除

SELECT DISTINCT id FROM Orders

根据f_random字段进行数据去重

SELECT DISTINCT f_random FROM datagen WHERE f_sequence <10;

ORDER BY

ORDER BY子句使结果行根据指定的表达式进行排序。 如果两行根据最左边的表达式相等,则根据下一个表达式进行比较,依此类推。 如果根据所有指定的表达式它们相等,则它们以与实现相关的顺序返回。

在流模式下运行时,表的主要排序顺序必须按时间属性升序。 所有后续的 orders 都可以自由选择。 但是批处理模式没有这个限制。

SELECT *
FROM Orders
ORDER BY order_time, order_id

LIMIT

LIMIT子句限制 SELECT 语句返回的行数。 通常,此子句与 ORDER BY 结合使用,以确保结果是确定性的。

选择表中的前3行

SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3

窗口函数

概述

Apache Flink提供了3个内置窗口表值函数TVF:TUMBLE、HOP和CUMULATE。

窗口TVF的返回值是一个新的关系,包括原始表的所有列和附加的三个用于指定窗口的列,分别是:window_startwindow_endwindow_time

函数通过时间属性字段为每行数据分配一个窗口。 在流计算模式,时间属性字段必须被指定为事件或处理时间属性。

在流模式下,窗口表值函数的时间属性字段必须位于事件或处理时间属性上。且时间属性字段必须是TIMESTAMPTIMESTAMP_LTZ的类型。

函数运行后,原有的时间属性timecol将转换为一个常规的timestamp

创建数据表

  CREATE TABLE datagen (
  id INT,
  age INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.age.min' = '1',
  'fields.age.max' = '150'
);

滚动窗口 TUMBLE

TUMBLE函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。

例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口 在这里插入图片描述

TUMBLE函数语法如下:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data:是一个表参数,可以是与时间属性列的任意关系。

timecol:是一个列描述符,指示数据的哪些时间属性列应映射到滚动窗口。

size:是指定翻滚窗口宽度的持续时间。

offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

示例:

SELECT * FROM TABLE(
   TUMBLE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND));

查询结果如下

          id         age                      pt                      et            window_start              window_end             window_time
           2          28 2023-07-08 18:04:36.792 2023-07-08 18:04:36.792 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000 2023-07-08 18:04:39.999
           7         146 2023-07-08 18:04:36.792 2023-07-08 18:04:36.792 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000 2023-07-08 18:04:39.999
           3          76 2023-07-08 18:04:36.793 2023-07-08 18:04:36.792 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000 2023-07-08 18:04:39.999
          10          94 2023-07-08 18:04:36.793 2023-07-08 18:04:36.793 2023-07-08 18:04:35.000 2023-07-08 18:04:40.000

滑动窗口 HOP

滑动窗口函数指定元素到一个定长的窗口中。滑动窗口有一个固定的持续时间以及一个滑动的间隔。

若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠,行将会被分配到多个窗口中。

例如:可以定义一个每5分钟滑动一次。大小为10分钟的窗口。每5分钟获得最近10分钟到达的数据的窗口 在这里插入图片描述 HOP函数语法如下:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:是一个表参数,可以是与时间属性列的任意关系

timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口

slide:是指定连续跳跃窗口开始之间的持续时间的持续时间

size:是指定跳跃窗口宽度的持续时间

offset: 是一个可选参数,用于指定窗口开始移动的偏移量

示例:

SELECT * FROM TABLE(
    HOP(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND));

累积窗口 CUMULATE

CUMULATE函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素

窗口的开始时间是固定的,可以将CUMULATE函数视为首先应用具有最大窗口大小的TUMBLE窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。

累积窗口会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度和累积步长。所谓最大窗口长度其实就是统计周期,最终目的就是统计这段时间内的数据。

例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00),[00:00, 02:00),[00:00, 03:00), …, [00:00, 24:00) 在这里插入图片描述 CUMULATE函数语法如下:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:是一个表参数,可以是与时间属性列的任意关系

timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口

step:是指定连续累积窗口末尾之间增加的窗口大小的持续时间

size:是指定累积窗口最大宽度的持续时间。size必须是 的整数倍step

offset: 是一个可选参数,用于指定窗口开始移动的偏移量

示例:

SELECT * FROM TABLE(
    CUMULATE(TABLE datagen, DESCRIPTOR(et), INTERVAL '2' SECONDS, INTERVAL '6' SECONDS));

窗口偏移

Offset是一个可选参数,可用于更改窗口分配。它可以是正持续时间和负持续时间。窗口偏移的默认值为0。如果设置不同的偏移值,同一条记录可能会分配到不同的窗口。

例如,对于2021-06-30 00:00:04大小为 10 MINUTE 的 Tumble 窗口,带有时间戳的记录将分配给哪个窗口?

如果offset值为-16 MINUTE,则记录分配给窗口 [ 2021-06-29 23:54:00, 2021-06-30 00:04:00)
如果offset值为-6 MINUTE,则记录分配给窗口 [ 2021-06-29 23:54:00, 2021-06-30 00:04:00)
如果offset是-4 MINUTE,则记录分配给窗口[ 2021-06-29 23:56:00, 2021-06-30 00:06:00)

如果offset是0,则记录分配给窗口[ 2021-06-30 00:00:00, 2021-06-30 00:10:00)

如果offset是4 MINUTE,则记录分配给窗口[ 2021-06-29 23:54:00, 2021-06-30 00:04:00)
如果offset是6 MINUTE,则记录分配给窗口[ 2021-06-29 23:56:00, 2021-06-30 00:06:00)
如果offset是16 MINUTE,则记录分配给窗口[ 2021-06-29 23:56:00, 2021-06-30 00:06:00)

可以发现,一些窗口偏移参数可能对窗口的分配有同样的影响。在上述情况下,-16 、-6 和4对于大小为10的翻滚窗口具有相同的效果。

窗口偏移的作用只是更新窗口分配,对 Watermark 没有影响。

示例:

SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND));

聚合

窗口聚合

窗口聚合是通过GROUP BY子句定义的,其特征是包含窗口表值函数 产生的window_startwindow_end列。和普通的GROUP BY子句一样,窗口聚合对于每个组会计算出一行数据

和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态

窗口函数聚合使用语法:

FROM TABLE(
窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL 时间)
)
GROUP BY [window_start,][window_end,] --可选

1.ROLLUP窗口聚合 聚合写法:

   SELECT window_start, window_end, SUM(age) sumAge
  FROM TABLE(
    TUMBLE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND))
  GROUP BY window_start, window_end;

输出如下结果:

            window_start              window_end      sumAge
 2023-07-08 18:32:05.000 2023-07-08 18:32:10.000        2840
 2023-07-08 18:32:10.000 2023-07-08 18:32:15.000        3558

2.HOP窗口聚合 聚合写法:

SELECT window_start, window_end, SUM(age) FROM TABLE(
    HOP(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
    GROUP BY window_start, window_end;

3.CUMULATE窗口聚合 聚合写法:

 SELECT window_start, window_end, SUM(age)
  FROM TABLE(
    CUMULATE(TABLE datagen, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
  GROUP BY window_start, window_end;

4.GROUPING SET窗口聚合

1.GROUPING SETS

窗口聚合也支持GROUPING SETS语法,要求窗口聚合中GROUP BY子句必须包含window_start 和 window_end列,但GROUPING SETS子句中不能包含这两个字段。

Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
+------------------+------------------+-------------+-------+
|     window_start |       window_end | supplier_id | price |
+------------------+------------------+-------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) | 11.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  5.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  9.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |  1.00 |
+------------------+------------------+-------------+-------+

2.ROLLUP

ROLLUP是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。

例如:ROLLUP (one,two) 等效于 GROUPING SET((one,two),(one),())

SELECT window_start, window_end, supplier_id, SUM(price) as price
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);

3.CUBE

CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。

SELECT window_start, window_end, item, supplier_id, SUM(price) as price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, CUBE (supplier_id, item);

分组聚合

基本聚合函数

聚合函数根据多个输入行计算单个结果。例如,聚合来计算一组行的COUNT、SUM、AVG、MAX、MIN。

select COUNT(*) from datagen;

SELECT age, COUNT(*) as total FROM datagen GROUP BY age;

DISTINCT

DISTINCT聚合在聚合函数前去掉重复的数据。

计算datagen表中不同 age的数量而不是总行数。

SELECT COUNT(DISTINCT age) FROM datagen;

GROUPING SETS

通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。

SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
ROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。

一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果

对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替

ROLLUP

ROLLUP是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。

SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)

CUBE

CUBE是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)

HAVING

HAVING 会删除 group 后不符合条件的行。 HAVING 和 WHERE 的不同点:

WHERE在GROUP BY之前过滤单独的数据行

HAVING过滤GROUP BY生成的数据行
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50

OVER聚合

OVER聚合计算一系列有序行上每个输入行的聚合值。与GROUP BY聚合相反,OVER聚合不会将每个组的结果行数减少到一行。相反,OVER聚合会为每个输入行生成聚合值。

OVER窗口的语法如下:

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序

PARTITION BY:标识了聚合窗口的聚合粒度

range_definition:这个标识聚合窗口的聚合数据范围,在Flink中有两种指定数据范围的方式。第一种为按照行数聚合,第二种为按照时间区间聚合

ORDER BY

OVER窗口需要数据是有序的。因为表没有固定的排序,所以ORDER BY子句是强制的。对于流式查询,Flink目前只支持 OVER 窗口定义在升序(asc)的 时间属性 上。其他的排序不支持。

PARTITION BY

OVER窗口可以定义在一个分区表上。PARTITION BY子句代表着每行数据只在其所属的数据分区进行聚合。

范围RANGE定义

1.按照时间区间聚合-RANGE间隔

RANGE间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。

定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

查看前5秒到现在收到的水位数据条数

SELECT 
    id, 
    age, 
    et,
    count(age) OVER (
        PARTITION BY id
        ORDER BY et
        RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
  ) AS cnt
FROM datagen;

用WINDOW子句来在SELECT外部单独定义一个OVER窗口

SELECT 
    id, 
    age, 
    et,
count(age) OVER w AS cnt,
sum(age) OVER w AS sumAge
FROM datagen
WINDOW w AS (
    PARTITION BY id
    ORDER BY et
    RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
);

2.按照行数聚合-ROW间隔

间隔ROWS是基于计数的间隔。它准确定义了聚合中包含的行数。

ROWS间隔基于计数。它定义了聚合操作包含的精确行数。

定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
WINDOW

统计当前行之前的5行和当前行(总共6行)的数据。或理解为统计前5条到现在的数据

SELECT 
    id, 
    age, 
    et,
    avg(age) OVER (
      PARTITION BY age
      ORDER BY et
      ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS avgAge
FROM datagen;

用WINDOW子句来在SELECT外部单独定义一个OVER窗口

SELECT 
    id, 
    age, 
    et,
avg(age) OVER w AS avgAge,
count(age) OVER w AS cnt
FROM datagen
WINDOW w AS (
    PARTITION BY age
    ORDER BY et
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
);

TOP-N

普通Top-N

Top-N 查询要求按列排序的 N 个最小值或最大值。最小值和最大值集都被视为 Top-N 查询。当需要根据条件仅显示批处理/流式表中的 N 个最底部或 N 个最顶部记录时,前 N 条查询非常有用。

语法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

参数说明:

ROW_NUMBER():根据分区内行的顺序,为每行分配一个唯一的连续编号,从 1 开始。目前,我们仅支持ROW_NUMBERover window 功能。今后,我们将一如既往地支持RANK()和支持DENSE_RANK()。

PARTITION BY col1[, col2...]:指定分区列。每个分区都会有一个 Top-N 结果。

ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:指定排序列。不同列上的排序方向可能不同。

WHERE rownum <= N:rownum <= NFlink 需要识别该查询是 Top-N 查询。N 表示将保留的 N 个最小或最大的记录。

[AND conditions]:where 子句中可以自由添加其他条件,但其他条件只能使用rownum <= N连词组合AND。

获取前3名的数据

SELECT * FROM (
  SELECT *,ROW_NUMBER() OVER (PARTITION BY age ORDER BY et DESC) AS row_num
  FROM datagen)
WHERE row_num <=3;

窗口Top-N

窗口Top-N是特殊的 Top-N,它返回每个分区键的每个窗口的N个最小或最大值。

窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。

窗口Top-N 会在窗口结束后清除不需要的中间状态。 因此,窗口Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好。通常,窗口 Top-N 直接用于 窗口表值函数上。

窗口Top-N的语法和普通的Top-N 相同,需要PARTITION BY子句包含 窗口表值函数 或 窗口聚合 产生的window_start和window_end。

1.在窗口聚合后进行窗口 Top-N

在10分钟的滚动窗口上计算销售额位列前三的供应商

Flink SQL> SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERE rownum <= 3;

2.在窗口表值函数后进行窗口Top-N

在10分钟的滚动窗口上计算价格位列前三的数据

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;

联结Join查询

内部等连接

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

外部等连接

外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。

Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。

SELECT * FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT * FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT * FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

间隔联结

返回受连接条件和时间约束限制的简单笛卡尔积。间隔连接至少需要一个等连接谓词和一个限制两侧时间的连接条件。两个适当的范围谓词可以定义这样的条件(<、<=、>=、>)、BETWEEN 谓词或比较两个输入的相同类型的时间属性(即处理时间或事件时间)的单个相等谓词表。

例如,如果订单在收到订单四小时后发货,则此查询会将所有订单与其相应的发货连接起来。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_ti

集合操作

UNION 和 UNION ALL

UNION:将集合合并并且去重

UNION ALL:将集合合并,不做去重
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');

Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);

Intersect 和 Intersect All

Intersect:交集并且去重

Intersect ALL:交集不做去重
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);

Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);

Except 和 Except All

Except:差集并且去重

Except ALL:差集不做去重
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);

Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);

In 子查询

如果给定表的子查询中存在表达式,则返回 true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。

SELECT user, amount FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

EXISTS

如果子查询返回至少一行,则为 true。只支持能被重写为 join 和 group 的操作。

SELECT user, amount
FROM Orders
WHERE product EXISTS (
    SELECT product FROM NewProducts
)

去重

普通去重

去重是去掉重复的行,只保留第一或者最后一行。

Flink使用ROW_NUMBER()去除重复数据,去重就是Top-N 在 N 为 1 时的特例,并且去重必须要求按照处理或者事件时间排序。

语法如下:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

参数说明:

ROW_NUMBER():为每一行分配一个唯一且连续的数字,从 1 开始

PARTITION BY col1[, col2...]:指定分区键,即需要去重的键

ORDER BY time_attr [asc|desc]:指定排序列,必须是 时间属性。目前 Flink 支持 处理时间属性 和 事件时间属性。Order by ASC 保留第一行,DESC 保留最后一行

WHERE rownum = 1:Flink 需要这个条件来识别去重语句

示例:

CREATE TABLE Orders (
  order_id  STRING,
  user        STRING,
  product     STRING,
  num         BIGINT,
  proctime AS PROCTIME()
) WITH (...);

SELECT order_id, user, product, num
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
  FROM Orders)
WHERE row_num = 1

窗口去重

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据。

窗口重复数据删除是一种特殊的重复数据删除,它删除一组列上重复的行,保留每个窗口和分区键的第一个或最后一个。

语法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

参数说明:

ROW_NUMBER():为每一行分配一个唯一的连续编号,从 1 开始。

PARTITION BY window_start, window_end [, col_key1...]window_start:指定包含、window_end等分区键的分区列。

ORDER BY time_attr [asc|desc]:指定排序列,必须是时间属性。目前Flink支持处理时间属性和事件时间属性。按 ASC 排序意味着保留第一行,按 DESC 排序意味着保留最后一行。

WHERE (rownum = 1 | rownum <=1 | rownum < 2):rownum = 1 | rownum <=1 | rownum < 2优化器需要识别查询可以转换为窗口重复数据删除。
 SELECT * FROM (
    SELECT id, age, et, pt,
      ROW_NUMBER() OVER (PARTITION BY age ORDER BY id DESC) AS rownum
    FROM datagen )
    WHERE rownum <= 1;

注意:

目前只支持在滚动窗口、滑动窗口和累积窗口的窗口表值函数后进行窗口去重

目前只支持根据事件时间属性进行排序

函数

Flink允许用户在Table API 和 SQL中使用函数进行数据的转换,为用户提供了一组内置的数据转换函数。

系统函数也叫内置函数,是在系统中预先实现好的功能模块。可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL中的系统函数又主要可以分为两大类:标量函数和聚合函数。

标量函数

标量函数将零、一个或多个值作为输入并返回单个值作为结果。

1.比较函数

比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。

SQL函数 Table函数 描述
value1 = value2 value1 === value2 如果 value1 等于 value2 返回 TRUE;如果 value1 或者 value2 为 NULL 返回 UNKNOW

2.逻辑函数

逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。

SQL函数 Table函数 描述
boolean1 OR boolean2 BOOLEAN1 // BOOLEAN2 如果 boolean1 为 TRUE 或 boolean2 为 TRUE 返回 TRUE。支持三值逻辑。 例如 true

3.算术函数

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。

SQL函数 Table函数 描述
numeric1 + numeric2 NUMERIC1 + NUMERIC2 返回 numeric1 加 numeric2

4.字符串函数 进行字符串处理的函数

string1 || string2  两个字符串的连接

UPPER(string)  将字符串string转为全部大写

CHAR_LENGTH(string)  计算字符串string的长度
SQL函数 Table函数 描述
UPPER(string) STRING.upperCase() 以大写形式返回字符串

5.时间函数 进行与时间相关操作的函数

DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date

TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)

INTERVAL string range  返回一个时间间隔。
SQL函数 Table函数 描述
DATE string STRING.toDate() 以“yyyy-MM-dd”的形式返回从字符串解析的 SQL 日期

6.其他:

更多类型的标量函数参考:官方文档-标量函数

聚合函数

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。

聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

COUNT(*)  返回所有行的数量,统计个数。

SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。

RANK()   返回当前值在一组值中的排名。

ROW_NUMBER()    对一组值排序后,返回当前值的行号

RANK()和ROW_NUMBER()一般用在OVER窗口中
SQL函数 Table函数 描述
COUNT(*) COUNT(1) FIELD.count 返回输入行数

自定义函数

自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。


// 定义函数逻辑
public static class SubstringFunction extends ScalarFunction {
  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 里不经注册直接“内联”调用函数
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));

// 注册函数
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// 在 Table API 里调用注册好的函数
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

// 在 SQL 里调用注册好的函数
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

Module

概述

Module允许Flink扩展函数能力。它是可插拔的

Flink 包含了以下三种Module:

1.核心模块

Flink内置的Module,其包含了目前Flink内置的所有UDF,Flink默认开启的Module就是CoreModule,可以直接使用其中的UDF

2.Hive模块

可以将Hive内置函数作为Flink的系统函数提供给SQL\Table API用户进行使用,比如get_json_object 这类Hive内置函数(Flink 默认的 CoreModule 是没有的)

3.用户自定义Module:

用户可以实现Module接口实现自己的UDF扩展 Module

在 Flink 中,Module可以被 加载、启用 、禁用 、卸载,当加载Module 之后,默认就是开启的。

操作命令

# 查看
SHOW MODULES;
SHOW FULL MODULES;

# 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]

# 启用module,没有被use的为禁用
USE MODULES module_name

# 卸载
UNLOAD MODULE module_name 

操作可以同时支持多个Module,并且根据加载Module的顺序去按顺序查找和解析UDF,先查到的先解析使用。

Flink只会解析已经启用的Module,当两个Module中出现两个同名的函数且都启用时, Flink会根据加载Module的顺序进行解析,结果就是会使用顺序为第一个Module的UDF

可以更改顺序

# 将Hive Module设为第一个使用及解析的Module
USE MODULE hive,core;

加载Hive Module

加载官方已经提供的的 Hive Module,将Hive已有的内置函数作为 Flink 的内置函数

1.下载:flink-sql-connector-hive

2.上传jar包到flink的lib

cp flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar /usr/local/program/flink/lib/

注意:拷贝hadoop的包,解决依赖冲突问题

cp /usr/local/program/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /usr/local/program/flink/lib/

3.重启flink集群和sql-client

# 基于独立模式的会话模式部署
./bin/start-cluster.sh

# SQL客户端默认使用embedded模式
./bin/sql-client.sh

4.加载hive module

-- hive-connector内置了hive module,提供了hive自带的系统函数
load module hive with ('hive-version'='3.1.3');
show modules;
show functions;

-- 可以调用hive的split函数
select split('a,b', ',');
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月22日 0

暂无评论

推荐阅读
  KRe60ogUm4le   18天前   31   0   0 javascala
GLcPk8bYyq2p
最新推荐 更多

2024-05-03