FlinkSQL Practice Notes 3 -- join


1. Background

Building on queries of the form select .. count(*) .. where .. group by .., this post tries out a join.

2. Code

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                // Deprecated in newer Flink releases, where Blink is the only planner
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Map the Kafka topic to an input table
        tableEnv.executeSql(
                "CREATE TABLE sensor_source(id STRING, name STRING) WITH  (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'flinksqldemo'," +
                        " 'properties.bootstrap.servers' = 'ip:port'," +
                        " 'properties.group.id' = 'flinksqlCount'," +
                        " 'scan.startup.mode' = 'earliest-offset'," +
                        " 'format' = 'json')"
        );
        // Map the MySQL table to an input dimension table
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE mysql_source (" +
                        "               id STRING," +
                        "               name STRING," +
                        "               PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'jdbc'," +
                        " 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
                        " 'table-name' = 'test_info'," +
                        " 'username' = 'xxx'," +
                        " 'password' = 'xxx'" +
                        ")"
        );

        // Map the MySQL result table to a sink; the primary key lets the sink upsert by name
        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'xxx'," +
                " 'password' = 'xxx'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT c.name, count(*) as cnt " +
                        "FROM sensor_source o " +
                        "JOIN mysql_source c " +
                        "on o.id = c.id " +
                        // id is declared as STRING, so cast it explicitly before the numeric comparison
                        "where CAST(o.id AS INT) > 3 " +
                        "group by c.name "
        );
        // getJobStatus() returns a CompletableFuture; join() waits for the status itself
        System.out.println(tableResult.getJobClient().get().getJobStatus().join());

After the job runs, the count_info table in MySQL is updated with the latest counts.
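
To make the expected contents of count_info easier to reason about, here is a minimal plain-Java model of what the INSERT computes. It is not Flink code and only sketches the semantics: an inner join on id, a filter on id > 3, a count per name, and an upsert keyed by name. The class name JoinCountSketch, the DIM map and its sample rows are hypothetical, and the sketch assumes that every id is a numeric string.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the INSERT statement above; not Flink code.
class JoinCountSketch {

    // Hypothetical stand-in for the rows of test_info: id -> name.
    static final Map<String, String> DIM = Map.of("4", "alice", "5", "bob", "2", "carol");

    // Applies one stream record to the sink state, mimicking
    // JOIN on id, WHERE id > 3, GROUP BY name, COUNT(*), then an upsert keyed by name.
    static void apply(Map<String, Long> sink, String id) {
        String name = DIM.get(id);
        // Inner join: a record without a matching dimension row produces nothing.
        // Filter: ids that are not greater than 3 are dropped.
        if (name == null || Integer.parseInt(id) <= 3) {
            return;
        }
        // Upsert: the row for this name is overwritten with the new count.
        sink.merge(name, 1L, Long::sum);
    }

    // Feeds the records one by one, the way a stream would deliver them.
    static Map<String, Long> run(List<String> ids) {
        Map<String, Long> sink = new LinkedHashMap<>();
        for (String id : ids) {
            apply(sink, id);
        }
        return sink;
    }

    public static void main(String[] args) {
        // prints {alice=2, bob=1}
        System.out.println(run(List.of("4", "5", "4", "2", "9")));
    }
}
```

In this model the record with id 2 is removed by the filter and the record with id 9 has no dimension row, so only alice and bob end up in the sink.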

3. Problem

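mysql_source serves as a dimension table. If it changes, the code above does not let Flink capture the change to the MySQL dimension table test_info and propagate it to mysql_sink.
If the dimension table has to be updated in real time, how should the code above be modified to produce correct results? The next post will test this in practice.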
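
One way to picture the limitation is to contrast a dimension table that is read once with one that is queried for every stream record. The plain-Java model below is only an illustration of that difference, not Flink code, and the names DimJoinSketch, testInfo, old_name and new_name are hypothetical. As far as I understand, the per-record behaviour corresponds to what Flink SQL calls a lookup join (written with FOR SYSTEM_TIME AS OF on a processing-time attribute), and a CDC source is another commonly mentioned option. Which of these actually fits this job is left to the follow-up test.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model, not Flink code: contrasts a one-time snapshot of the
// dimension table with a lookup performed for each stream record.
class DimJoinSketch {

    // Hypothetical stand-in for the MySQL table test_info: id -> name.
    static final Map<String, String> testInfo = new HashMap<>();

    // Returns {name seen via the snapshot, name seen via the lookup}.
    static String[] demo() {
        testInfo.clear();
        testInfo.put("4", "old_name");

        // Snapshot: the table is copied once, when the job starts.
        Map<String, String> snapshot = new HashMap<>(testInfo);

        // The dimension row changes while the job is running.
        testInfo.put("4", "new_name");

        // A stream record with id = 4 arrives after the change.
        String viaSnapshot = snapshot.get("4");  // still the value read at startup
        String viaLookup = testInfo.get("4");    // queried when the record is processed
        return new String[] {viaSnapshot, viaLookup};
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("snapshot sees: " + result[0]);
        System.out.println("lookup sees:   " + result[1]);
    }
}
```

Note that even with a per-record lookup, only records that arrive after the change see the new name. Counts that were already written under the old name are not corrected by this alone.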