Apache Pulsar 是一个分布式的发布-订阅消息系统,sink 是 Pulsar 的一个组件,用于将数据导入至其他系统。
1、Create-创建 sink
$ bin/pulsar-admin sink create <options>
常用参数
-
-a,–archive : 指定 sink 的 NAR 包
-
–classname : 指定 sink 的类名称
-
-i,–inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–parallelism : 指定 sink 的并发数
-
–sink-config-file : 指定 sink 的 yaml 配置文件
-
–tenant : 指定 sink 的租户
实践
(1)创建 mysql-jdbc-sink,并指定 nar 文件、topic、名称、yaml 配置文件和并发数。
$ bin/pulsar-admin sink create \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1
(2)如果出现以下信息,则说明创建成功。
Created successfully
2、Update-更新 sink
$ bin/pulsar-admin sink update <options>
常用参数
-
-a,–archive : 指定 sink 的 NAR 包
-
–classname : 指定 sink 的类名称
-
-i,–inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–parallelism : 指定 sink 的并发数
-
–sink-config-file : 指定 sink 的 yaml 配置文件
-
–tenant : 指定 sink 的租户
实践
(1)将 parallelism 更新至 2。
$ bin/pulsar-admin sink update \
--name mysql-jdbc-sink \
--parallelism 2
(2)如果出现以下信息,则说明更新成功。
Updated successfully
(3)查看 mysql-jdbc-sink 的信息,再次验证更新结果。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(4)Parallelism 为 2,说明已更新成功。
{
"tenant": "public","namespace": "default","name": "mysql-jdbc-sink","className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink","inputSpecs": {
"test-jdbc": {
"isRegexPattern": false}},"configs": {
"password": "jdbc","jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc","userName": "root","tableName": "test_jdbc"},"parallelism": 2,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true
}
3、Delete-删除 sink
$ bin/pulsar-admin sink delete <options>
常用参数
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)删除 mysql-jdbc-sink。
$ bin/pulsar-admin sink delete \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)如果出现以下信息,则说明删除成功。
Deleted successfully
(3)查看 mysql-jdbc-sink 的信息,再次验证删除结果。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(4)mysql-jdbc-sink 不存在,说明已删除成功。
HTTP 404 Not FoundReason: Sink mysql-jdbc-sink doesn't exist
4、List-显示所有 sink
$ bin/pulsar-admin sink list <options>
常用参数
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)显示所有 sink。
$ bin/pulsar-admin sink list \
--tenant public \
--namespace default
(2)返回结果显示前文创建的 mysql-jdbc-sink。
["mysql-jdbc-sink"
]
5、Get-显示 sink 的信息
$ bin/pulsar-admin sink get <options>
常用参数
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)显示 sink 的信息。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)返回结果显示 mysql-jdbc-sink 的信息,包括租户、命名空间和名称等。
{
"tenant": "public","namespace": "default","name": "mysql-jdbc-sink","className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink","inputSpecs": {
"test-jdbc": {
"isRegexPattern": false}},"configs": {
"password": "jdbc","jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc","userName": "root","tableName": "test_jdbc"},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true
}
6、Status-显示 sink 的状态
$ bin/pulsar-admin sink status <options>
常用参数
-
–instance-id : 指定 sink 的实例 ID
-
如果未指定,则获取所有实例的状态
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)显示 mysql-jdbc-sink 的状态。
$ bin/pulsar-admin sink status \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)返回结果显示 mysql-jdbc-sink 的状态信息,包括实例数量、是否正在运行和 worker ID 等。
{
"numInstances" : 1,"numRunning" : 1,"instances" : [ {
"instanceId" : 0,"status" : {
"running" : true,"error" : "","numRestarts" : 0,"numReadFromPulsar" : 0,"numSystemExceptions" : 0,"latestSystemExceptions" : [ ],"numSinkExceptions" : 0,"latestSinkExceptions" : [ ],"numWrittenToSink" : 0,"lastReceivedTime" : 0,"workerId" : "c-standalone-fw-tengdeMBP.lan-8080"}} ]
}
7、Stop-停止 sink
$ bin/pulsar-admin sink stop <options>
常用参数
-
–instance-id : 指定 sink 的实例 ID
-
如果未指定,则停止所有实例的状态
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)停止 mysql-jdbc-sink。
$ bin/pulsar-admin sink stop \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明停止成功。
Stopped successfully
8、 Start-启动 sink
$ bin/pulsar-admin sink start <options>
常用参数
-
–instance-id : 指定 sink 的实例 ID
如果未指定,则启动所有实例 -
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)启动 mysql-jdbc-sink。
$ bin/pulsar-admin sink start \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明启动成功。
Started successfully
9、Restart-重启 sink
$ bin/pulsar-admin sink restart <options>
常用参数
-
–instance-id : 指定 sink 的实例 ID
-
如果未指定,则获取所有实例的状态
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–tenant : 指定 sink 的租户
实践
(1)重启 mysql-jdbc-sink。
$ bin/pulsar-admin sink restart \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明重启成功。
Restarted successfully
10、Localrun-本地运行
在本地运行一个 Pulsar IO sink connector,方便调试。
$ bin/pulsar-admin sink localrun <options>
常用参数
-
-a,–archive : 指定 source 的 NAR 包
-
–classname : 指定 sink 的类名称
-
-i,–inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
-
–name : 指定 sink 的名称
-
–namespace : 指定 sink 的命名空间
-
–parallelism : 指定 sink 的并发数
-
–sink-config-file : 指定 sink 的 yaml 配置文件
-
–tenant : 指定 sink 的租户
摘自
https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng%3D%3D&idx=1&mid=2247484210&scene=21&sn=786f607bbb3ab21bfa639bb619f3e566#wechat_redirect