问题描述
我已经设置了一个带有3个代理的kafka服务器。
我想从我的计算机向这三个经纪人发送一条消息,但我已经为每个经纪人配置了一个网址,如abc.com/kafka1/ abc.com/kafka2/ abc.com/kafka3/
在ngix
。
如何在metadata.broker.list
属性中使用这些URL?
我的代码如下。
package com.xxx.x.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEnvents = 0; nEnvents < events; nEnvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com" + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
这是我运行代码时遇到的错误。
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35)
line error producer.send(data);
1楼
配置变量metadata.broker.list
需要host1:port1,host2:port2
而不是URL。
尝试为每个代理配置不同的子域名,如kafka1.abc.com:80,kafka2.abc.com:80,kafka3.abc.com:80
并将这些子域指向适当的主机。
请参阅的producer config部分
这是用于引导,生产者将仅使用它来获取元数据(主题,分区和副本)。 将基于元数据中返回的代理信息建立用于发送实际数据的套接字连接。 格式为host1:port1,host2:port2,列表可以是代理的子集,也可以是指向代理子集的VIP。
2楼
我会说abc.com/kafka1/:80不是正确的语法。 我认为正确的应该是abc.com:9092。
metadata.broker.list属性中使用的url和port应该由您在Kafka broker server.properties文件中设置的内容(或启动它时设置的任何名称)确定。
重要的价值观是:
# The port the socket server listens on
port=xxx
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
# advertised.host.name=<hostname routable by clients>
默认端口为9092,如果使用80,请检查此端口。
希望这可以帮助。