导言
项目主要是通过Filebeat读取日志文件,传输到Kafka上,Logstash获取Kafka的消息,过滤日志信息,传输到ElasticSearch上。实现数据的实时统计。楼主也是刚接触了ElasticSearch,有什么错误的,或者更好的操作,可以提供下,一起讨论。
1. 下载Filebeat的基础文件(根据自己的系统选择对应的文件
https://www.elastic.co/cn/downloads/beats/filebeat)
2. filebeat的配置文件(filebeat.yml)
3. 日志源是json形式
1.D:\program\php\a.log{"ip":"168.168.1.1"}
2.D:\program\php\b.log{“ips”:"127.0.0.2"}
4. 配置filebeat的配置文件如下
filebeat.inputs:
# 日志类型
- type: logfields:# 用于后面的识别判断log_source: ip# 是否实时读取enabled: true# 日志文件位置paths:-D:\program\php\a.log# 日志文件中包含id数据进行收集 include_lines: ['id'] # 多长时间去检测新的文件生成的时间 ,默认为 10s scan_frequency: 1s
- type: logfields:log_source: ipsenabled: true paths:-D:\program\php\b.log# 日志文件中包含ids数据进行收集 include_lines: ['ids'] # 多长时间去检测新的文件生成的时间 ,默认为 10s scan_frequency: 1s
processors:
# 删除掉filebeat再传输过程中附加的多余字段"log", "ecs","agent","agent"
- drop_fields:fields: ["log", "ecs","agent","agent"] # 输出源,这是是输出到kafka
output.kafka:enabled: truehosts: ["127.0.0.1:9092"]# 并发负载均衡kafka输出的数量worker: 2topic: "ip"# 单条消息的大小,默认值为10Mmax_message_bytes: 1024# 配置多个topic输出topics:# 根据上列设置的log_source设置传输到kafka的topics- topic: "%{[fields.log_source]}"when.contains:# 日志文件中包含的内容传输message: "ip"- topic: "%{[fields.log_source]}"when.contains:message: "ips"
# 进程数量
max_procs: 2
queue.mem:# 消息队列大小,默认值为4096(filebeat最大的可能占用的内存是max_message_bytes * queue.mem.events )events: 4096
# 发送的最小事件数,默认为 2048flush.min_events: 512
# 最长等待时间,设为为 0,事件将立即使用flush.timeout: 1s
5. 运行命令 ./filebeat -e -c filebeat-test.yml (在filebeat的根目录下运行)
6. Filebeat配置信息参数(后续补充)
原文链接:https://blog.csdn.net/xfxf0520/article/details/101440065
欢迎大家访问我的博客:随风飘雁的博客