Flink写HBase


需求:通过Flink处理流数据,处理结果写入HBase

实现:通过继承RichSinkFunction类,自定义Sink

1.常量类

/**
 * Keys of the meta entries in the {@code Map<String, String>} records passed to
 * {@code HBaseSink}: which table to write, the column family, the row key, and
 * the comma-separated list of column names carried alongside the cell values.
 */
public final class HBaseConstant {
    public static final String TABLE_NAME = "tableName";
    public static final String COLUMN_FAMILY = "columnFamily";
    public static final String ROW_KEY = "rowKey";
    public static final String COLUMN_NAME_LIST = "columnNameList";

    /** Constants holder — not instantiable. */
    private HBaseConstant() {
    }
}

2.自定义Sink代码

public class HBaseSink extends RichSinkFunction> {
    Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
        config.set(HConstants.ZOOKEEPER_QUORUM, "node2,node3,node4");
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        conn = ConnectionFactory.createConnection(config);
    }

    @Override
    public void invoke(Map value, Context context) throws Exception {
        String tableName = value.get(HBaseConstant.TABLE_NAME);
        String columnFamily = value.get(HBaseConstant.COLUMN_FAMILY);
        String rowKey = value.get(HBaseConstant.ROW_KEY);
        String columnNameList = value.get(HBaseConstant.COLUMN_NAME_LIST);

        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        String[]  columnNames= columnNameList.split(",");
        for (String columnName : columnNames) {
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName), Bytes.toBytes(value.get(columnName)));
        }
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}

3.Flink流计算代码

public class StreamToHBase {
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // nk -lk 8888
        DataStreamSource streamSource = streamEnv.socketTextStream("node4", 8888);


        SingleOutputStreamOperator wordStream = streamSource.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String line, Collector collector) {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });

        SingleOutputStreamOperator> map = wordStream.map(new MapFunction>() {
            @Override
            public Tuple2 map(String vin) throws Exception {
                return Tuple2.of(vin, 1);
            }
        });

        SingleOutputStreamOperator> sum = map.keyBy(0).sum(1);

        SingleOutputStreamOperator> hbaseResult = sum.map(new MapFunction, Map>() {
            @Override
            public Map map(Tuple2 tuple2) throws Exception {
                String word = tuple2.f0;
                Integer count = tuple2.f1;

                Map map = new HashMap<>();
                map.put(HBaseConstant.TABLE_NAME, "word");
                map.put(HBaseConstant.COLUMN_FAMILY, "cf");

                List list = new ArrayList<>();
                list.add("word");
                list.add("count");
                String columnNames = Joiner.on(",").join(list);

                String rowKey = LocalDate.now() + word;
                map.put(HBaseConstant.ROW_KEY, rowKey);
                map.put(HBaseConstant.COLUMN_NAME_LIST, columnNames);
                map.put("word", word);
                map.put("count", String.valueOf(count));

                return map;
            }
        });

        hbaseResult.addSink(new HBaseSink());
    }
}