
Kafka installation, common commands, cluster, producer, consumer


Contents

  • Kafka installation
  • Common Kafka commands
  • producer
  • Kafka cluster
  • consumer

Kafka installation

1. Extract and rename

tar -zxf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 kafka211

2. Edit the configuration file

vi /opt/bigdata/kafka211/config/server.properties
# Globally unique broker ID; must not be duplicated
broker.id=0
# Directory where Kafka stores its data (log segments)
log.dirs=/opt/bigdata/kafka211/kafka_logs
# ZooKeeper connection the broker uses to store metadata
zookeeper.connect=192.168.232.211:2181
# delete.topic.enable=true must be set in server.properties for topic deletion to actually remove the topic; otherwise it is only marked for deletion
delete.topic.enable=true
advertised.listeners=PLAINTEXT://192.168.232.211:9092

3. Environment variables

vi /etc/profile
export KAFKA_HOME=/opt/bigdata/kafka211
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

4. Start the services
1) Start ZooKeeper
Configure the ZooKeeper environment variables:

vi /etc/profile
export ZOOKEEPER_HOME=/root/software/zkpr
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
zkServer.sh start

2) Start Kafka

kafka-server-start.sh -daemon /opt/bigdata/kafka211/config/server.properties  


Alternative way to start (run it in the background and redirect the log output):
nohup kafka-server-start.sh /opt/bigdata/kafka211/config/server.properties >kafka.log 2>&1 &
Stop Kafka:
kafka-server-stop.sh    # or simply kill the process
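
To confirm the broker actually came up, a quick check (a sketch; it assumes the JDK's jps tool is on the PATH and the single-node addresses configured above):

# The broker runs as a JVM process named "Kafka"
jps | grep Kafka

# Topic metadata is stored in ZooKeeper, so a successful listing also confirms zookeeper.connect is reachable
kafka-topics.sh --zookeeper 192.168.232.211:2181 --list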

Common Kafka commands

List topics:
kafka-topics.sh --zookeeper 192.168.232.211:2181 --list

Delete a topic:
kafka-topics.sh --zookeeper 192.168.232.211:2181 --delete --topic <topic-name>

Create a topic (the topic name, partition count, and replication factor must all be given; the replication factor cannot exceed the number of brokers in the cluster):
kafka-topics.sh --zookeeper 192.168.232.211:2181 --create --topic mydemo --partitions 1 --replication-factor 1

Describe a topic:
kafka-topics.sh --zookeeper 192.168.232.211:2181 --describe --topic mydemo

Produce messages to mydemo:
kafka-console-producer.sh --topic mydemo --broker-list 192.168.232.211:9092

Consume messages from mydemo:
kafka-console-consumer.sh --topic mydemo --bootstrap-server 192.168.232.211:9092 --from-beginning

Check the number of messages in a topic:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.232.211:9092 --topic users --time -1 --offsets 1
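
A note on GetOffsetShell: it prints one line per partition in the form topic:partition:offset, and --time selects which offset to report (-1 = latest, -2 = earliest). The number of messages still retained is the per-partition difference between the two, so, as a sketch against the mydemo topic created above:

# Latest offset of every partition (output: topic:partition:offset)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.232.211:9092 --topic mydemo --time -1

# Earliest offsets; latest minus earliest per partition = messages currently on the topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.232.211:9092 --topic mydemo --time -2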

producer

package Kafka.stu;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducer2 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");            // Kafka broker address and port
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);   // key serializer class
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value serializer class
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);             // load the configuration
        ProducerRecord<String, String> record = new ProducerRecord<>("secDemo", "456hello 122222");  // specify the topic and the message
        producer.send(record);   // send the message
        producer.close();        // release resources
    }
}

Loop reading input from the console

package cn.kb11;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * @Author qiaozhengchao
 * @Date 2021/5/20
 * @Description
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.232.211:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        /*
         * 0  the producer does not wait for any acknowledgement from the broker; correct delivery is not guaranteed
         * 1  only the partition leader needs to acknowledge the write
         * -1 the producer waits for all replicas to acknowledge; slowest response, but the safest (no data loss)
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String content = "1";
        Scanner scanner = new Scanner(System.in);
        while (content.equals("1")) {
            System.out.println("Enter a message:");
            String txt = scanner.next();
            ProducerRecord<String, String> record = new ProducerRecord<>("mydemo", txt);
            producer.send(record);
            System.out.println("Exit? 0: exit  1: keep sending");
            content = scanner.next();
        }
        producer.close();  // release resources
    }
}
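
producer.send() is asynchronous, so with acks=0 a failed write goes unnoticed. Below is a minimal sketch of checking delivery with the Callback overload of send(); the class name CallbackProducer is made up for illustration, and the broker address and topic mydemo follow the example above.

package cn.kb11;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CallbackProducer {   // hypothetical example class, not part of the original project
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.232.211:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("mydemo", "hello with callback");

        // The second argument of send() is a Callback that fires once the broker responds
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();   // the write failed
            } else {
                System.out.println("written to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });
        producer.close();   // flushes pending sends, so the callback runs before the program exits
    }
}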

Kafka cluster

Differences from the pseudo-distributed (single-node) Kafka configuration file:

# Globally unique broker ID; must not be duplicated. Configure a different broker.id on each machine.
broker.id=0
# ZooKeeper connection the brokers use to store metadata
zookeeper.connect=192.168.232.211:2181,192.168.232.210:2181
The zookeeper.connect value must list the addresses of all machines, comma-separated; a per-machine sketch follows below.
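
As a sketch of the per-machine files, assuming a two-broker cluster on the two addresses listed above (everything not shown stays the same as the single-node configuration):

# server.properties on 192.168.232.211
broker.id=0
zookeeper.connect=192.168.232.211:2181,192.168.232.210:2181
advertised.listeners=PLAINTEXT://192.168.232.211:9092

# server.properties on 192.168.232.210
broker.id=1
zookeeper.connect=192.168.232.211:2181,192.168.232.210:2181
advertised.listeners=PLAINTEXT://192.168.232.210:9092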

consumer

package Kafka.stu;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");    // session timeout
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");    // whether offsets are auto-committed
//      properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval
        /*
         * earliest: if a committed offset exists for a partition, consume from it; otherwise consume from the beginning
         * latest:   if a committed offset exists for a partition, consume from it; otherwise consume only newly produced data
         * none:     if every partition has a committed offset, consume from those offsets; if any partition lacks one, throw an exception
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");            // consumer group

        // Single consumer:
        /*
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singleton("secDemo"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> o : records) {
                System.out.println(o.offset() + " " + o.key() + " " + o.value());
            }
        }
        */

        // secDemo has four partitions, and the consumer group (group_1) above contains only one consumer.
        // The optimal setup is one consumer per partition within the group.
        // With auto-commit enabled, a group reads the messages the first time; once the offsets are committed
        // it no longer gets them. To read them again, switch to a different group id.

        // Multiple consumers:
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
                    kafkaConsumer.subscribe(Collections.singleton("secDemo"));
                    while (true) {
                        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                        for (ConsumerRecord<String, String> o : records) {
                            System.out.println(o.offset() + " " + o.key() + " " + o.value());
                        }
                    }
                }
            }).start();
        }
    }
}
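
Since the example sets ENABLE_AUTO_COMMIT_CONFIG to false but never commits, the group re-reads everything from the beginning on every restart. A minimal sketch of committing offsets by hand after each poll, reusing the properties and imports from the MyConsumer example above (commitSync() is the standard KafkaConsumer API for this):

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singleton("secDemo"));
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> o : records) {
        System.out.println(o.offset() + " " + o.key() + " " + o.value());
    }
    // Commit the offsets returned by this poll only after the records have been processed
    kafkaConsumer.commitSync();
}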