1. sarama 是什么?
sarama 的出现意味着,golang 有完整的 apache kafka 客户端文档的全部用例了
2. 为什么那么受欢迎?
他完整,兼容性好
不需要 CGO 的支持
支持的 golang 版本从 1.15 到目前最新的 1.16
支持的 kafka 版本横跨 2.6 到 2.8,并且之前发布的老版本依然支持
3. 怎么使用
下载扩展库 go get github.com/Shopify/sarama
即可。
安装完首先在 test 中写一个队列任务发布者:kafka 的集群,自己用 docker 在 wsl 下搭建就行了,网上例子非常多,这里就不详细解答了
请看我代码逐行解释:
package testerimport ("fmt""github.com/Shopify/sarama""log""os""testing""time"
)//集群地址
var address = []string{"192.168.83.89:19092","192.168.83.89:29092","192.168.83.89:39092"}//创建kafka任务发布者
func TestKafkaProducter(t *testing.T) {//配置发布者config := sarama.NewConfig()//确认返回,记得一定要写,因为本次例子我用的是同步发布者config.Producer.Return.Successes = true//设置超时时间 这个超时时间一旦过期,新的订阅者在这个超时时间后才创建的,就不能订阅到消息了config.Producer.Timeout = 5 * time.Second//连接发布者,并创建发布者实例p, err := sarama.NewSyncProducer(address, config)if err != nil {log.Printf("sarama.NewSyncProducer err, message=%s \n", err)return}//程序退出时释放资源defer p.Close()//设置一个逻辑上的分区名,叫安彦飞topic := "anyanfei"//这个是发布的内容srcValue := "sync: this is a message. index=%d"//发布者循环发送0-9的消息内容for i:=0; i<10; i++ {value := fmt.Sprintf(srcValue, i)//创建发布者消息体msg := &sarama.ProducerMessage{Topic:topic,Value:sarama.ByteEncoder(value),}//发送消息并返回消息所在的物理分区和偏移量partition, offset, err := p.SendMessage(msg)if err != nil {log.Printf("send message(%s) err=%s \n", value, err)}else {fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", partition, offset)}time.Sleep(500*time.Millisecond)}
}
我们开始来创建订阅者,俗称消费者:同样是在 tester 包下进行编写,测试用嘛(一定要注意,实际项目中不能用 test,test 默认超时时间是 10 分钟,如果有测试用例一直占用且超过 10 分钟会发生 panic)
package testerimport ("context""fmt""github.com/Shopify/sarama""log""os""os/signal""strings""sync""syscall""testing""time"
)...发布者的代码块...func TestKafkaConsumer(t *testing.T) {newKafkaConsumer()
}//开始创建kafka订阅者
func newKafkaConsumer(){/**group:设置订阅者群 如果多个订阅者group一样,则随机挑一个进行消费,当然也可以设置轮训,在设置里面修改;若多个订阅者的group不同,则一旦发布者发布消息,所有订阅者都会订阅到同样的消息;topics:逻辑分区必须与发布者相同,还是用安彦飞,不然找不到内容咯当然订阅者是可以订阅多个逻辑分区的,只不过因为演示方便我写了一个,你可以用英文逗号分割在这里写多个*/var (group = "Consumer1" topics = "anyanfei")log.Println("Starting a new Sarama consumer")//配置订阅者config := sarama.NewConfig()//配置偏移量config.Consumer.Offsets.Initial = sarama.OffsetNewest//开始创建订阅者consumer := Consumer{ready: make(chan bool),}//创建一个上下文对象,实际项目中也一定不要设置超时(当然,按你项目需求,我是没见过有项目需求要多少时间后取消订阅的)ctx, cancel := context.WithCancel(context.Background())//创建订阅者群,集群地址发布者代码里已定义client, err := sarama.NewConsumerGroup(address, group, config)if err != nil {log.Panicf("Error creating consumer group client: %v", err)}//创建同步组var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for {/**官方说:`订阅者`应该在无限循环内调用当`发布者`发生变化时需要重新创建`订阅者`会话以获得新的声明所以这里把订阅者放在了循环体内*/if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {log.Panicf("Error from consumer: %v", err)}// 检查上下文是否被取消,收到取消信号应当立刻在本协程中取消循环if ctx.Err() != nil {return}//获取订阅者准备就绪信号consumer.ready = make(chan bool)}}()<-consumer.ready // 获取到了订阅者准备就绪信号后打印下面的话log.Println("Sarama consumer up and running!...")//golang优雅退出的信号通道创建sigterm := make(chan os.Signal, 1)//golang优雅退出的信号获取signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)//创建选择器,如果不是上下文取消或者用户ctrl+c这种系统级退出,则就不向下执行了select {case <-ctx.Done():log.Println("terminating: context cancelled")case <-sigterm:log.Println("terminating: via signal")}//取消上下文cancel()wg.Wait()//关闭客户端if err = client.Close(); err != nil {log.Panicf("Error closing client: %v", err)}
}//重写订阅者,并重写订阅者的所有方法
type Consumer struct {ready chan bool
}// Setup方法在新会话开始时运行的,然后才使用声明
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {// Mark the consumer as readyclose(consumer.ready)return nil
}// 一旦所有的订阅者协程都退出,Cleaup方法将在会话结束时运行
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {return nil
}// 订阅者在会话中消费消息,并标记当前消息已经被消费。
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)session.MarkMessage(message, "")}return nil
}
下面开始执行:打开两个订阅者挂起:
订阅者 1:
PS E:\go_code\src\go-web-demo\tester> go test -v -run TestKafkaConsumer
=== RUN TestKafkaConsumer
2021/06/20 16:41:51 Starting a new Sarama consumer
2021/06/20 16:41:52 Sarama consumer up and running!...
2021/06/20 16:41:59 Message claimed: value = sync: this is a message. index=0, timestamp = 2021-06-20 16:41:58.975 +0800 CST, topic = anyanfei
2021/06/20 16:41:59 Message claimed: value = sync: this is a message. index=1, timestamp = 2021-06-20 16:41:59.56 +0800 CST, topic = anyanfei
2021/06/20 16:42:00 Message claimed: value = sync: this is a message. index=2, timestamp = 2021-06-20 16:42:00.152 +0800 CST, topic = anyanfei
2021/06/20 16:42:00 Message claimed: value = sync: this is a message. index=3, timestamp = 2021-06-20 16:42:00.698 +0800 CST, topic = anyanfei
2021/06/20 16:42:01 Message claimed: value = sync: this is a message. index=4, timestamp = 2021-06-20 16:42:01.251 +0800 CST, topic = anyanfei
2021/06/20 16:42:01 Message claimed: value = sync: this is a message. index=5, timestamp = 2021-06-20 16:42:01.793 +0800 CST, topic = anyanfei
2021/06/20 16:42:02 Message claimed: value = sync: this is a message. index=6, timestamp = 2021-06-20 16:42:02.379 +0800 CST, topic = anyanfei
2021/06/20 16:42:03 Message claimed: value = sync: this is a message. index=7, timestamp = 2021-06-20 16:42:02.929 +0800 CST, topic = anyanfei
2021/06/20 16:42:03 Message claimed: value = sync: this is a message. index=8, timestamp = 2021-06-20 16:42:03.491 +0800 CST, topic = anyanfei
2021/06/20 16:42:04 Message claimed: value = sync: this is a message. index=9, timestamp = 2021-06-20 16:42:04.03 +0800 CST, topic = anyanfei
订阅者 2:
PS E:\go_code\src\go-web-demo\tester> go test -v -run TestKafkaConsumer
=== RUN TestKafkaConsumer
2021/06/20 16:41:23 Starting a new Sarama consumer
2021/06/20 16:41:24 Sarama consumer up and running!...
2021/06/20 16:41:59 Message claimed: value = sync: this is a message. index=0, timestamp = 2021-06-20 16:41:58.975 +0800 CST, topic = anyanfei
2021/06/20 16:41:59 Message claimed: value = sync: this is a message. index=1, timestamp = 2021-06-20 16:41:59.56 +0800 CST, topic = anyanfei
2021/06/20 16:42:00 Message claimed: value = sync: this is a message. index=2, timestamp = 2021-06-20 16:42:00.152 +0800 CST, topic = anyanfei
2021/06/20 16:42:00 Message claimed: value = sync: this is a message. index=3, timestamp = 2021-06-20 16:42:00.698 +0800 CST, topic = anyanfei
2021/06/20 16:42:01 Message claimed: value = sync: this is a message. index=4, timestamp = 2021-06-20 16:42:01.251 +0800 CST, topic = anyanfei
2021/06/20 16:42:01 Message claimed: value = sync: this is a message. index=5, timestamp = 2021-06-20 16:42:01.793 +0800 CST, topic = anyanfei
2021/06/20 16:42:02 Message claimed: value = sync: this is a message. index=6, timestamp = 2021-06-20 16:42:02.379 +0800 CST, topic = anyanfei
2021/06/20 16:42:03 Message claimed: value = sync: this is a message. index=7, timestamp = 2021-06-20 16:42:02.929 +0800 CST, topic = anyanfei
2021/06/20 16:42:03 Message claimed: value = sync: this is a message. index=8, timestamp = 2021-06-20 16:42:03.491 +0800 CST, topic = anyanfei
2021/06/20 16:42:04 Message claimed: value = sync: this is a message. index=9, timestamp = 2021-06-20 16:42:04.03 +0800 CST, topic = anyanfei
一开始是没有消息的,会阻塞在 Sarama consumer up and running!...
直到我开启了发布者,才会产生上述订阅者的内容:
PS E:\go_code\src\go-web-demo\tester> go test -v -run TestKafkaProducter
sync: this is a message. index=0发送成功,partition=0, offset=12
sync: this is a message. index=1发送成功,partition=4, offset=13
sync: this is a message. index=2发送成功,partition=7, offset=12
sync: this is a message. index=3发送成功,partition=4, offset=14
sync: this is a message. index=4发送成功,partition=7, offset=13
sync: this is a message. index=5发送成功,partition=5, offset=8
sync: this is a message. index=6发送成功,partition=4, offset=15
sync: this is a message. index=7发送成功,partition=1, offset=9
sync: this is a message. index=8发送成功,partition=1, offset=10
sync: this is a message. index=9发送成功,partition=4, offset=16
总结
Shopify/sarama 这个库用来做 kafka 的 golang 客户端库,我给你说,这针不戳~~~
以上所有内容均采用最新官方案例做示例
参考资料
https://github.com/Shopify/sarama
还想了解更多吗?
更多请查看:https://github.com/Shopify/sarama
欢迎加入我们GOLANG中国社区:https://gocn.vip/
《酷Go推荐》招募:
各位Gopher同学,最近我们社区打算推出一个类似GoCN每日新闻的新栏目《酷Go推荐》,主要是每周推荐一个库或者好的项目,然后写一点这个库使用方法或者优点之类的,这样可以真正的帮助到大家能够学习到
新的库,并且知道怎么用。
大概规则和每日新闻类似,如果报名人多的话每个人一个月轮到一次,欢迎大家报名!(报名地址:https://wj.qq.com/s2/7734329/3f51)
扫码也可以加入 GoCN 的大家族哟~
Gopher China2021大会日程详情来了!
点击下方「阅读原文」即可报名参加大会