【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console
  nNPyvzOmRTFq 2023年12月23日 47 0

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


(文章目录)


本文介绍了Flink sink结果至console、文件和socket几端中,具体以实际情况为准。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章: 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(3) - redis 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(4) - clickhouse 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

一、maven依赖

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>3.1.4</version>
	</dependency>
</dependencies>

二、Flink sink介绍

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。

Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(…) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

注意,DataStream 的 write*() 方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 FileSink。此外,通过 .addSink(…) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。

三、sink 到文件、console示例

1、console输出

该种方法比较简单,直接调用print方法即可。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");
		System.setProperty("HADOOP_USER_NAME", "alanchan");

		// transformation

		// sink
		ds.print();
		ds.print("输出标识");
		//		i like flink
		//		i am alanchan
		//		输出标识> i like flink
		//		输出标识> i am alanchan
		
		// execute
		env.execute();

	}

}

2、sink到文件

1)、sink txt文件到hdfs上

sink到本地是一样的,不再赘述。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");
		System.setProperty("HADOOP_USER_NAME", "alanchan");

		// transformation

		// sink
		ds.print();
		ds.print("输出标识");
//		i like flink
//		i am alanchan
//		输出标识> i like flink
//		输出标识> i am alanchan
		
		// 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件
		//如果并行度是1的话,该路径是一个文件名称;
		//如果并行度大于1的话,该路径是一个文件夹,文件夹下的文件名是并行度的数字,并行度为2,该文件夹下会创建名称为1和2的两个文件
		//运行下面的示例时,hdfs上不能存在该文件,也即下面两行代码运行时需要注释一行
		ds.writeAsText("hdfs://server2:8020///flinktest/sinktest/words").setParallelism(2);		
		ds.writeAsText("hdfs://server2:8020///flinktest/sinktest/words").setParallelism(1);
		
		// execute
		env.execute();

	}

}

  • 并行度为1的运行结果如下 在这里插入图片描述
  • 并行度为2的运行结果如下 在这里插入图片描述

2)、sink csv文件到本地

该方法中只能使用tuple格式组织数据

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {
	public static final String DEFAULT_LINE_DELIMITER = "\n";
	public static final String DEFAULT_FIELD_DELIMITER = ",";

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<Tuple2<Integer, String>> ds = env.fromElements(
						Tuple2.of(1, "i am alanchan"), 
						Tuple2.of(2, "i like flink"), 
						Tuple2.of(3, "i like hadoop too"),
						Tuple2.of(4, "你呢?")
					);

		// transformation

		// sink
		String file = "D:\\workspace\\flink1.17-java\\testdatadir\\csvfile.csv";

		// 使用默认的其他三个参数
//		ds.writeAsCsv(file);
//		ds.writeAsCsv(file, WriteMode.OVERWRITE);
		ds.writeAsCsv(file, WriteMode.OVERWRITE, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER);

		// execute
		env.execute();
	}

}

  • 输出文件 在这里插入图片描述

在这里插入图片描述

3)、sink text文件到hdfs上(writeUsingOutputFormat)

输出格式Flink 1.17版本实现的如下 在这里插入图片描述

FileOutputFormat系统已经实现了几种格式,如下 在这里插入图片描述

完整代码如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");
		System.setProperty("HADOOP_USER_NAME", "alanchan");

		// transformation

		// sink
		TextOutputFormat format = new TextOutputFormat(new Path("hdfs://server1:8020///flinktest/sinktest/foramts"), "UTF-8");
//		CsvOutputFormat format_csv = new CsvOutputFormat(new Path());
		
		format.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
		format.setWriteMode(WriteMode.OVERWRITE);
		ds.writeUsingOutputFormat(format);

		// execute
		env.execute();
	}

}

  • 程序运行后,hdfs文件输出结果 在这里插入图片描述

四、sink到socket示例(writeToSocket)

本示例比较简单,没有太多需要说明的地方。 下图是系统内置实现的序列化类,根据自己的需要选择。 在这里插入图片描述 完整代码实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class TestSinkSocketDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");

		// transformation

		// sink
		// ip、端口、序列化
		ds.writeToSocket("192.168.10.42", 9999, new SimpleStringSchema());

		// execute
		env.execute();

	}

}

  • 验证

1、先开通9999端口,本示例是通过nc -lk 9999 来开启的 2、启动应用程序 3、观察9999端口的输出 在这里插入图片描述

以上,本文介绍了Flink sink结果至console、文件和socket几端中,具体以实际情况为准。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章: 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(3) - redis 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(4) - clickhouse 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(6) - 分布式缓存 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月23日 0

暂无评论

推荐阅读
nNPyvzOmRTFq