当前位置: 代码迷 >> 综合 >> Flink学习04 - 流处理API-sink
  详细解决方案

Flink学习04 - 流处理API-sink

热度:91   发布时间:2024-01-15 18:25:33.0

Flink学习04 - 流处理API-sink

  • Flink流处理API(DataStreamAPI)
    • sink
      • 官方连接框架(1.10版本)
      • 第三方连接框架(1.10版本)
      • 1.1 kafka - 案例 消息管道
        • 生产者
        • 消费者
        • flink
      • 1.2 Redis - 自定义RedisMapper

Flink流处理API(DataStreamAPI)

image-20210105145626637

sink

? Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。

stream.addSink(new MySink(xxxx))

官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。

官方连接框架(1.10版本)

image-20210218154920997

第三方连接框架(1.10版本)

image-20210218154935600

1.1 kafka - 案例 消息管道

flink消费 sensor主题的消息,推送到sinktest主题

生产者

image-20210218160838532

消费者

image-20210218160901521

flink
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.10.35:9092");// 从Kafka读取数据DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer010<String>("sensor",new SimpleStringSchema(),props));// 使用java8的lambda表达式 转换成SensorReading类型DataStream<String> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();});dataStream.addSink( new FlinkKafkaProducer010<String>("192.168.10.35:9092", "sinktest", new SimpleStringSchema()));env.execute();}
}

1.2 Redis - 自定义RedisMapper

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 从文件读取数据DataStream<String> inputStream = env.readTextFile("/Users/seafyliang/DEV/Code_projects/Java_projects/study_projects/flink_study/src/main/resources/sensor.txt");// 使用java8的lambda表达式 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义jedis连接配置FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();dataStream.addSink( new RedisSink<>(config, new MyRedisMapper()));env.execute();}// 自定义RedisMapperpublic static class MyRedisMapper implements RedisMapper<SensorReading>{
    // 定义保存数据到redis的命令, 存成Hash表, hset sensor_temp id temperature@Overridepublic RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");}@Overridepublic String getKeyFromData(SensorReading sensorReading) {
    return sensorReading.getId();}@Overridepublic String getValueFromData(SensorReading sensorReading) {
    return sensorReading.getTemperature().toString();}}
}

image-20210305222834434

  相关解决方案