当前位置: 代码迷 >> 综合 >> mysql-canal-kafka-redis
  详细解决方案

mysql-canal-kafka-redis

热度:103   发布时间:2023-11-22 11:06:34.0

前面已经讲过mysql部署,canal-kafka部署,现在讲kafka-redis
springboot

  1. 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2)application.yml

spring:redis:host: 192.168.66.101port: 6379database: 0#password: 123456kafka:# Kafka服务地址bootstrap-servers: 192.168.66.101:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288

3)RedisClient

@Component
public class RedisClient {/*** 获取redis模版*/@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 设置redis的key-value*/public void setString(String key, String value) {setString(key, value, null);}/*** 设置redis的key-value,带过期时间*/public void setString(String key, String value, Long timeOut) {stringRedisTemplate.opsForValue().set(key, value);if (timeOut != null) {stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);}}/*** 获取redis中key对应的值*/public String getString(String key) {return stringRedisTemplate.opsForValue().get(key);}/*** 删除redis中key对应的值*/public Boolean deleteKey(String key) {return stringRedisTemplate.delete(key);}
}

4)创建一个CanalBean对象进行接收json数据格式

@Data
public class CanalBean {//数据private List<TbCommodityInfo> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;
}@Data
public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;
}
@Data
public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;
}@Data
public class TbCommodityInfo {private String id;private String commodity_name;private String commodity_price;private String number;private String description;
}

创建一个消费者CanalConsumer进行消费

@Component
public class CanalConsumer {//日志记录private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@Resourceprivate RedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics = "canaltopic")public void receive(ConsumerRecord<?, ?> consumer) {String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if (!isDdl) {List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间long TIME_OUT = 600L;if ("INSERT".equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else if ("UPDATE".equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//从redis中删除redisClient.deleteKey(id);}}}}
}
CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉烧包', '3.99', '3', '又大又香的叉烧包,老人小孩都喜欢');
  相关解决方案