21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
  nNPyvzOmRTFq 2023年12月11日 16 0




文章目录

  • Flink 系列文章
  • 一、Table API 与 DataStream API集成
  • 1、概述
  • 2、 DataStream 和 Table 相互转换示例
  • 1)、示例1 - toDataStream
  • 2)、示例2 - toChangelogStream
  • 3)、示例3 - 通过仅切换标志来处理批处理和流数据
  • 3、集成说明
  • 1)、maven依赖
  • 2)、import
  • 3)、Configuration
  • 4)、执行行为
  • 1、DataStream API
  • 2、Table API



本文是Flink table api 与 datastream api的集成的第一篇,主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明(即maven依赖、import、配置以及执行行为),并以具体的示例进行说明。
本文依赖flink、kafka集群能正常使用。
本文分为3个部分,即集成概述、三个入门示例、集成说明。
本文的示例是在Flink 1.17版本中运行。

一、Table API 与 DataStream API集成

1、概述

在定义数据处理管道时,Table API和DataStream API同样重要。

DataStream API在一个相对低级的命令式编程API中提供流处理的原语(即时间、状态和数据流管理)。Table API抽象了许多内部构件,并提供了结构化和声明性API。

这两个API都可以处理有界和无界流。

在处理历史数据时,需要管理有界流。无界流发生在实时处理场景中,这些场景可能先使用历史数据进行初始化。

为了有效执行,这两个API都以优化的批处理执行模式提供处理有界流。然而,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。

一个API中的管道可以端到端定义,而不依赖于另一个API。然而,出于各种原因,混合这两种API可能是有用的:

  • 在DataStream API中实现主管道(main pipeline)之前,使用表生态系统(table ecosystem)轻松访问目录(catalogs )或连接到外部系统。
  • 在DataStream API中实现主管道之前,访问一些SQL函数以进行无状态数据规范化和清理。
  • 如果table API中不存在更低级的操作(例如自定义计时器处理),则不时切换到DataStream API。

Flink提供了特殊的桥接功能,以使与DataStream API的集成尽可能顺利。

在DataStream 和Table API之间切换会增加一些转换开销。例如,部分处理二进制数据的表运行时(即RowData)的内部数据结构需要转换为更用户友好的数据结构(即Row)。通常,这个开销可以忽略。

  • maven依赖
    本篇文章,如果没有特殊说明,将使用如下maven依赖
<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.0-1.17</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.38</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-hive_2.12</artifactId>
			<version>1.17.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hive</groupId>
			<artifactId>hive-exec</artifactId>
			<version>3.1.2</version>
		</dependency>
		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>
	</dependencies>

2、 DataStream 和 Table 相互转换示例

Flink提供了专门的StreamTableEnvironment,用于与DataStream API集成。这些环境使用其他方法扩展常规TableEnvironment,并将DataStream API中使用的StreamExecutionEnvironments作为参数。

1)、示例1 - toDataStream

下面的代码展示了如何在两个API之间来回切换的示例。表的列名和类型自动从DataStream的TypeInformation派生。由于DataStream API本机不支持变更日志处理,因此代码假设在流到表和表到流转换期间仅附加/仅插入语义。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */
public class ConvertingDataStreamAndTableDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 1、创建运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 2、创建输入流
		DataStream<String> dataStream = env.fromElements("alan", "alanchan", "alanchanchn");

		// 3、将datastream 转为 table
		Table inputTable = tenv.fromDataStream(dataStream);

		// 4、创建视图,该步骤不是必须,将姓名转为大写
		tenv.createTemporaryView("InputTable", inputTable);
		Table resultTable = tenv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

		// 5、将table转成datastream进行输出
		DataStream<Row> resultStream = tenv.toDataStream(resultTable);

		resultStream.print();
		env.execute();
	}

}
  • 示例输出
12> +I[ALAN]
14> +I[ALANCHANCHN]
13> +I[ALANCHAN]

fromDataStream和toDataStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它还包括使用事件时间和水印。

根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表转换为数据流时产生仅插入的更改,而且还产生收回和其他类型的更新。在表到流转换期间,这可能会导致类似于以下内容的异常

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

在这种情况下,需要再次修改查询或切换到ChangelogStream。

2)、示例2 - toChangelogStream

下面的示例显示如何转换更新表。
每个结果行表示更改日志中的一个条目,该条目具有更改标志,可以通过对其调用row.getKind()来查询。在本例中,alan的第二个分数在更改之前(-U)创建更新,在更改之后(+U)创建更新。

本示例仅仅以一个方法来展示,避免没有必要的代码,运行框架参考上述示例。

public static void test2() throws Exception {
		// 1、创建运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 2、创建输入流
		DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));

		// 3、将datastream 转为 table
		Table inputTable = tenv.fromDataStream(dataStream).as("name", "salary");

		// 4、创建视图,该步骤不是必须
		tenv.createTemporaryView("InputTable", inputTable);
		Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");

		// 5、将table转成datastream进行输出
		DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);

		resultStream.print();
		env.execute();
	}
  • 运行结果
2> +I[alan, 18]
16> +I[alanchan, 19]
16> +I[alanchanchn, 20]
2> -U[alan, 18]
2> +U[alan, 38]

fromChangelogStream和toChangelogStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它包括使用事件时间和水印。它讨论了如何为输入和输出流声明主键和变更日志模式。

上面的示例显示了如何通过为每个传入记录连续发出逐行更新来增量计算最终结果。然而,在输入流有限(即有界)的情况下,通过利用批处理原理可以更有效地计算结果。

在批处理中,可以在连续的阶段中执行运算符,这些阶段在发出结果之前使用整个输入表。例如,连接操作符可以在执行实际连接之前对两个有界输入进行排序(即排序合并连接算法),或者在使用另一个输入之前从一个输入构建哈希表(即哈希连接算法的构建/探测阶段)。

DataStream API和Table API都提供专门的批处理运行时模式。

3)、示例3 - 通过仅切换标志来处理批处理和流数据

下面的示例说明了统一管道能够通过仅切换标志来处理批处理和流数据。

public static void test3() throws Exception {
		// 1、创建运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.BATCH);
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 2、创建输入流
		DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));

		// 3、将datastream 转为 table
		Table inputTable = tenv.fromDataStream(dataStream).as("name", "salary");

		// 4、创建视图,该步骤不是必须
		tenv.createTemporaryView("InputTable", inputTable);
		Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");

		// 5、将table转成datastream进行输出
		DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);

		resultStream.print();
		
		env.execute();
	}
  • 运行结果

注意比较和示例2的输出区别

+I[alanchan, 19]
+I[alan, 38]
+I[alanchanchn, 20]

一旦将changelog 应用于外部系统(例如键值存储),可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前使用所有输入数据,批处理模式的更改日志仅由仅插入的更改组成。有关更多细节,请参阅下面的专用批处理模式部分。

3、集成说明

将Table API与DataStream API相结合的项目需要添加以下桥接模块之一。
它们包括对 flink-table-api-java或flink-table-api-scala的可传递依赖性,以及相应的特定于语言的DataStream api模块。

1)、maven依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

2)、import

使用DataStream API和Table API的Java或Scala版本声明公共管道需要以下导入。

// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;

// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;

3)、Configuration

TableEnvironment将采用传递的StreamExecutionEnvironment.中的所有配置选项。然而,不能保证对StreamExecutionEnvironment配置的进一步更改在实例化后传播到StreamTableEnvironment。在规划期间,将选项从Table API传播到DataStream API。

我们建议在切换到Table API之前尽早在DataStream API中设置所有配置选项。

import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// set various configuration early

env.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));

// start defining your pipelines in both APIs...

4)、执行行为

这两个API都提供了执行管道的方法。换句话说:如果被请求,它们将编译一个作业图( job graph),该作业图将提交到集群并触发以执行。结果将流式传输到声明的sinks。

通常,这两个API都在方法名称中使用术语“执行”来标记这种行为。然而,Table API和DataStream API之间的执行行为略有不同。

1、DataStream API

DataStream API的StreamExecutionEnvironment使用生成器模式(builder pattern)来构造复杂的管道。管道可能会拆分为多个分支,这些分支可能以sink结尾,也可能不以sink结尾。环境缓冲(environment buffers)所有这些定义的分支,直到提交作业。

StreamExecutionEnvironment.execute()提交整个构建的管道,然后清除构建器。换句话说:不再声明sources 和sinks ,并且可以向生成器中添加新的管道。因此,每个DataStream程序通常以对StreamExecutionEnvironment.execute()的调用结束。或者,DataStream.executeAndCollect()隐式定义了一个sink,用于将结果流式传输到本地客户端。

2、Table API

在Table API中,分支管道仅在StatementSet中受支持,其中每个分支必须声明一个最终sink。TableEnvironment和StreamTableEnvironment都不提供专用的通用execute()方法。相反,它们提供了提交单个source-to-sink管道或语句集的方法:

final static String sinkSQL = "CREATE TABLE OutputTable (\n" +
			" userId INT,\r\n" + 
			" age INT,\r\n" + 
			" balance DOUBLE,\r\n" + 
			" userName STRING,\r\n" +
			" t_insert_time TIMESTAMP(3)\r\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")";
	
	final static String sinkSQL2 = "CREATE TABLE OutputTable2 (\n" +
			" userId INT,\r\n" + 
			" age INT,\r\n" + 
			" balance DOUBLE,\r\n" + 
			" userName STRING,\r\n" +
			" t_insert_time TIMESTAMP(3)\r\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")";
	
	final static String sourceSQL = "CREATE TABLE InputTable (\r\n" + 
			" userId INT,\r\n" + 
			" age INT,\r\n" + 
			" balance DOUBLE,\r\n" + 
			" userName STRING,\r\n" + 
			" t_insert_time AS localtimestamp,\r\n" + 
			" WATERMARK FOR t_insert_time AS t_insert_time\r\n" + 
			") WITH (\r\n" + 
			" 'connector' = 'datagen',\r\n" + 
			" 'rows-per-second'='10',\r\n" + 
			" 'fields.userId.kind'='sequence',\r\n" + 
			" 'fields.userId.start'='1',\r\n" + 
			" 'fields.userId.end'='20',\r\n" + 
			" 'fields.balance.kind'='random',\r\n" + 
			" 'fields.balance.min'='1',\r\n" + 
			" 'fields.balance.max'='100',\r\n" + 
			" 'fields.age.min'='1',\r\n" + 
			" 'fields.age.max'='100',\r\n" + 
			" 'fields.userName.length'='6'\r\n" + 
			");";

public static void test4() throws Exception {
		// 1、创建运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		//sinkSQL
		//sourceSQL
		// 建表
		tenv.executeSql(sourceSQL);
		//
		tenv.executeSql(sinkSQL);
		tenv.executeSql(sinkSQL2);
		
		//插入表数据,方式一
		tenv.from("InputTable").insertInto("OutputTable").execute();
		tenv.executeSql("select * from OutputTable");
		tenv.from("InputTable").execute().print();
		
		//插入表数据,方式二
		tenv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
		tenv.executeSql("select * from OutputTable");
		
		//插入表数据,方式三
		tenv.createStatementSet()
				.addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
				.addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
				.execute();
		// 输出
		tenv.from("InputTable").execute().print();
		tenv.executeSql("SELECT * FROM InputTable").print();
		
		env.execute();
	}
  • 输出结果
3> +I[3, 99, 36.20987556045243, d23888, 2023-11-13T14:49:58.812]
15> +I[15, 39, 68.30743253178122, 43bec8, 2023-11-13T14:49:58.812]
2> +I[2, 62, 47.280395949976885, 7bae4e, 2023-11-13T14:49:58.812]
16> +I[16, 52, 42.10205629532836, 6baf0e, 2023-11-13T14:49:58.812]
10> +I[10, 25, 58.008035887440094, d43dea, 2023-11-13T14:49:58.812]
13> +I[13, 36, 70.9215559827798, 01bb28, 2023-11-13T14:49:58.812]
12> +I[12, 38, 30.31004698340413, 322ba8, 2023-11-13T14:49:58.812]
6> +I[6, 17, 32.28909358733212, 13bf88, 2023-11-13T14:49:58.812]
9> +I[9, 49, 44.52802246768357, e8280c, 2023-11-13T14:49:58.812]
8> +I[8, 80, 18.03487847824154, 803b2a, 2023-11-13T14:49:58.812]
5> +I[5, 61, 54.43695775227862, 063f08, 2023-11-13T14:49:58.812]
7> +I[7, 64, 33.886576642098404, 443dea, 2023-11-13T14:49:58.812]
14> +I[14, 92, 63.71527772015468, 123848, 2023-11-13T14:49:58.812]
11> +I[11, 22, 30.745102844313315, e62848, 2023-11-13T14:49:58.812]
4> +I[4, 78, 88.60724929598506, 55bca8, 2023-11-13T14:49:58.812]
1> +I[1, 82, 62.50149215989057, 0bba0c, 2023-11-13T14:49:58.812]
3> +I[19, 67, 14.244993215937432, e6c911, 2023-11-13T14:49:59.806]
1> +I[17, 67, 91.05078612782468, 560b6c, 2023-11-13T14:49:59.807]
4> +I[20, 95, 82.12047947156385, 1ac5b2, 2023-11-13T14:49:59.807]
2> +I[18, 81, 25.384055001988084, fe98d1, 2023-11-13T14:49:59.806]
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| op |      userId |         age |                        balance |                       userName |           t_insert_time |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           1 |          91 |             22.629318048042723 |                         923e08 | 2023-11-13 14:49:59.800 |
| +I |           2 |          67 |              75.26915785038814 |                         342baa | 2023-11-13 14:49:59.803 |
| +I |           3 |          68 |              74.06076023217011 |                         1dbbce | 2023-11-13 14:49:59.803 |
| +I |           4 |          26 |              79.47471729272772 |                         083e2e | 2023-11-13 14:49:59.802 |
| +I |           5 |          97 |              82.56249330491859 |                         4a3c6e | 2023-11-13 14:49:59.804 |
| +I |           6 |          32 |              81.74903214944425 |                         fdac4e | 2023-11-13 14:49:59.800 |
| +I |           7 |          67 |              94.80154136831771 |                         f7acea | 2023-11-13 14:49:59.800 |
| +I |           8 |          53 |              50.85073238739004 |                         cfbd0c | 2023-11-13 14:49:59.800 |
| +I |           9 |          69 |              93.64054547476522 |                         7fa9ec | 2023-11-13 14:49:59.801 |
| +I |          10 |          66 |              61.92366658766452 |                         05b86a | 2023-11-13 14:49:59.803 |
| +I |          11 |          81 |              95.61717698776191 |                         efa8ce | 2023-11-13 14:49:59.797 |
| +I |          12 |           8 |             63.573174957723076 |                         0fbfec | 2023-11-13 14:49:59.802 |
| +I |          13 |          85 |             52.938510850778734 |                         43bfa8 | 2023-11-13 14:49:59.803 |
| +I |          14 |          26 |              5.130287258770441 |                         083c6c | 2023-11-13 14:49:59.797 |
| +I |          15 |          35 |               73.3318749510538 |                         0e3b4c | 2023-11-13 14:49:59.802 |
| +I |          16 |          84 |              16.24326410122912 |                         ac2d6e | 2023-11-13 14:49:59.802 |
| +I |          18 |          41 |              32.38455189801736 |                         b07afb | 2023-11-13 14:50:00.804 |
| +I |          19 |          24 |               77.6947569111452 |                         7f72ac | 2023-11-13 14:50:00.803 |
| +I |          20 |          92 |              82.53929937026987 |                         051fb9 | 2023-11-13 14:50:00.802 |
| +I |          17 |          93 |             12.784194121509948 |                         bce5d9 | 2023-11-13 14:50:00.801 |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
20 rows in set

为了组合这两种执行行为,对StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次调用都将具体化(materialize )(即编译)Table API子管道(sub-pipeline),并将其插入DataStream API管道生成器(builder)中。这意味着之后必须调用StreamExecutionEnvironment.execute()或DataStream.executeAndCollect。Table API中的执行不会触发这些“外部部件(external parts)”。

// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();

// (2)

// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();

// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();

上述示例中有具体应用。

以上,本文是Flink table api 与 datastream api的集成的第一篇,主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明(即maven依赖、import、配置以及执行行为),并以具体的示例进行说明。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月11日 0

暂无评论

推荐阅读
nNPyvzOmRTFq