FlinkSQL实践记录3 -- join
1. 背景
继上一篇实践了 select .. count(*) .. where .. group by .. 之后，本篇再对 join 关联做一些实践。
2. 代码
// Streaming Table API environment on the Blink planner.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Map the Kafka topic to an input table (JSON records with fields id and name),
// consumed from the earliest offset.
tableEnv.executeSql(
        "CREATE TABLE sensor_source(id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'flinksqldemo'," +
        " 'properties.bootstrap.servers' = 'ip:port'," +
        " 'properties.group.id' = 'flinksqlCount'," +
        " 'scan.startup.mode' = 'earliest-offset'," +
        " 'format' = 'json')"
);

// Map the MySQL table test_info to an input table that plays the dimension-table role in the join.
tableEnv.executeSql(
        "CREATE TEMPORARY TABLE mysql_source (" +
        " id STRING," +
        " name STRING," +
        " PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
        " 'table-name' = 'test_info'," +
        " 'username' = 'xxx'," +
        " 'password' = 'xxx'" +
        ")"
);

// Output table: MySQL table count_info. The primary key on name lets the JDBC sink
// write the latest count for each name instead of appending a new row per update.
String mysql_sql = "CREATE TABLE mysql_sink (" +
        " name STRING," +
        " cnt BIGINT," +
        " PRIMARY KEY (name) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
        " 'table-name' = 'count_info'," +
        " 'username' = 'xxx'," +
        " 'password' = 'xxx'" +
        ")";
tableEnv.executeSql(mysql_sql);

// Count Kafka records per dimension name and continuously write the counts to mysql_sink.
// This is a regular join (no FOR SYSTEM_TIME AS OF lookup), so mysql_source is read as an
// ordinary scan source; changes made to test_info afterwards are not reflected (section 3).
// NOTE(review): o.id is declared STRING but is compared with the INT literal 3. Confirm how
// the planner resolves this comparison; use CAST(o.id AS INT) > 3 if numeric filtering is intended.
TableResult tableResult = tableEnv.executeSql(
        "INSERT INTO mysql_sink " +
        "SELECT c.name, count(*) as cnt " +
        "FROM sensor_source o " +
        "JOIN mysql_source c " +
        "on o.id = c.id " +
        "where o.id > 3 " +
        "group by c.name "
);

// getJobStatus() returns a CompletableFuture. Printing it directly prints the future object,
// not the status, so wait for the result. join() throws an unchecked exception, so no extra
// throws clause is needed.
System.out.println(
        tableResult.getJobClient()
                .orElseThrow(() -> new IllegalStateException("INSERT statement did not return a JobClient"))
                .getJobStatus()
                .join());
运行后，mysql 表 count_info 中更新了最新的统计结果。
3. 问题
mysql_source 作为维表，如果其内容发生变更，上述代码中的 Flink 作业无法捕获 mysql 维表 test_info 的变更并更新到 mysql_sink。原因是这里使用的是普通 join，JDBC 表只是作为普通的扫描源被读取，而不是通过 lookup join（FOR SYSTEM_TIME AS OF）实时查询。
如果维表需要实时更新，以上代码应如何修改才能输出正确的结果？下一篇将进行实践测试。