当前位置: 代码迷 >> 综合 >> flume+kafka实现实时信息获取
  详细解决方案

flume+kafka实现实时信息获取

热度:93   发布时间:2023-12-14 14:34:12.0

本篇是接上一篇而来的:

log4j日志传送至flume至kafka显示输出

 

1、Testlog类信息:

package com.bupt.realcaldemo.test;
import org.apache.log4j.Logger;public class Testlog {protected static final Logger LOG=Logger.getLogger(Testlog.class);public static void main(String[] args) {// 用户idString uid = "123";// 电影idString mid = "10";// 用户给电影的评分double score = 5.0;// 埋点信息String sign = "abc";LOG.info("这是一条需要进行计算的数字信息" +  sign + uid + sign + mid + sign + String.valueOf(score) + sign);}
}

 

2、consumer类信息

package com.bupt.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class Consumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-consumer-group");// 开启自动提交 offset 功能props.put("enable.auto.commit", "true");// 自动提交 offset 的时间间隔props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// 注册topicconsumer.subscribe(Arrays.asList("demo"));while (true) {// 拉取数据ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());String value = record.value();String[] strings = value.split("abc");String uid = strings[1];String mid = strings[2];double score = Double.valueOf(strings[3]);System.out.println("实时获取数据:");System.out.println("用户Id为: " + uid);System.out.println("电影Id为: " + mid);System.out.println("用户评分为: " + score);}}}
}

 

3、运行(consumer瞬间就获取到了相关日志信息)

 

4、 总结

这种实时获取信息的方式有很多用处,比如实时推荐等功能,我们拿到信息后,就可以做下一步的处理了。