Flink 本地开发测试 Hive Connector
  UqrkOCyfkQZc 2023年11月02日 61 0

Flink 1.16.0

Hive 2.3.9

Hadoop 2.7.7(本地安装)

package cn.mrt.flink.connector;


import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;


/**
 * Local smoke test for the Flink Hive connector.
 *
 * <p>Builds a streaming-mode {@link TableEnvironment}, registers a {@link HiveCatalog},
 * switches the SQL dialect to Hive, makes that catalog and its database current, and
 * prints the result of {@code show databases}.
 *
 * <p>The same setup from the Flink SQL client, for reference:
 * <pre>
 *   create catalog myHive with('type'='hive','hive-conf-dir'='/home/hadoop/app/hive-2.3.9/conf');
 *   set table.sql-dialect=hive;
 * </pre>
 * or, in {@code sql-cli-defaults.yaml}:
 * <pre>
 *   execution:
 *     planner: blink
 *     type: batch
 *     result-mode: table
 *   configuration:
 *     table.sql-dialect: hive
 * </pre>
 */
public class HiveConnectorToTest {

    /** Name the Hive catalog is registered under; also used to select it as current. */
    private static final String CATALOG_NAME = "myHive";

    /** Database passed to the catalog as its default and then selected as current. */
    private static final String DEFAULT_DATABASE = "default";

    /**
     * Directory handed to {@link HiveCatalog} as the Hive configuration directory.
     *
     * <p>NOTE(review): this points at the local Hadoop {@code etc/hadoop} directory rather
     * than a Hive {@code conf} directory; confirm that {@code hive-site.xml} is present
     * there. Earlier attempts resolved {@code hive-site.xml} from the classpath or used
     * {@code src/main/resources/conf} instead.
     */
    private static final String HIVE_CONF_DIR =
            "D:\\Program Files (x86)\\hadoop\\hadoop-2.7.7\\etc\\hadoop";

    /**
     * Registers the Hive catalog and lists its databases on standard output.
     *
     * @param args unused
     * @throws Exception declared for convenience; nothing in the body throws a checked exception
     */
    public static void main(String[] args) throws Exception {
        /*
         * Disabled stage: read from Kafka and print to the console.
         *
         * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         * env.setParallelism(1);
         * StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
         * tableEnv.executeSql("CREATE TABLE KafkaTableSource (" +
         *         " `user` STRING," +
         *         " `url` STRING," +
         *         " `ts` TIMESTAMP(3) METADATA FROM 'timestamp')" +
         *         " WITH (" +
         *         "'connector' = 'kafka'," +
         *         "'topic' = 'events-source'," +
         *         "'properties.bootstrap.servers' = '192.168.56.131:9092'," +
         *         "'properties.group.id' = 'testGroup'," +
         *         "'scan.startup.mode' = 'latest-offset'," +
         *         " 'format' = 'csv')");
         * Table resultTable = tableEnv.sqlQuery("select * from KafkaTableSource");
         * tableEnv.toDataStream(resultTable).print();
         */

        // Hive catalog configuration (intended as the target for the Kafka data above).
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Echo the configuration directory so a wrong path is easy to spot in the console.
        System.out.println(HIVE_CONF_DIR);

        // Create the HiveCatalog and register it with the table environment. The same
        // constant is used for construction, registration and selection so the names
        // cannot drift apart.
        HiveCatalog hiveCatalog = new HiveCatalog(CATALOG_NAME, DEFAULT_DATABASE, HIVE_CONF_DIR);
        tableEnv.registerCatalog(CATALOG_NAME, hiveCatalog);

        // Use the Hive SQL dialect (switch to SqlDialect.DEFAULT for Flink's own dialect).
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Make the registered catalog and its database current for this session.
        tableEnv.useCatalog(CATALOG_NAME);
        tableEnv.useDatabase(DEFAULT_DATABASE);

        // Read from Hive through the registered catalog.
        tableEnv.executeSql("show databases").print();

        /*
         * Disabled stage: write to a Kafka topic. Depends on `resultTable` and `env`
         * from the disabled source stage at the top of this method.
         *
         * tableEnv.executeSql("CREATE TABLE KafkaTableSink (" +
         *         " `user` STRING," +
         *         " `url` STRING," +
         *         " `ts` TIMESTAMP)" +
         *         " WITH (" +
         *         "'connector' = 'kafka'," +
         *         "'topic' = 'events-sink'," +
         *         "'properties.bootstrap.servers' = '192.168.56.131:9092'," +
         *         " 'format' = 'csv')");
         * resultTable.executeInsert("KafkaTableSink");
         * env.execute();
         */
    }

}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

UqrkOCyfkQZc
最新推荐 更多

2024-05-31