/*
 * Environment notes: Flink 1.16.0, Hive 2.3.9, Hadoop 2.7.7 (installed locally).
 * Kept inside a comment — bare text above the package declaration does not compile.
 */
package cn.mrt.flink.connector;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
 * Connectivity smoke test for Flink's Hive integration.
 *
 * <p>Registers a {@link HiveCatalog} with a streaming {@link TableEnvironment},
 * makes it the current catalog, switches to the Hive SQL dialect, and prints the
 * databases visible through the catalog.
 */
public class HiveConnectorToTest {

    /**
     * Entry point: registers the Hive catalog and runs {@code show databases}.
     *
     * @param args unused command-line arguments
     * @throws Exception if catalog registration or SQL execution fails
     */
    public static void main(String[] args) throws Exception {
        // Build a streaming-mode TableEnvironment (no DataStream API is needed here).
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Catalog coordinates for the HiveCatalog constructor.
        String name = "myHive";
        String defaultDatabase = "default";
        // NOTE(review): this path is the HADOOP configuration directory, but
        // HiveCatalog expects the directory containing hive-site.xml — confirm
        // that hive-site.xml actually resides here, or point this at the Hive
        // conf dir instead.
        String hiveConfDir = "D:\\Program Files (x86)\\hadoop\\hadoop-2.7.7\\etc\\hadoop";
        System.out.println(hiveConfDir);

        // Create the HiveCatalog and register it with the table environment.
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myHive", hive);

        // Use the Hive SQL dialect for subsequent statements.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Make the registered catalog and its default database current for this
        // session. SQL-client equivalent:
        //   CREATE CATALOG myHive WITH ('type'='hive', 'hive-conf-dir'='...');
        //   SET table.sql-dialect=hive;
        tableEnv.useCatalog("myHive");
        tableEnv.useDatabase("default");

        // Smoke test: list the databases reachable through the Hive metastore.
        tableEnv.executeSql("show databases").print();
    }
}