spark 流处理实例
  LStkWOxQIeE5 2023年11月01日 64 0

 

开发环境:

系统:win 11  java : 1.8  scala:2.13  spark : 3.3.2

 

一, 使用 spark 结构化流读取文件数据,并做分组统计。

功能:spark  以结构化流形式从文件夹读取 csv 后缀数据文件,并进行连表分组统计。每次触发计算后,结果表输出到console控制板。

代码:

package org.example; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import java.util.concurrent.TimeoutException; public class Main { /* 例子:从文件中读取流, 被定义模式,生成dataset ,使用sql api 进行分析。 */
    public static void main(String[] args) throws TimeoutException, StreamingQueryException { System.out.println("Hello world!"); SparkSession spark = SparkSession.builder().appName("spark streaming").config("spark.master", "local") .config("spark.sql.warehouse.dir", "file:///app/") .getOrCreate(); spark.sparkContext().setLogLevel("ERROR"); StructType schema =
                new StructType().add("empId", DataTypes.StringType).add("empName", DataTypes.StringType) .add("department", DataTypes.StringType); Dataset<Row> rawData = spark.readStream().option("header", false).format("csv").schema(schema) .csv("D:/za/spark_data/*.csv"); rawData.createOrReplaceTempView("empData"); Dataset<Row> result = spark.sql("select count(*), department from  empData group by department"); StreamingQuery query = result.writeStream().outputMode("complete").format("console").start();  // 每次触发,全表输出
 query.awaitTermination(); } }

输出:

二, 使用 spark 结构化流读取socket流,做单词统计,使用Java编程

功能:spark 读取本地机器的网络流数据,并统计。

代码:

package org.example; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import java.util.Arrays; import java.util.concurrent.TimeoutException; public class SocketStreaming_wordcount { /* * 从socket 读取字符流,并做word count分析 * * */
    public static void main(String[] args) throws TimeoutException, StreamingQueryException { SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .config("spark.master", "local") .getOrCreate(); // dataframe 表示 socket 字符流
        Dataset<Row> lines = spark .readStream() .format("socket") .option("host", "localhost") .option("port", 9999) .load(); // 把一行字符串切分为 单词
        Dataset<String> words = lines .as(Encoders.STRING()) .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING()); // 对单词分组计数
        Dataset<Row> wordCounts = words.groupBy("value").count(); // 开始查询并打印输出到console
        StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination(); } }

输出:

二, 使用 spark 结构化流读取socket流,做单词统计,使用scala 编程

功能:同上

代码:

import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("streaming_socket_scala") .config("spark.master", "local") .getOrCreate() import spark.implicits._ // 创建datafram 象征从网络socket 接收流
    val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 切分一行成单词
    val words = lines.as[String].flatMap(_.split(" ")) // 进行单词统计
    val wordCounts = words.groupBy("value").count() // 开始查询并输出
    val query = wordCounts.writeStream .outputMode("complete") .format("console") .start()
query.awaitTermination() } }

输出:

 

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
LStkWOxQIeE5
作者其他文章 更多