
kafka+zookeeper+filebeat cluster

Published: 2023-10-19

Environment

Elasticsearch's high-availability and self-balancing behavior means that when a node goes down (or restarts), the cluster re-replicates that node's shards onto the remaining nodes, which generates heavy I/O and network traffic.
When the departed node rejoins the cluster, Elasticsearch rebalances the shards again and reallocates shards to it. So when one node fails under load, the remaining nodes take over the data it was holding and come under even greater pressure, and the failure can cascade until the whole cluster avalanches.
In production it is recommended to disable automatic rebalancing.
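Rebalancing can be switched off through Elasticsearch's cluster settings API. A minimal config fragment sketching this, assuming the cluster endpoint and credentials from the Logstash section of this post (10.21.2.219:9200, elastic/changeme) — adjust to your own cluster:

```shell
# Config fragment: disable shard rebalancing cluster-wide.
# "transient" settings reset after a full cluster restart;
# use "persistent" instead to keep them across restarts.
curl -u elastic:changeme -H 'Content-Type: application/json' \
  -X PUT 'http://10.21.2.219:9200/_cluster/settings' -d '
{
  "transient": {
    "cluster.routing.rebalance.enable": "none"
  }
}'
```

Setting the value back to "all" re-enables rebalancing once the cluster is healthy again.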
Three servers:
10.21.2.224
10.21.2.225
10.21.2.226

1. Install ZooKeeper from the source tarball. Configure all three nodes; the /tmp/zookeeper/myid file must be different on each node.

Configuration file conf/zoo.cfg:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.0=10.21.2.224:2887:3887
server.1=10.21.2.225:2887:3887
server.2=10.21.2.226:2887:3887
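The number in each node's myid file must match that node's server.N index in zoo.cfg. A minimal sketch of creating it — run the matching line on each host:

```shell
# Each ZooKeeper node identifies itself via dataDir/myid;
# the number must match its server.N entry in zoo.cfg.
mkdir -p /tmp/zookeeper
echo 0 > /tmp/zookeeper/myid   # on 10.21.2.224 (server.0)
# echo 1 > /tmp/zookeeper/myid # on 10.21.2.225 (server.1)
# echo 2 > /tmp/zookeeper/myid # on 10.21.2.226 (server.2)
```

If myid is missing or does not match zoo.cfg, the node will fail to join the ensemble at startup.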

Start

./zkServer.sh start

2. Install Kafka from the source tarball

10.21.2.224 >> server.properties
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
port=9092
advertised.host.name=10.21.2.224
advertised.listeners=PLAINTEXT://10.21.2.224:9092
num.network.threads=30
num.io.threads=80
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka_data
num.partitions=15
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
10.21.2.225 >> server.properties
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
port=9092
advertised.host.name=10.21.2.225
advertised.listeners=PLAINTEXT://10.21.2.225:9092
num.network.threads=30
num.io.threads=80
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka_data
num.partitions=15
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
10.21.2.226 >> server.properties
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
port=9092
advertised.host.name=10.21.2.226
advertised.listeners=PLAINTEXT://10.21.2.226:9092
num.network.threads=30
num.io.threads=80
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka_data
num.partitions=15
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
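The three server.properties files above differ only in broker.id and the advertised address, so they can be generated from one loop instead of edited by hand. A minimal sketch (output file names are illustrative; only a few of the shared settings are shown):

```shell
# Generate per-broker server.properties files.
# Broker id N maps to IP 10.21.2.22(4+N), matching the configs above.
for id in 0 1 2; do
  ip="10.21.2.22$((4 + id))"
  out="server-$ip.properties"
  {
    echo "broker.id=$id"
    echo "listeners=PLAINTEXT://0.0.0.0:9092"
    echo "advertised.listeners=PLAINTEXT://$ip:9092"
    # ...remaining settings are identical on every broker...
    echo "log.dirs=/opt/data/kafka_data"
    echo "zookeeper.connect=10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181"
  } > "$out"
done
```

Copy each generated file to its broker as config/server.properties before starting Kafka.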

Start

nohup ./bin/kafka-server-start.sh config/server.properties &

Test

Create a new topic on one of the nodes:
./bin/kafka-topics.sh --create --zookeeper 10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181 --topic test --replication-factor 3 --partitions 3

Show the topic's replica placement:
./bin/kafka-topics.sh --describe --zookeeper 10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181 --topic test

List the topics that have been created:
./bin/kafka-topics.sh --list --zookeeper 10.21.2.224:2181,10.21.2.225:2181,10.21.2.226:2181

Send messages as a test producer:
./bin/kafka-console-producer.sh --broker-list 10.21.2.224:9092,10.21.2.225:9092,10.21.2.226:9092 --topic test

Consume messages as a test consumer:
./bin/kafka-console-consumer.sh --bootstrap-server 10.21.2.224:9092,10.21.2.225:9092,10.21.2.226:9092 --from-beginning --topic test

Load testing

https://www.cnblogs.com/xiao987334176/p/10075659.html

3. Filebeat

filebeat.modules:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/nohup1.log
  fields:
    log_source: asc_batch
    log_topics: asc
- input_type: log
  paths:
    - /tmp/nohup2.log
  fields:
    log_topics: test2
#------------------------------- Kafka output ----------------------------------
output.kafka:
  enabled: true
  hosts: ["10.21.2.224:9092","10.21.2.225:9092","10.21.2.226:9092"]
  topic: '%{[fields][log_topics]}'
  compression: gzip
  max_message_bytes: 1000000

4. Logstash configuration


input {
  kafka {
    bootstrap_servers => "10.21.2.224:9092,10.21.2.225:9092,10.21.2.226:9092"
    topics => ["ad","novel","mdm","yuqing","ir-ygx","mini-game","azkaban","ir-feihuo","ykusercenter","yksplus","ykforgetpwd","ykpaycenter","yksocialite","hps"]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    auto_offset_reset => "latest"
  }
}
output {
  if [fields][log_topics] == "ad" {
    elasticsearch {
      hosts => ["10.21.2.219:9200"]
      index => "ad-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "changeme"
    }
  }
  else if [fields][log_topics] == "novel" {
    elasticsearch {
      hosts => ["10.21.2.219:9200"]
      index => "novel-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "changeme"
    }
  }
}
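The `%{+YYYY.MM.dd}` in the index option is Logstash's date interpolation of the event's @timestamp, so each day's events land in their own index. A sketch of the equivalent expansion in shell, for illustration only:

```shell
# Logstash expands index => "ad-%{+YYYY.MM.dd}" from the event timestamp;
# for an event produced today (UTC) the index name looks like this:
today_index="ad-$(date -u +%Y.%m.%d)"
echo "$today_index"
```

Daily indices make it easy to drop old data by deleting whole indices instead of individual documents.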