This article follows on from the previous one:
Shipping log4j logs through Flume into Kafka and printing the output
1. The Testlog class:
package com.bupt.realcaldemo.test;

import org.apache.log4j.Logger;

public class Testlog {
    protected static final Logger LOG = Logger.getLogger(Testlog.class);

    public static void main(String[] args) {
        // user id
        String uid = "123";
        // movie id
        String mid = "10";
        // the user's rating of the movie
        double score = 5.0;
        // delimiter marking the fields inside the log line
        String sign = "abc";
        LOG.info("This is a log line carrying data to be processed"
                + sign + uid + sign + mid + sign + String.valueOf(score) + sign);
    }
}
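Everything downstream depends on this field layout, so it helps to look at it in isolation. Below is a minimal sketch (plain Java, no log4j; the class name `PayloadDemo` and the prefix text are made up for illustration) showing how the `abc` delimiter frames the fields and how `split` recovers them:

```java
public class PayloadDemo {
    // Build the payload the same way Testlog does:
    // "<free text>abc<uid>abc<mid>abc<score>abc"
    static String build(String uid, String mid, double score) {
        String sign = "abc";
        return "message" + sign + uid + sign + mid + sign + score + sign;
    }

    public static void main(String[] args) {
        String payload = build("123", "10", 5.0);
        // Java's split drops the trailing empty string after the last "abc",
        // so the fields land at indexes 1..3.
        String[] parts = payload.split("abc");
        System.out.println(parts[1] + " " + parts[2] + " " + parts[3]); // prints 123 10 5.0
    }
}
```

Note that this framing only works as long as neither the free text nor the field values can themselves contain `abc`; a character that cannot occur in the data (or a structured format like JSON) would be a more robust delimiter.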
2. The Consumer class
package com.bupt.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-consumer-group");
        // enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic
        consumer.subscribe(Arrays.asList("demo"));
        while (true) {
            // poll for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                String value = record.value();
                // split on the "abc" delimiter used by Testlog
                String[] strings = value.split("abc");
                String uid = strings[1];
                String mid = strings[2];
                double score = Double.parseDouble(strings[3]);
                System.out.println("Received in real time:");
                System.out.println("user id: " + uid);
                System.out.println("movie id: " + mid);
                System.out.println("rating: " + score);
            }
        }
    }
}
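One caveat about the loop above: it indexes `strings[1]` through `strings[3]` directly, so any record that does not match the expected layout (an empty line, an unrelated log message) throws `ArrayIndexOutOfBoundsException` or `NumberFormatException` and kills the consumer. A defensive parsing step is easy to sketch; the class `RecordParser` and its method name here are hypothetical, not part of the original code:

```java
import java.util.Optional;

public class RecordParser {
    // Parse a payload of the form "<text>abc<uid>abc<mid>abc<score>abc".
    // Returns Optional.empty() for records that don't match, so one bad
    // message does not terminate the whole poll loop.
    static Optional<String[]> parse(String value) {
        String[] parts = value.split("abc");
        if (parts.length < 4) {
            return Optional.empty();
        }
        try {
            Double.parseDouble(parts[3]); // validate the score field
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
        return Optional.of(new String[] { parts[1], parts[2], parts[3] });
    }

    public static void main(String[] args) {
        System.out.println(RecordParser.parse("unrelated log line").isPresent());      // prints false
        System.out.println(RecordParser.parse("msgabc123abc10abc5.0abc").isPresent()); // prints true
    }
}
```

Skipping malformed records (or routing them to a dead-letter topic) keeps the consumer alive no matter what shows up on the topic.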
3. Run it (the consumer picks up the log message almost instantly)
4. Summary
Ingesting data in real time like this has many uses, real-time recommendation being one example: once we have the user id, movie id, and rating in hand, we can move straight on to the next stage of processing.