Flink java模拟生成自定义流式数据
  TEZNKK3IfmPf 2023年11月14日 30 0

思路如下:

  1. 定义一个POJO类,注意flink里使用的类必须有一个无参的构造方法
  2. 自定义DataSource实现SourceFunction接口
  3. 使用ctx.collect()传入想要发送的数据就可以了

首先定义一个POJO类:

class MyData {
     
       
    public int keyId;
    public long timestamp;
    public double value;

    public MyData() {
     
       
    }

    public MyData(int accountId, long timestamp, double value) {
     
       
        this.keyId = accountId;
        this.timestamp = timestamp;
        this.value = value;
    }

    public long getKeyId() {
     
       
        return keyId;
    }

    public void setKeyId(int keyId) {
     
       
        this.keyId = keyId;
    }

    public long getTimestamp() {
     
       
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
     
       
        this.timestamp = timestamp;
    }

    public double getValue() {
     
       
        return value;
    }

    public void setValue(double value) {
     
       
        this.value = value;
    }

    @Override
    public String toString() {
     
       
        return "MyData{" +
                "keyId=" + keyId +
                ", timestamp=" + timestamp +
                ", value=" + value +
                '}';
    }
}

生成自己的数据:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class CreateMyData {
     
       
    public static void main(String[] args) throws Exception {
     
       

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<MyData> sourceStream = env.addSource(new MyDataSource());
        env.setParallelism(3);
        sourceStream.print();
        env.execute();
    }


    private static class MyDataSource implements SourceFunction<MyData> {
     
       
        // 定义标志位,用来控制数据的产生
        private boolean isRunning = true;
        private final Random random = new Random(0);

        @Override
        public void run(SourceContext ctx) throws Exception {
     
       
            while (isRunning) {
     
       
                ctx.collect(new MyData(random.nextInt(5), System.currentTimeMillis(), random.nextFloat()));
                Thread.sleep(1000L); // 1s生成1个数据
            }
        }

        @Override
        public void cancel() {
     
       
            isRunning = false;
        }
    }
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   22天前   48   0   0 java
  TEZNKK3IfmPf   2024年05月31日   55   0   0 java
TEZNKK3IfmPf