elk 日志 处理系统
1)ElasticSearch是一个基于Lucene的开源分布式搜索服务器。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
在elasticsearch中,所有节点的数据是均等的。
2)Logstash是一个完全开源的工具,它可以对你的日志进行收集、过滤、分析,支持大量的数据获取方法,并将其存储供以后使用(如搜索)。说到搜索,logstash带有一个web界面,搜索和展示所有日志。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
3)Kibana 是一个基于浏览器页面的Elasticsearch前端展示工具,也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。
一个完整的集中式日志系统,需要包含以下几个主要特点:**
1)收集-能够采集多种来源的日志数据
2)传输-能够稳定的把日志数据传输到中央系统
3)存储-如何存储日志数据
4)分析-可以支持 UI 分析
5)警告-能够提供错误报告,监控机制
ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。
对日志的简单操作
elk对es和lostash他们俩在产生大量的日志文件的时候 进行缓解压力
1) awk grep cat tail 简单筛选
面对海量日志的场景 简单操作 效率过低
日志系统: 收集日志 分析日志数据 有好的 web ui
作用 1) 信息的查找 --异常(warn waning error...not found)
2) 服务诊断 --分析服务器的负载状况和运行状态 优化
3)数据分析 --top10
组成部分
1)采集端 agent 负载日志源数据进行采集、封装 并发送到聚合段
2)聚合端:collector 接收来自采集端的数据按照一定规则进行数据
处理 讲数据发送给存储端
3)存储端 store 数据存储(可扩展 可靠)
常见的方案:ELF EFK graylog ELK 流行分析
ELK方案
第一种 方案 没有消息队列
log 日志元数据
beats 日志元数据的采集 发送到 logstash 创建一个索引 存储到elas里面
有消息队列
满足消息队列 即可
实验
第一台 软件包 192.168.10.70
jdk-8u201-linux-x64.tar.gz
elasticsearch-6.3.2.tar_(1).gz
第二台 软件包 192.168.10.11
kafka_2.12-2.1.0.tgz
kibana-6.3.2-linux-x86_64.tar.gz
zookeeper-3.4.12.tar.gz
第三台 软件包 192.168.10.30
logstash-6.3.2.tar.gz
第四台 软件包 192.168.10.5
nginx-1.15.4.tar.gz
filebeat-6.3.2-linux-x86_64.tar.gz
第一台 配置
配置java环境
tar -zxf jdk-8u201-linux-x64.tar.gz
mv jdk1.8.0_201/ /usr/local/java
rm -rf /usr/bin/java
echo 'export JAVA_HOME=/usr/local/java export JRE_HOME=/usr/local/java/jre export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin' >> /etc/profile
source /etc/profile
java -version
tar -zxf elasticsearch-6.3.2.tar_\(1\).gz
mv elasticsearch-6.3.2 /usr/local/es
echo '* soft nofile 655360' >> /etc/security/limits.conf
echo '* hard nofile 655360' >> /etc/security/limits.conf
echo '* soft nproc 2048' >> /etc/security/limits.conf
echo '* hard nproc 4096' >> /etc/security/limits.conf
echo 'vm.max_map_count=655360' >> /etc/sysctl.conf
sysctl -p
useradd es
mkdir -p /es/{data,logs}
chown -R es:es /usr/local/es/ /es
ntpdate ntp.ntsc.ac.cn 四台同步时间
reboot
vim /usr/local/es/config/elasticsearch.yml 把自己当成集群
17行 取消注释
cluster.name: my-application
13行 取消注释 并添加
node.name: node-1
node.data: true
node.master: true
35行39行取消注释
path.data: /es/data
path.logs: /es/logs
57行取消注释本机IP
network.host: 192.168.10.70
61行取消注释
http.port: 9200
74取消注释 联络自己就ok
discovery.zen.minimum_master_nodes: 1
第二台主机
tar -zxf kibana-6.3.2-linux-x86_64.tar.gz
mv kibana-6.3.2-linux-x86_64 /usr/local/kibana
vim /usr/local/kibana/config/kibana.yml
2行取消注释
server.port: 5601 需要记住
7行 本机IP
server.host: "192.168.10.11"
28行 修改es的IP
elasticsearch.url: "http://192.168.10.70:9200"
安装kafka
tar -zxf zookeeper-3.4.12.tar.gz
mv zookeeper-3.4.12 /usr/local/zookeeper
cd /usr/local/zookeeper/
mkdir data
echo 1 > data/myid
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
12行
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/data
/usr/local/zookeeper/bin/zkServer.sh start
tar -zxf kafka_2.12-2.1.0.tgz
[root@localhost ~]# mv kafka_2.12-2.1.0 /usr/local/kafka
[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# vim config/server.properties
21行
broker.id=1
31行 本机 IP 监听
listeners=PLAINTEXT://192.168.10.11:9092
65行 2个分区
num.partitions=2
123行 本机 IP
zookeeper.connect=192.168.10.11:2181
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
netstat -anput | grep 9092
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.11:2181 --replication-factor 1 --partitions 2 --topic elk
[root@localhost kafka]# /usr/local/kafka/bin/kafka-console-producer.sh \
> --broker-list 192.168.10.11:9092 --topic elk
>hel^H^Hasd
>das
验证
新建终端 如果数据一样 就对了
[root@localhost ~]# /usr/local/kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server 192.168.10.11:9092 --topic elk \
> --from-beginning
hasd
das
111
第三台主机
tar -zxf logstash-6.3.2.tar.gz
mv logstash-6.3.2 /usr/local/logstash
vim /usr/local/logstash/config/logstash.yml
64行
path.config: /usr/local/logstash/config/*.conf
77行 自动重载
config.reload.automatic: true
81行 取消注释 自动加载配置文件的时间间隔
config.reload.interval: 3s
190行 本机IP
http.host: "192.168.10.30"
第四台主机
tar -zxf filebeat-6.3.2-linux-x86_64.tar.gz
mv filebeat-6.3.2-linux-x86_64 /usr/local/filebeat
安装nginx
方案1 nginx 收集 filebeat 拿去日志 logstash -->es<-- kibana
input output
操作 设置输入和输出
vim /usr/local/filebeat/filebeat.yml 相当于给数据一个标签
编辑
默认输入 Filebeat inputs
15 21 27 28 注释掉
22行 输出
filebeat:
prospectors:
- type: log
paths:
- /usr/local/nginx/logs/access.log
tage: ["nginx"]
# Change to true to enable this input configuration.
enabled: true 此配置是否生效
默认输出 Elasticsearch output
148 150 注释掉
158 160 取消注释 输入
output.logstash:
# The Logstash hosts
hosts: ["192.168.10.30:5044"] 第三台IP
第三台主机 这个给数据一个索引
vim /usr/local/logstash/config/nginx-logstash.conf
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => "192.168.10.70:9200" 第一天IP
index => "nginx"
}
}
~
第一天 启动
su es
/usr/local/es/bin/elasticsearch
第二台 启动 不用配置 因为只要接收一下日志就可以
/usr/local/kibana/bin/kibana
第三台 启动
/usr/local/logstash/bin/logstash -t /usr/local/logstash/config/nginx-logstash.conf
第四台 启动
/usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml
访问第二台IP:5601
方案二 加入消息队列
第四台主机修改filebeat
vim /usr/local/filebeat/filebeat.yml
156-460 行
#utput.logstash:
# The Logstash hosts
# hosts: ["192.168.10.30:5044"]
output.kafka:
enabled: true
hosts: ["192.168.10.1
第三台主机
vim /usr/local/logstash/config/nginx-logstash.conf
input {
kafka {
bootstrap_servers => "192.168.10.11:9092" kafka 的IP端口
consumer_threads => 1 线程数量
decorate_events => "ture" 消息的偏移量 加到日志里面去
topics => "elk"
auto_offset_reset => "latest" 自己把便宜量设置刀最新的里面
}
}
output {
elasticsearch {
hosts => "192.168.10.70:9200"
index => "kafka"
}
}
启动 所有服务 和kefka
方案二 的页面配置
https://blog.csdn.net/weixin_43557605/article/details/101287006