Storm集成 JDBC
  bhG8jH8b1hMi 2023年11月13日 33 0


  • 创建 maven 工程,pom 文件如下:
<dependencies>
	<!-- Storm core API (spouts, bolts, topology builder, LocalCluster). -->
	<!-- Scope "provided": not packaged into the topology artifact. -->
	<!-- NOTE(review): presumably the Storm cluster supplies storm-core at runtime; confirm. -->
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>1.0.3</version>
		<scope>provided</scope>
	</dependency>
	<!-- JDBC integration (JdbcInsertBolt, SimpleJdbcMapper, ConnectionProvider). -->
	<!-- NOTE(review): also marked "provided"; verify that storm-jdbc is really on the -->
	<!-- cluster classpath, otherwise remove the scope so it ships with the topology jar. -->
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-jdbc</artifactId>
		<version>1.0.3</version>
		<scope>provided</scope>
	</dependency>
	<!-- MySQL JDBC driver; no scope given, so it uses Maven's default scope. -->
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.43</version>
	</dependency>
</dependencies>
  • Spout 任务代码如下:
package storm;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout that simulates an external data source for the word-count topology.
 *
 * <p>On every {@link #nextTuple()} call it pauses, picks one of the canned
 * sentences at random and emits it as a single-field tuple named
 * {@code "sentence"}.
 */
public class WordCountSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1571765705181254611L;

	// Simulated input data
	private String[] data = {"I love Beijing", "I love China", "Beijing is the capital of China"};

	// Single random source reused across calls instead of allocating one per tuple
	private final Random random = new Random();

	// Used to send tuples to the next component
	private SpoutOutputCollector collector;

	/**
	 * Stores the collector so that {@link #nextTuple()} can emit through it.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   topology context (unused)
	 * @param collector collector used to emit tuples
	 */
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Called by the Storm framework to fetch the next piece of data from the
	 * (simulated) external source and emit it.
	 */
	public void nextTuple() {
		// Throttle the simulated source
		Utils.sleep(3000);

		// Bound the index by the actual array length so the sample data can be
		// changed without touching this method
		int index = random.nextInt(data.length);
		String sentence = data[index];

		// Emit the sentence
		System.out.println("发送数据:" + sentence);
		this.collector.emit(new Values(sentence));
	}

	/**
	 * Declares the single output field {@code "sentence"}.
	 *
	 * @param declarer declarer used to register the output schema
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}
}
  • 用于分词的 Bolt 任务代码如下:
package storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that splits each incoming sentence into words.
 *
 * <p>Reads the {@code "sentence"} field of the input tuple and emits one
 * {@code ("word", "count")} tuple per word, with {@code count} fixed to 1.
 */
public class WordCountSplitBolt extends BaseRichBolt {

	private static final long serialVersionUID = -7399165475264468561L;

	private OutputCollector collector;

	/**
	 * Stores the collector so that {@link #execute(Tuple)} can emit through it.
	 *
	 * @param stormConf topology configuration (unused)
	 * @param context   topology context (unused)
	 * @param collector collector used to emit and ack tuples
	 */
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Tokenizes the sentence on single spaces and emits each non-empty word.
	 *
	 * <p>Emitted tuples are anchored to the input tuple and the input is acked
	 * once all words have been emitted, as a rich bolt is expected to do.
	 *
	 * @param tuple input tuple carrying a {@code "sentence"} string field
	 */
	public void execute(Tuple tuple) {
		String sentence = tuple.getStringByField("sentence");
		if (sentence != null) {
			// Tokenize
			for (String word : sentence.split(" ")) {
				// Consecutive or leading spaces yield empty tokens; they are not words
				if (word.isEmpty()) {
					continue;
				}
				this.collector.emit(tuple, new Values(word, 1));
			}
		}
		// Always acknowledge the input, even when nothing was emitted
		this.collector.ack(tuple);
	}

	/**
	 * Declares the output fields {@code "word"} and {@code "count"}.
	 *
	 * @param declarer declarer used to register the output schema
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}
  • 用于计数的 Bolt 任务代码如下:
package storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that keeps a running total per word.
 *
 * <p>Totals are held in an in-memory {@link HashMap} field of this bolt
 * instance. For every input tuple the updated total of that word is emitted
 * as a {@code ("word", "total")} tuple.
 */
public class WordCountBoltCount extends BaseRichBolt {

	private static final long serialVersionUID = -3206516572376524950L;

	private OutputCollector collector;

	// word -> accumulated count seen so far by this bolt instance
	private Map<String, Integer> result = new HashMap<String, Integer>();

	/**
	 * Stores the collector so that {@link #execute(Tuple)} can emit through it.
	 *
	 * @param stormConf topology configuration (unused)
	 * @param context   topology context (unused)
	 * @param collector collector used to emit and ack tuples
	 */
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Adds the incoming count to the word's running total and emits the new total.
	 *
	 * <p>The first occurrence of a word is recorded with the incoming
	 * {@code count} (not a hard-coded 1), so upstream components may send
	 * pre-aggregated counts. The emitted tuple is anchored to the input, which
	 * is acked afterwards.
	 *
	 * @param tuple input tuple with a string field {@code "word"} and an
	 *              integer field {@code "count"}
	 */
	public void execute(Tuple tuple) {
		String word = tuple.getStringByField("word");
		int count = tuple.getIntegerByField("count");

		Integer previous = result.get(word);
		int total = (previous == null) ? count : previous + count;
		result.put(word, total);

		// Print the current totals to the console
		System.out.println("输出的结果是:" + result);

		// Emit the updated total so a downstream bolt can persist it
		this.collector.emit(tuple, new Values(word, total));
		this.collector.ack(tuple);
	}

	/**
	 * Declares the output fields {@code "word"} and {@code "total"}.
	 *
	 * @param declarer declarer used to register the output schema
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "total"));
	}
}
  • 用于获取数据库连接的 ConnectionProvider 代码如下:
package jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.storm.jdbc.common.ConnectionProvider;

/**
 * {@link ConnectionProvider} that opens a new MySQL connection through
 * {@link DriverManager} on every {@link #getConnection()} call.
 *
 * <p>The driver class, JDBC URL and credentials are hard-coded constants.
 */
public class MyConnectionProvider implements ConnectionProvider {

	private static final long serialVersionUID = -4784999115987415445L;

	private static final String DRIVER = "com.mysql.jdbc.Driver";

	private static final String URL = "jdbc:mysql://qujianlei:3306/storm";

	private static final String USER = "root";

	private static final String PASSWORD = "123";

	static {
		try {
			Class.forName(DRIVER);
		} catch (ClassNotFoundException e) {
			// Best effort: report the problem but do not abort class loading;
			// a missing driver will surface again when a connection is requested.
			e.printStackTrace();
		}
	}

	/**
	 * Opens a new connection to the configured database.
	 *
	 * @return a freshly opened connection, never {@code null}
	 * @throws RuntimeException if the connection cannot be established; the
	 *         original {@link SQLException} is preserved as the cause
	 */
	public Connection getConnection() {
		try {
			return DriverManager.getConnection(URL, USER, PASSWORD);
		} catch (SQLException e) {
			// Fail loudly here instead of returning null, which would only
			// produce an uninformative NullPointerException in the caller
			throw new RuntimeException("Failed to open JDBC connection to " + URL, e);
		}
	}

	/** No preparation needed: connections are opened on demand. */
	public void prepare() {
	}

	/** Nothing to release: this provider holds no pooled resources. */
	public void cleanup() {
	}
}
  • 获取 JdbcBolt 的工具类如下:
package jdbc;

import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.IRichBolt;

/**
 * Factory for the bolt that writes tuples into the database.
 */
public class JdbcBoltUtils {

	/**
	 * Builds a {@link JdbcInsertBolt} that inserts into the {@code result}
	 * table with a query timeout of 30 seconds.
	 *
	 * <p>Both the bolt and its {@link SimpleJdbcMapper} share one
	 * {@link MyConnectionProvider} instance.
	 *
	 * @return the configured insert bolt
	 */
	public static IRichBolt createJDBCBolt() {
		final String tableName = "result";
		final int queryTimeoutSecs = 30;

		ConnectionProvider provider = new MyConnectionProvider();
		JdbcMapper mapper = new SimpleJdbcMapper(tableName, provider);

		IRichBolt insertBolt = new JdbcInsertBolt(provider, mapper)
			.withTableName(tableName)
			.withQueryTimeoutSecs(queryTimeoutSecs);
		return insertBolt;
	}
}

注:result 为 MySQL 中的表名,需要事先在 storm 库中创建;该表共有两个字段:word 和 total,字段名须与计数 Bolt 输出的字段名保持一致。

  • Topology 的代码如下:
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import jdbc.JdbcBoltUtils;

/**
 * Entry point that wires the word-count topology and runs it in a local cluster.
 *
 * <p>Pipeline: spout -> split bolt -> count bolt -> JDBC bolt.
 */
public class WordCountTopology {

	// Component IDs used when wiring the topology
	private static final String SPOUT_ID = "wordcount_spout";
	private static final String SPLIT_BOLT_ID = "wordcount_splitbolt";
	private static final String COUNT_BOLT_ID = "wordcount_count";
	private static final String JDBC_BOLT_ID = "wordcount_jdbcBolt";

	/**
	 * Builds the topology and submits it to an in-process {@link LocalCluster}
	 * under the name {@code "mywordcount"}.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if the topology cannot be built or submitted
	 */
	public static void main(String[] args) throws Exception {
		StormTopology topology = buildTopology();
		Config conf = new Config();

		// Submit to a local cluster
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("mywordcount", conf, topology);

		// To run on a real Storm cluster instead, submit with
		// StormSubmitter.submitTopology(args[0], conf, topology)
	}

	/**
	 * Declares the spout and the three bolts and connects them.
	 *
	 * @return the assembled topology
	 */
	private static StormTopology buildTopology() {
		TopologyBuilder topologyBuilder = new TopologyBuilder();

		// Source of sentences
		topologyBuilder.setSpout(SPOUT_ID, new WordCountSpout());

		// First bolt: split sentences into words
		topologyBuilder.setBolt(SPLIT_BOLT_ID, new WordCountSplitBolt())
			.shuffleGrouping(SPOUT_ID);

		// Second bolt: count words, grouped by the "word" field
		topologyBuilder.setBolt(COUNT_BOLT_ID, new WordCountBoltCount())
			.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

		// Third bolt: persist the totals to MySQL
		topologyBuilder.setBolt(JDBC_BOLT_ID, JdbcBoltUtils.createJDBCBolt())
			.shuffleGrouping(COUNT_BOLT_ID);

		return topologyBuilder.createTopology();
	}
}
  • 在 WordCountTopology 类上右击,选择以 Java 应用程序方式运行即可(注:Eclipse 要以管理员身份启动)。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
bhG8jH8b1hMi