Apache Pulsar 是一个分布式发布订阅的消息系统。Source 是 Pulsar 的一个组件,用来将其他系统的数据输入至 Pulsar。本文介绍 Apache Pulsar Source 的基础知识,例如,Source 的常用命令使用示例。
1、Source 常用命令
1.1、Create-创建 source
常用参数
-
archive: 指定 source 的 NAR 包
-
classname: 指定 source 的类名称
-
destination-topic-name: 指定目标 topic 名称
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
parallelism: 指定 source 的并发数
-
source-config-file: 指定 source 使用的配置文件
-
tenant: 指定 source 所属的租户
在租户 public 和命名空间 default 下,创建名为 kafka 的 source。
./bin/pulsar-admin source create \
--archive ./pulsar-io-kafka-2.4.0.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name my-topic \
--source-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1
返回结果
"Created successfully"
如果命令行窗口显示以上信息,说明创建成功。
1.2、Update-更新 source
常用参数
-
archive: 指定 source 的 NAR 包
-
classname: 指定 source 的类名称
-
destination-topic-name: 指定目标 topic 名称
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
parallelism: 指定 source 的并发数
-
source-config-file: 指定 source 使用的配置文件
-
tenant: 指定 source 所属的租户
更新租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source update \
--archive ./pulsar-io-kafka-2.4.0.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name my-topic \
--source-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1 \
--cpu 2
返回结果
"Updated successfully"
./bin/pulsar-admin source get \
--tenant public \
--namespace default \
--name kafka
返回结果
{
"tenant": "public","namespace": "default","name": "kafka","className": "org.apache.pulsar.io.kafka.KafkaBytesSource","topicName": "my-topic","configs": {
"bootstrapServers": "pulsar-kafka:9092","groupId": "test-pulsar-io1","topic": "my-topic","sessionTimeoutMs": "10000","autoCommitEnabled": "false"},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","resources": {
"cpu": 2.0,"ram": 1073741824,"disk": 10737418240}
}
以上示例成功更新了CPU。
1.3、Delete-删除 source
常用参数
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
删除租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source delete --tenant public --namespace default --name kafka
"Delete source successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
HTTP 404 Not Found
Reason: Source kafka doesn't exist
1.4、Start-启动 source
常用参数
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
-
instance-id: 指定 source 的 instance-id,如果未指定,将启动所有实例
启动租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source start \
--tenant public \
--namespace default \
--name kafka \
--instance-id 0
返回结果
Started successfully
1.5、Stop-停止 source
常用参数
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
-
instance-id: 指定 source 的 instance-id,如果未指定,将停止所有实例
停止租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source stop \
--tenant public \
--namespace default \
--name kafka \
--instance-id 0
返回结果
Stopped successfully
1.6、Get-获取 source 信息
常用参数
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
获取名称为 kafka 的 source 的信息。
./bin/pulsar-admin source get \
--tenant public \
--namespace default \
--name kafka
返回结果
{
"tenant": "public","namespace": "default","name": "kafka","className": "org.apache.pulsar.io.kafka.KafkaBytesSource","topicName": "my-topic","configs": {
"bootstrapServers": "pulsar-kafka:9092","groupId": "test-pulsar-io1","topic": "my-topic","sessionTimeoutMs": "10000","autoCommitEnabled": "false"},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE"
}
以上显示了刚才创建的 source 信息,包括租户、 namespace 、 名称、类名称、所在机器等。
1.7、Status-检查 source 状态
常用参数
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
-
instance-id: 指定 source 的 instance-id,如果未指定,将获取所有实例状态
获取名称为 kafka 的 source 的运行状态。
./bin/pulsar-admin source status \
--tenant public \
--namespace default \
--name kafka
返回结果
{
"numInstances" : 1,"numRunning" : 1,"instances" : [ {
"instanceId" : 0,"status" : {
"running" : true,"error" : "","numRestarts" : 0,"numReceivedFromSource" : 0,"numSystemExceptions" : 0,"latestSystemExceptions" : [ ],"numSourceExceptions" : 0,"latestSourceExceptions" : [ ],"numWritten" : 0,"lastReceivedTime" : 0,"workerId" : "c-standalone-fw-7e0cf1b3bf9d-8080"}} ]
}
以上展示了 source 的实例信息,包括是否正在运行、实例 id、workId 等。
1.8、List-列出所有 source 信息
常用参数
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
显示租户为 public、命名空间为 default 的 source。
./bin/pulsar-admin source list \
--tenant public \
--namespace default
返回结果
["kafka"
]
1.9、Restart-重启 source
常用参数
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
tenant: 指定 source 所属的租户
-
instance-id: 指定 source 的 instance-id,如果未指定,将重启所有实例
重启租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source restart \
--tenant public \
--namespace default \
--name kafka \
--instance-id 0
返回结果
Restarted successfully
1.10、Localrun-在本地运行 source
常用参数
-
archive: 指定 source 的 NAR 包
-
classname: 指定 source 的类名称
-
destination-topic-name: 指定目标 topic 名称
-
name: 指定 source 的名称
-
namespace: 指定 source 的命名空间
-
parallelism: 指定 source 的并发数
-
source-config-file: 指定 source 使用的配置文件
-
tenant: 指定 source 所属的租户
摘自
https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng%3D%3D&idx=1&mid=2247484185&scene=21&sn=8293ed7c887190447c3e0c25e0516be5#wechat_redirect
2、Demo
Pulsar2.7,kafka-0.11.0.1
下载pulsar-io-kafka-2.7.0.nar
https://archive.apache.org/dist/pulsar/pulsar-2.7.0/connectors/pulsar-io-kafka-2.7.0.nar
下载kafka_2.11-0.11.0.1.jar
从安装好的{kafka_home}/libs下获取
配置kafkaSourceConfig.yaml
vim {
Pulsar_Home}/conf/kafkaSourceConfig.yaml
configs:bootstrapServers: "kafka_ip:9092"groupId: "test-pulsar-io"topic: "test"sessionTimeoutMs: "10000"autoCommitEnabled: "false"
创建Source
./bin/pulsar-admin source create \
--archive ./connnector/pulsar-io-kafka-2.7.0.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name test \
--source-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1
create后查看source的状态,发现已经启动了
./bin/pulsar-admin source status \
> --tenant public \
> --namespace default \
> --name kafka
开启pulsar consumer终端
开启pulsar consumer等待msg中。。。
bin/pulsar-client consume \
persistent://public/default/test \
-n 100 \
-s "consumer-test" \
-t "Exclusive"