Flink学习04 - 流处理API-sink
- Flink流处理API(DataStreamAPI)
-
- sink
-
- 官方连接框架(1.10版本)
- 第三方连接框架(1.10版本)
- 1.1 kafka - 案例 消息管道
-
- 生产者
- 消费者
- flink
- 1.2 Redis - 自定义RedisMapper
Flink流处理API(DataStreamAPI)
sink
? Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。
stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。
官方连接框架(1.10版本)
第三方连接框架(1.10版本)
1.1 kafka - 案例 消息管道
flink消费 sensor主题的消息,推送到sinktest主题
生产者
消费者
flink
public class SinkTest1_Kafka {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.10.35:9092");// 从Kafka读取数据DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer010<String>("sensor",new SimpleStringSchema(),props));// 使用java8的lambda表达式 转换成SensorReading类型DataStream<String> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();});dataStream.addSink( new FlinkKafkaProducer010<String>("192.168.10.35:9092", "sinktest", new SimpleStringSchema()));env.execute();}
}
1.2 Redis - 自定义RedisMapper
public class SinkTest2_Redis {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 从文件读取数据DataStream<String> inputStream = env.readTextFile("/Users/seafyliang/DEV/Code_projects/Java_projects/study_projects/flink_study/src/main/resources/sensor.txt");// 使用java8的lambda表达式 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义jedis连接配置FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();dataStream.addSink( new RedisSink<>(config, new MyRedisMapper()));env.execute();}// 自定义RedisMapperpublic static class MyRedisMapper implements RedisMapper<SensorReading>{
// 定义保存数据到redis的命令, 存成Hash表, hset sensor_temp id temperature@Overridepublic RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");}@Overridepublic String getKeyFromData(SensorReading sensorReading) {
return sensorReading.getId();}@Overridepublic String getValueFromData(SensorReading sensorReading) {
return sensorReading.getTemperature().toString();}}
}