当前位置: 代码迷 >> 综合 >> Pulsar IO——Sink使用汇总
  详细解决方案

Pulsar IO——Sink使用汇总

热度:26   发布时间:2023-11-28 01:18:23.0

Apache Pulsar 是一个分布式的发布-订阅消息系统,sink 是 Pulsar 的一个组件,用于将数据导入至其他系统。

1、Create-创建 sink

$ bin/pulsar-admin sink create <options>

常用参数

  • -a,–archive : 指定 sink 的 NAR 包

  • –classname : 指定 sink 的类名称

  • -i,–inputs : 指定 sink 的 topic,多个 topic 用逗号隔开

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –parallelism : 指定 sink 的并发数

  • –sink-config-file : 指定 sink 的 yaml 配置文件

  • –tenant : 指定 sink 的租户

实践
(1)创建 mysql-jdbc-sink,并指定 nar 文件、topic、名称、yaml 配置文件和并发数。

$ bin/pulsar-admin sink create \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1

(2)如果出现以下信息,则说明创建成功。

Created successfully

2、Update-更新 sink

$ bin/pulsar-admin sink update <options>

常用参数

  • -a,–archive : 指定 sink 的 NAR 包

  • –classname : 指定 sink 的类名称

  • -i,–inputs : 指定 sink 的 topic,多个 topic 用逗号隔开

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –parallelism : 指定 sink 的并发数

  • –sink-config-file : 指定 sink 的 yaml 配置文件

  • –tenant : 指定 sink 的租户

实践
(1)将 parallelism 更新至 2。

$ bin/pulsar-admin sink update \
--name mysql-jdbc-sink \
--parallelism 2

(2)如果出现以下信息,则说明更新成功。

Updated successfully

(3)查看 mysql-jdbc-sink 的信息,再次验证更新结果。

$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink

(4)Parallelism 为 2,说明已更新成功。

{
    "tenant": "public","namespace": "default","name": "mysql-jdbc-sink","className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink","inputSpecs": {
    "test-jdbc": {
    "isRegexPattern": false}},"configs": {
    "password": "jdbc","jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc","userName": "root","tableName": "test_jdbc"},"parallelism": 2,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true
}

3、Delete-删除 sink

$ bin/pulsar-admin sink delete <options>

常用参数

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践
(1)删除 mysql-jdbc-sink。

$ bin/pulsar-admin sink delete \
--tenant public \
--namespace default \
--name mysql-jdbc-sink

(2)如果出现以下信息,则说明删除成功。

Deleted successfully

(3)查看 mysql-jdbc-sink 的信息,再次验证删除结果。

$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink

(4)mysql-jdbc-sink 不存在,说明已删除成功。

HTTP 404 Not FoundReason: Sink mysql-jdbc-sink doesn't exist

4、List-显示所有 sink

$ bin/pulsar-admin sink list <options>

常用参数

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践
(1)显示所有 sink。

$ bin/pulsar-admin sink list \
--tenant public \
--namespace default

(2)返回结果显示前文创建的 mysql-jdbc-sink。

["mysql-jdbc-sink"
]

5、Get-显示 sink 的信息

$ bin/pulsar-admin sink get <options>

常用参数

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践

(1)显示 sink 的信息。

$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink

(2)返回结果显示 mysql-jdbc-sink 的信息,包括租户、命名空间和名称等。

{
    "tenant": "public","namespace": "default","name": "mysql-jdbc-sink","className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink","inputSpecs": {
    "test-jdbc": {
    "isRegexPattern": false}},"configs": {
    "password": "jdbc","jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc","userName": "root","tableName": "test_jdbc"},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true
}

6、Status-显示 sink 的状态

$ bin/pulsar-admin sink status <options>

常用参数

  • –instance-id : 指定 sink 的实例 ID

  • 如果未指定,则获取所有实例的状态

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践

(1)显示 mysql-jdbc-sink 的状态。

$ bin/pulsar-admin sink status \
--tenant public \
--namespace default \
--name mysql-jdbc-sink

(2)返回结果显示 mysql-jdbc-sink 的状态信息,包括实例数量、是否正在运行和 worker ID 等。

{
    "numInstances" : 1,"numRunning" : 1,"instances" : [ {
    "instanceId" : 0,"status" : {
    "running" : true,"error" : "","numRestarts" : 0,"numReadFromPulsar" : 0,"numSystemExceptions" : 0,"latestSystemExceptions" : [ ],"numSinkExceptions" : 0,"latestSinkExceptions" : [ ],"numWrittenToSink" : 0,"lastReceivedTime" : 0,"workerId" : "c-standalone-fw-tengdeMBP.lan-8080"}} ]
}

7、Stop-停止 sink

$ bin/pulsar-admin sink stop <options>

常用参数

  • –instance-id : 指定 sink 的实例 ID

  • 如果未指定,则停止所有实例的状态

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践
(1)停止 mysql-jdbc-sink。

$ bin/pulsar-admin sink stop \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0

(2)如果出现以下信息,则说明停止成功。

Stopped successfully

8、 Start-启动 sink

$ bin/pulsar-admin sink start <options>

常用参数

  • –instance-id : 指定 sink 的实例 ID
    如果未指定,则启动所有实例

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践

(1)启动 mysql-jdbc-sink。

$ bin/pulsar-admin sink start \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0

(2)如果出现以下信息,则说明启动成功。

Started successfully

9、Restart-重启 sink

$ bin/pulsar-admin sink restart <options>

常用参数

  • –instance-id : 指定 sink 的实例 ID

  • 如果未指定,则获取所有实例的状态

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –tenant : 指定 sink 的租户

实践

(1)重启 mysql-jdbc-sink。

$ bin/pulsar-admin sink restart \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0

(2)如果出现以下信息,则说明重启成功。

Restarted successfully

10、Localrun-本地运行

在本地运行一个 Pulsar IO sink connector,方便调试。

$ bin/pulsar-admin sink localrun <options>

常用参数

  • -a,–archive : 指定 source 的 NAR 包

  • –classname : 指定 sink 的类名称

  • -i,–inputs : 指定 sink 的 topic,多个 topic 用逗号隔开

  • –name : 指定 sink 的名称

  • –namespace : 指定 sink 的命名空间

  • –parallelism : 指定 sink 的并发数

  • –sink-config-file : 指定 sink 的 yaml 配置文件

  • –tenant : 指定 sink 的租户

摘自
https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng%3D%3D&idx=1&mid=2247484210&scene=21&sn=786f607bbb3ab21bfa639bb619f3e566#wechat_redirect

  相关解决方案