1、Pulsar IO引入
Apache Pulsar 是业界领先的消息系统。使用消息系统时,一个较为常见的问题就是:将数据移入或移出消息平台的最佳方法是什么?
当然,用户可以使用 Pulsar 的 consumer 和 producer API 编写自定义代码,来传输数据。但除此之外,是否还有其他方法呢?
以下为用户提出的一些相关问题:
要将数据发布到 Pulsar 或使用 Pulsar 中的数据,我应该在
哪里
运行相应程序?要将数据发布到 Pulsar 或使用 Pulsar 中的数据,我应该
怎样
运行相应程序?
用户之所以会提出这些问题,是因为其他消息/发布-订阅系统没有提供有组织且容错的方式来帮助用户从外部系统输入数据或将数据输出到外部系统,因而用户需要寻求自定义解决方案并手动运行。
为了解决上述问题并简化这一过程,因此推出了 Pulsar IO 。
Pulsar IO 通过利用现有的 Pulsar Functions 框架来输入/输出数据
。而 Pulsar Functions 框架的所有优势(如:容错性、并行性、弹性、负载平衡、按需更新等)都可以直接被 Pulsar 输入/输出数据的应用程序所利用。
而且,我们发现经常会出现这样的情况,用户花很大功夫(因为他们不是消息系统方面的专家,可能也不想成为这一领域的专家)去编写自定义程序,用于从消息传递系统访问数据。
自定义编写这些应用程序不仅会很困难,而且我们发现,许多用户在尝试实现执行相同功能的应用程序时,做了相同的工作。归根结底,消息系统只是用于移动数据的工具,因此,在设计 Pulsar IO 框架时,我们的主要目标之一就是易用性。
我们希望用户能够在不编写任何代码,也不用同时成为 Pulsar 和外部系统专家的情况下,可以从外部系统输入数据或将数据输出到外部系统。
2、Pulsar IO 框架
首先,我们定义两个应用程序,
一个作为 source =》将数据输入到 Pulsar
另一个作为 sink=》从 Pulsar 接收数据。
Source 将数据从外部系统导入 Pulsar,而 sink 将数据从 Pulsar 导出到外部系统。
具体来看,source 从外部系统读取数据,并将数据写入 Pulsar topic,而 sink 从一个或多个 Pulsar topic 读取数据,并将数据写入外部系统。
Pulsar IO 框架在现有的 Pulsar functions 框架上运行。单个 source 和 sink 可以像 function一样与 Pulsar broker 一起运行,如下图所示。
因此,Pulsar Functions 框架的所有优势都适用于 Pulsar IO 框架,即 sink 和 source 应用程序。
正如前面提到的,我们的设计目标包括用户无需编写任何自定义应用程序,也无需编写任何代码就可以将数据移入或移出 Pulsar。
因此,Pulsar IO 框架中有多种内置 source 和 sink(Kafka、Twitter Firehose、Cassandra、Aerospike 等,还会支持更多),用户只需使用一个命令便可运行。用户因此可以关注于业务逻辑,而无需担心实现细节。
3、使用 Pulsar IO
使用 Pulsar IO 框架很容易。用户可以在命令行界面使用一行简单的命令启动内置 source 或 sink。例如,用户可以用下面的命令来提交 source 到已有的 Pulsar 集群,命令格式如下:
Source
$ ./bin/pulsar-admin source create \ --tenant <tenant> \ --namespace <namespace> \ --name <source-name> \ --destinationTopicName <input-topics> \ --source-type <source-type>
以下示例为运行 twitter firehose source 的命令,用于将 Twitter 中的数据导入 Pulsar:
$ ./bin/pulsar-admin source create \
--tenant test \
--namespace ns1 \
--name twitter-source \
--destinationTopicName twitter_data \
--sourceConfigFile examples/twitter.yml \
--source-type twitter
经过以上步骤,用户即可向 Pulsar 输入数据,而无需编写或编译任何代码。唯一可能需要的是一个配置文件,用于为该 source 或 sink 指定某些配置。
Sink
用户可以通过以下格式的命令向现有的 Pulsar 集群中提交待运行的内置 sink:
$ ./bin/pulsar-admin sink create \
--tenant <tenant> \
--namespace <namespace> \
--name <sink-name> \
--inputs <input-topics> \
--sink-type <sink-type>
以下为运行 Cassandra sink 的示例命令,用于将数据从 Pulsar 导出到 Cassandra:
$ ./bin/pulsar-admin sink create \
--tenant public \
--namespace default \
--name cassandra-test-sink \
--sink-type cassandra \
--sinkConfigFile examples/cassandra-sink.yml \
--inputs test_cassandra
更多关于如何运行 Cassandra source 的信息,参阅快速入门指南:
https://pulsar.apache.org/docs/en/2.1.1-incubating/io-quickstart/
本地模式
以上命令显示了如何在“集群”模式下(即作为现有 Pulsar 集群的一部分)运行 source 和 sink。
除此之外,还可以在本地运行模式下将 source 和 sink 作为独立进程运行,这一模式会在机器上生成本地进程并且运行 source 或者 sink 的逻辑。
本地运行模式有助于测试和调试,但是,需要用户自行监控和监督。以下为在本地运行模式下运行 source 的命令示例:
$ ./bin/pulsar-admin sink localrun \
--tenant public \
--namespace default \
--name cassandra-test-sink \
--sink-type cassandra \
--sinkConfigFile examples/cassandra-sink.yml \
--inputs test_cassandra
动态更新source/sink配置
由于 Pulsar IO 框架在 Pulsar Functions 上运行,因此可以通过更新参数和配置来动态更新 source 或 sink。例如,当希望利用前面提到的 Twitter firehose source 将数据输入到另一个 Pulsar topic 时,可以执行以下命令:
$ ./bin/pulsar-admin source update \--tenant test \--namespace ns1 \--name twitter-source \--destinationTopicName twitter_data_2 \--sourceConfigFile examples/twitter.yml \--source-type twitter
也可以使用同样格式的命令更新 sink。大多数 source 和 sink 的更新都可以在运行时进行配置,从而简化修改、测试、部署等流程。
4、自定义Source、Sink
详情请查看
https://mp.weixin.qq.com/s?__biz=MzUxOTc4NDc2MQ%3D%3D&idx=1&mid=2247485274&scene=21&sn=616026cade5b161338f522bb4ddecae8#wechat_redirect
摘自 ApachePulsar公众号
https://mp.weixin.qq.com/s?__biz=MzUxOTc4NDc2MQ%3D%3D&idx=1&mid=2247485274&scene=21&sn=616026cade5b161338f522bb4ddecae8#wechat_redirect