需求:通过Flink处理流数据,处理结果写入HBase
实现:通过继承RichSinkFunction类,自定义Sink
1.常量类
/**
 * Holder for the string keys {@code "tableName"}, {@code "columnFamily"},
 * {@code "rowKey"} and {@code "columnNameList"}.
 *
 * <p>NOTE(review): these are presumably the parameter names read by the HBase
 * sink; the consuming code is not visible here, so confirm against it.
 *
 * <p>The class only exposes constants, so it is final and cannot be instantiated.
 */
public final class HBaseConstant {

    /** Key string {@code "tableName"}. */
    public static final String TABLE_NAME = "tableName";

    /** Key string {@code "columnFamily"}. */
    public static final String COLUMN_FAMILY = "columnFamily";

    /** Key string {@code "rowKey"}. */
    public static final String ROW_KEY = "rowKey";

    /** Key string {@code "columnNameList"}. */
    public static final String COLUMN_NAME_LIST = "columnNameList";

    /** Not instantiable: this class is a pure constants holder. */
    private HBaseConstant() {
    }
}
2.自定义Sink代码
public class HBaseSink extends RichSinkFunction
3.Flink流计算代码
public class StreamToHBase {
public static void main(String[] args) {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Feed test input with netcat: nc -lk 8888
DataStreamSource streamSource = streamEnv.socketTextStream("node4", 8888);
SingleOutputStreamOperator wordStream = streamSource.flatMap(new FlatMapFunction() {
/**
 * Splits the incoming line on single space characters and emits every
 * resulting token to the collector, in order.
 *
 * @param line      the text line to tokenize
 * @param collector receives each token
 */
@Override
public void flatMap(String line, Collector collector) {
    // Splitting on " " (not on runs of whitespace) means consecutive or
    // leading spaces produce empty tokens, which are emitted as well.
    for (String token : line.split(" ")) {
        collector.collect(token);
    }
}
});
SingleOutputStreamOperator> map = wordStream.map(new MapFunction>() {
/**
 * Pairs the incoming value with the count 1.
 *
 * @param vin the incoming string value
 * @return a tuple of {@code (vin, 1)}
 */
@Override
public Tuple2 map(String vin) throws Exception {
    // Every element starts with a count of one.
    final int initialCount = 1;
    return Tuple2.of(vin, initialCount);
}
});
SingleOutputStreamOperator> sum = map.keyBy(0).sum(1);
SingleOutputStreamOperator