当前位置: 代码迷 >> 综合 >> 初学kafka streams2.1.0原理及代码实现(wordcount)
  详细解决方案

初学kafka streams2.1.0原理及代码实现(wordcount)

热度:48   发布时间:2023-12-16 02:03:12.0

前面学习了kafka的基础知识,给大家推荐《kafka权威指南》这本书,这次学习kafka streams,参考书籍:《kafka从入门到实践》,电子书分享给大家。但是书中给出的有些示例是比较老的版本,有些类的用法已经更新了,我使用的是最新版的2.1.0,在此推荐大家参考官方文档:Apache Kafka

《kafka权威指南》

链接:https://pan.baidu.com/s/1_4u3C7D3dGpJCfqW0U5r0A 
提取码:aglc 

《kafka从入门到实践》

链接:https://pan.baidu.com/s/1-P0RfgBUIxVIL5TYjkr1xg 
提取码:nul7 

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------

看了一些书和博客,刚开始觉得很多概念都很抽象,后来看的多了,融合了很多前辈的分析之后,有了自己的浅显理解,希望把一些抽象的概念转变为通俗易懂的描述,以助于理解。

1.Kafka Streams简介

kafka是一个用来构建流处理程序的java库,它基于kafka的分区水平扩展对数据进行管理。利用kafka的并发模型实现负载均衡。
2.kafka streams中一些概念

处理器拓扑:可以理解为是一个对数据进行处理的抽象流程图(有向无环图),下面这张图就是一个处理器拓扑,每一个圆圈代表一个处理器,其中有两个特殊的处理器:source processor和sink processor。中间的线段表示流,数据进来后,经过一个一个的圆圈(处理器)之后,将数据输出出去。

流:是一个无限的、不断更新的数据集,每一条记录都是以键值对的形式存在。可以理解为不间断的产生数据记录。

流处理器:我们接收到数据后,会对数据进行一些处理,而kafka streams这个类对我们常用的一些处理封装了一些方法,比如map()、filter()、join()等,我们可以直接调用这些方法对数据进行处理。每一个处理步骤都可以理解为一个处理器,如上图中的圆圈节点表示。数据经过一个处理器处理之后将数据流向下一个处理器。

source processor:从数据源(Topic)那里获取数据作为整个处理流程的入口。

sink processor:数据处理逻辑的终点,将处理结果发送到指定的Topic中。

kafka提供了两种定义流处理拓扑的API:

(1)Kafka Streams DSL API:这里面给我们提供了常用的一些方法,我们直接拿来用就行了,不需要进行深度的操作,比如map,filter,join等操作。

(2)Low-Level Processor API:如果上述提供的方法不能满足我们的业务需求,那么我们就用这个API来开发自己的处理逻辑,在这个API里面根据我们的业务需求自定义处理逻辑。这种方法由于是我们自定义的,所以需要我们去操作一些底层的处理,实现起来有些复杂。

KStream:是流的一种,正如上面提到的,流是以键值对的形式存在的,所以KStream中以键值对的形式记录着数据,但是它的特点是数据是源源不断地向里面追加,类似于数据库里面的Insert操作,只管往里里面加数据,每一条数据都是独立的,和其他数据没有关系。看名字也有助于我们理解,stream就是把它理解为不断地流入数据,不会对数据进行覆盖或更新。我们称这种为记录流,对,只管记录。

KTable:也是流的一种,以键值对的形式记录数据。但是它与KStream的主要区别在于这里面的记录是会根据key来不断更新的,类似于数据库中基于主键的数据更新。同一个Key的记录,只保持最新的一条。看名字中的table,就可以理解为它类似于一个有状态的表,只记录了最新的数据。我们称这种为更新日志流。嗯,数据是不断更新的。

KStream和KTable中的数据就是从Topic中获得的,然后经过处理器的操作之后再变成KStream或者KTable,也就是可以理解为这两者都是流,经过逻辑处理之后产生的还是KStream或者KTable,且这两者之间还可以进行转换。从下面这个图也可以看出他们的处理结果的差异:

窗口:就是按照时间把数据分成一个一个组。因为在有些应用场景中需要对数据按照时间划分来处理。窗口包含窗口大小和滑动步长。窗口大小就是把数据按照多长时间进行分组,也就是每次处理多长的数据。滑动步长就是每次向前移动多长时间的数据。其实通俗一点的理解就是,有一个固定长度的盒子(窗口大小),处理当前盒子中的数据,然后每次向前移动的长度就是步长,每次处理的数据多少就是固定的这个盒子的长度。看下图就好理解了:

窗口有不同的类型,在此不详细解释。

3.kafka streams的应用

3.1创建maven项目,引入kafka streams的依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>www.wyhuii.com</groupId><artifactId>wyhuii</artifactId><version>0.0.1-SNAPSHOT</version><name>MyKafkaStreams</name><url>http://maven.apache.org</url><dependencies><!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.1.0</version></dependency><!-- kafka streams --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.1.0</version></dependency></dependencies>
</project>

3.2使用KafkaStreams实现单词计数

package teststreams;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;/*** 使用KStream实现单词个数统计* @author wyhui*/
public class KStreamPrint {public static void main(String[] args) throws InterruptedException {//构造实例化KafkaStreams对象的配置Properties prop = new Properties();/*** 每个Streams应用程序必须要有一个应用ID,这个ID用于协调应用实例,也用于命名内部的本地存储和相关主题。* 对于同一个kafka集群里的每一个Streams应用来说,这个名字必须是唯一的。*/prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "mywordcount");//名字随意起,但必须唯一prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.184.128:9092");//kafka服务器IP//在读写数据时,应用程序需要对消息进行序列化和反序列化,因此提供了序列化类和反序列化类。prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());/** 创建KStreamBuilder对象* KStreamBuilder builder = new KStreamBuilder();* 在网上参考的很多博客以及书籍都是用的这种方法来创建KStreamBuiler对象,从而得到KStream,* 但是在我编写代码时会发现这个类无法导入,后来看了官方文档后发现,我用的是最新版的kafka,里面的一些类更新了。* 所以在此参考kafka官方示例中给出的方法:创建StreamsBuilder,从而得到KStream。* 由此可见,还是官方文档给的示例最权威。*///创建StreamsBuilder对象StreamsBuilder builder = new StreamsBuilder();//以要处理的Topic作为参数使用builder的stram方法得到一个KStream流KStream<String, String> countStream = builder.stream("streamstopic-in");KTable<String, Long> wordcounts = countStream.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split(" ")))//将topic中得到的流中的每一个值都变成小写然后以空格分割.groupBy((key, word) -> word)//分组.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));//计数//将更新日志流转为记录流然后输出到另一个Topicwordcounts.toStream().to("streamstopic-out", Produced.with(Serdes.String(), Serdes.Long()));//基于拓扑和配置属性定义一个KafkaStreams对象KafkaStreams streams = new KafkaStreams(builder.build(), prop);//启动kafkastreams引擎streams.start();/*//一般情况下,Streams应用程序会一直运行下去,此处由于模拟测试,数据量少,我们就让线程休眠一段时间然后停止运行Thread.sleep(5000L);//停止运行streams.close();*/}}

在运行程序之前,我们先把服务启动,然后创建一下代码中提到的数据来源的topic(streamstopic-in)以及流处理之后的topic(streamstopic-out)。

3.2.2在streamstopic-in中生产几条消息:

3.2.3运行程序。

3.2.4使用下面的命令消费输出单词统计处理结果:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 
--from-beginning --topic streamstopic-out  
--formatter kafka.tools.DefaultMessageFormatter 
--property print.key=true 
--property print.value=true 
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

3.2.5回车之后,我们就可以看到结果:

以上就是对kafka streams的简单实现,在实现过程中遇到的一些问题请参考博客:kafka streams2.1.0官方wordcount代码实现过程中遇到的问题(NoSuchMethodError,UnsatisfiedLinkError)_QYHuiiQ的博客-CSDN博客

项目源码:https://github.com/wyhuiii/KafkaStreams-count