当前位置: 代码迷 >> 综合 >> Pulsar IO——Source使用汇总
  详细解决方案

Pulsar IO——Source使用汇总

热度:56   发布时间:2023-11-28 01:18:39.0

Apache Pulsar 是一个分布式发布订阅的消息系统。Source 是 Pulsar 的一个组件,用来将其他系统的数据输入至 Pulsar。本文介绍 Apache Pulsar Source 的基础知识,例如,Source 的常用命令使用示例。

1、Source 常用命令

1.1、Create-创建 source

常用参数

  • archive: 指定 source 的 NAR 包

  • classname: 指定 source 的类名称

  • destination-topic-name: 指定目标 topic 名称

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • parallelism: 指定 source 的并发数

  • source-config-file: 指定 source 使用的配置文件

  • tenant: 指定 source 所属的租户

在租户 public 和命名空间 default 下,创建名为 kafka 的 source。

./bin/pulsar-admin source create \
--archive ./pulsar-io-kafka-2.4.0.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name my-topic \
--source-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1

返回结果

"Created successfully"

如果命令行窗口显示以上信息,说明创建成功。

1.2、Update-更新 source

常用参数

  • archive: 指定 source 的 NAR 包

  • classname: 指定 source 的类名称

  • destination-topic-name: 指定目标 topic 名称

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • parallelism: 指定 source 的并发数

  • source-config-file: 指定 source 使用的配置文件

  • tenant: 指定 source 所属的租户

更新租户 public 命名空间 default 下面名称为 kafka 的 source。

./bin/pulsar-admin source update \
--archive ./pulsar-io-kafka-2.4.0.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name my-topic \
--source-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1 \
--cpu 2

返回结果

"Updated successfully"
./bin/pulsar-admin source get \
--tenant public \
--namespace default \
--name kafka

返回结果

{
    "tenant": "public","namespace": "default","name": "kafka","className": "org.apache.pulsar.io.kafka.KafkaBytesSource","topicName": "my-topic","configs": {
    "bootstrapServers": "pulsar-kafka:9092","groupId": "test-pulsar-io1","topic": "my-topic","sessionTimeoutMs": "10000","autoCommitEnabled": "false"},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","resources": {
    "cpu": 2.0,"ram": 1073741824,"disk": 10737418240}
}

以上示例成功更新了CPU。

1.3、Delete-删除 source

常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

删除租户 public 命名空间 default 下面名称为 kafka 的 source。

./bin/pulsar-admin source delete --tenant public --namespace default --name kafka
"Delete source successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
HTTP 404 Not Found
Reason: Source kafka doesn't exist

1.4、Start-启动 source

常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将启动所有实例

启动租户 public 命名空间 default 下面名称为 kafka 的 source。

./bin/pulsar-admin source start \
--tenant public \
--namespace default \
--name kafka \
--instance-id 0

返回结果

Started successfully

1.5、Stop-停止 source

常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将停止所有实例

停止租户 public 命名空间 default 下面名称为 kafka 的 source。

./bin/pulsar-admin source stop \
--tenant public \
--namespace default \
--name kafka \
--instance-id 0

返回结果

Stopped successfully

1.6、Get-获取 source 信息

常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

获取名称为 kafka 的 source 的信息。

./bin/pulsar-admin source get \
--tenant public \
--namespace default \
--name kafka

返回结果

{
    "tenant": "public","namespace": "default","name": "kafka","className": "org.apache.pulsar.io.kafka.KafkaBytesSource","topicName": "my-topic","configs": {
    "bootstrapServers": "pulsar-kafka:9092","groupId": "test-pulsar-io1","topic": "my-topic","sessionTimeoutMs": "10000","autoCommitEnabled": "false"},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE"
}

以上显示了刚才创建的 source 信息,包括租户、 namespace 、 名称、类名称、所在机器等。

1.7、Status-检查 source 状态

常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将获取所有实例状态

获取名称为 kafka 的 source 的运行状态。

./bin/pulsar-admin source status \
--tenant public \
--namespace default \
--name kafka

返回结果

{
    "numInstances" : 1,"numRunning" : 1,"instances" : [ {
    "instanceId" : 0,"status" : {
    "running" : true,"error" : "","numRestarts" : 0,"numReceivedFromSource" : 0,"numSystemExceptions" : 0,"latestSystemExceptions" : [ ],"numSourceExceptions" : 0,"latestSourceExceptions" : [ ],"numWritten" : 0,"lastReceivedTime" : 0,"workerId" : "c-standalone-fw-7e0cf1b3bf9d-8080"}} ]
}

以上展示了 source 的实例信息,包括是否正在运行、实例 id、workId 等。

1.8、List-列出所有 source 信息

常用参数

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

显示租户为 public、命名空间为 default 的 source。

./bin/pulsar-admin source list \
--tenant public \
--namespace default

返回结果

["kafka"
]

1.9、Restart-重启 source

常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将重启所有实例

重启租户 public 命名空间 default 下面名称为 kafka 的 source。

./bin/pulsar-admin source restart \
--tenant public \
--namespace default \
--name kafka \
--instance-id 0

返回结果

Restarted successfully

1.10、Localrun-在本地运行 source

常用参数

  • archive: 指定 source 的 NAR 包

  • classname: 指定 source 的类名称

  • destination-topic-name: 指定目标 topic 名称

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • parallelism: 指定 source 的并发数

  • source-config-file: 指定 source 使用的配置文件

  • tenant: 指定 source 所属的租户

摘自
https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng%3D%3D&idx=1&mid=2247484185&scene=21&sn=8293ed7c887190447c3e0c25e0516be5#wechat_redirect

2、Demo

Pulsar2.7,kafka-0.11.0.1

下载pulsar-io-kafka-2.7.0.nar

https://archive.apache.org/dist/pulsar/pulsar-2.7.0/connectors/pulsar-io-kafka-2.7.0.nar

下载kafka_2.11-0.11.0.1.jar

从安装好的{kafka_home}/libs下获取

配置kafkaSourceConfig.yaml

vim {
    Pulsar_Home}/conf/kafkaSourceConfig.yaml
configs:bootstrapServers: "kafka_ip:9092"groupId: "test-pulsar-io"topic: "test"sessionTimeoutMs: "10000"autoCommitEnabled: "false"

创建Source

./bin/pulsar-admin source create \
--archive ./connnector/pulsar-io-kafka-2.7.0.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name test \
--source-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1

create后查看source的状态,发现已经启动了

./bin/pulsar-admin source status \
> --tenant public \
> --namespace default \
> --name kafka

在这里插入图片描述

开启pulsar consumer终端

开启pulsar consumer等待msg中。。。

bin/pulsar-client consume \
persistent://public/default/test \
-n 100 \
-s "consumer-test" \
-t "Exclusive"

kafka端produce msg

在这里插入图片描述

观察pulsar端consume msg

在这里插入图片描述

  相关解决方案