ELK EFLK日志平台基于ElastAlert的监控告警
  ETWZF7L1nsXD 2023年11月02日 44 0

一、前言

1.1、产生背景

ElastAlert最初由Yelp开发并开源,旨在解决实时监控和告警的需求。由于Elasticsearch的日志处理能力强大,许多组织和企业使用它来存储和分析大量的日志数据。然而,仅仅存储和分析数据可能无法满足实时监控和快速响应的需求(XPACK收费),因此ElastAlert应运而生。

1.2、功能介绍

  • 实时日志监控:ElastAlert可以实时监听Elasticsearch中的日志数据,支持灵活的查询语法来检索和筛选符合特定条件的日志事件。
  • 告警规则定义:用户可以使用YAML配置文件定义告警规则,包括查询条件、时间窗口、触发条件等。
  • 多种告警通知方式:ElastAlert支持多种告警通知方式,如电子邮件、Slack、PagerDuty等。
  • 内置和自定义告警策略:ElastAlert内置了一些常见的告警策略及自定义告警模块,如持续时间、聚合计数、阈值等。
  • 智能告警延迟和降噪:ElastAlert具有智能的延迟和降噪机制(时间窗口、事件频率等),避免因短暂的异常导致大量冗余告警。

二、ElastAlert部署配置

2.1、封装dingtalk告警模块

下载dingtalk组件:

$ wget https://xmars-devops.oss-cn-shanghai.aliyuncs.com/AliCloud/master.zip

增加dingtalk告警逻辑 dingtalk_alert.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import json
import requests
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException
import time
import hmac
import hashlib
import base64
import urllib.parse

class DingTalkAlerter(Alerter):
   required_options = frozenset(['dingtalk_webhook', 'dingtalk_msgtype'])

   def __init__(self, rule):
       super(DingTalkAlerter, self).__init__(rule)
       self.dingtalk_webhook_url = self.rule['dingtalk_webhook']
       self.dingtalk_msgtype = self.rule.get('dingtalk_msgtype', 'text')
       self.dingtalk_isAtAll = self.rule.get('dingtalk_isAtAll', False)
       self.dingtalk_title = self.rule.get('dingtalk_title', '')
       self.dingtalk_secret = self.rule.get('dingtalk_secret','')
   def format_body(self, body):
       return body.encode('utf8')

   def alert(self, matches):
       headers = {
           "Content-Type": "application/json",
           "Accept": "application/json;charset=utf-8"
      }
       body = self.create_alert_body(matches)
       payload = {
           "msgtype": self.dingtalk_msgtype,
           "text": {
               "content": body
          },
           "at": {
               "isAtAll": False
          }
      }
       if self.dingtalk_secret!="":
           timestamp = str(round(time.time() * 1000))
           secret = self.dingtalk_secret
           secret_enc = secret.encode('utf-8')
           string_to_sign = '{}\n{}'.format(timestamp, secret)
           string_to_sign_enc = string_to_sign.encode('utf-8')
           hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
           sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
           self.dingtalk_webhook_url=self.dingtalk_webhook_url+"×tamp={}&sign={}".format(timestamp,sign)

       try:
           response = requests.post(self.dingtalk_webhook_url,
                                    data=json.dumps(payload, cls=DateTimeEncoder),
                                    headers=headers)
           response.raise_for_status()
       except RequestException as e:
           raise EAException("Error request to Dingtalk: {0}".format(str(e)))

   def get_info(self):
       return {
           "type": "dingtalk",
           "dingtalk_webhook": self.dingtalk_webhook_url
      }
       pass

重新封装基于dingtalk模块的监控告警:

FROM jertel/elastalert-docker:0.2.4
ADD master.zip /opt/elastalert/
RUN cd /opt/elastalert;unzip master.zip;cd elastalert-dingtalk-plugin-master;pip3 install -i https://mirrors.aliyun.com/pypi/simple/   pyOpenSSL==16.2.0;pip3 install -i https://mirrors.aliyun.com/pypi/simple/ setuptools==46.1.3;cp -r elastalert_modules /usr/local/lib/python3.6/;cd /usr/local/lib/python3.6/elastalert_modules; rm -rf dingtalk_alert.py
ADD dingtalk_alert.py /usr/local/lib/python3.6/elastalert_modules/
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

镜像编译&推送

$ docker build -t harbor-local.kubernets.cn/library/elatrt:v22 .

$ docker push harbor-local.kubernets.cn/library/elatrt:v22

2.2、定制K8S方式部署

  • configmap:配置文件 & Rules文件
  • deployment:elastalert控制器文件
apiVersion: v1
kind: ConfigMap
metadata:
name: elastalert-config
namespace: logging
labels:
  app: elastalert
data:
elastalert_config: |-            # elastalert配置文件
  ---
  rules_folder: /opt/rules       # 指定规则的目录
  scan_subdirectories: false
  es_host: elasticsearch-master# 修改为当前集群的es链接地址
  es_port: 9200
  run_every:                     # 多久从 ES 中查询一次
    seconds:  30
  buffer_time:              #向上翻30分钟查找
    minutes: 30
  writeback_index: elastalert    #创建索引名字
  use_ssl: False      #ssl不做认证
  verify_certs: True
  alert_time_limit:             # 失败重试限制
    minutes:  2400

---
apiVersion: v1
kind: ConfigMap
metadata:
name: elastalert-rules
namespace: logging
labels:
  app: elastalert
data:
rule_config.yaml: |-        # elastalert规则文件
  name: test-alert     # 规则名字,唯一值
  es_host: elasticsearch-master     #es地址,k8s的es
  es_port: 9200               #es端口
  type: any                  #所有类型
  index: k8s-*               #要搜索的索引
  num_events: 1
  timeframe:
    minutes: 1
    #1分钟内,统计个数大于等于1个触发报警
  filter:
    - query:
      query_string:
        query: "ERROR"  #key:value格式,匹配错误日志
  alert:
  - "elastalert_modules.dingtalk_alert.DingTalkAlerter"  #钉钉模块
  dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=064dda46xxxxxxe6760d70df9017a27332b3760d143d9ebdb248f0"  #钉钉地址
  dingtalk_sercurity_tpye: "sign"      #钉钉加签格式
  dingtalk_msgtype: "text"             #消息类型
  dingtalk_secret: "SEC82341xxxxxxx01a31xxxxa2bcbae91920ace2acc56cc023ac8"  #钉钉加签
  alert_subject: "EFK Error!!!"      #报警信息
  alert_text_type: alert_text_only
  alert_text: |  #和下面匹配key:value
    EFK 日志报错, 参照如下信息进行定位!!!
    time: {}
    hostname: {}
    podName: {}
    nameSpaces: {}
    message: {}
    logIndex: {}
  alert_text_args:
  - "@timestamp"
  - kubernetes.host
  - kubernetes.pod_name
  - kubernetes.namespace_name
  - message
  - kubernetes.labels.logIndex
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: elastalert
namespace: logging
labels:
  app: elastalert
spec:
selector:
  matchLabels:
    app: elastalert
template:
  metadata:
    labels:
      app: elastalert
  spec:
    containers:
    - name: elastalert
      image: harbor-local.kubernets.cn/library/elatrt:v22
      imagePullPolicy: IfNotPresent
      command: ["/opt/elastalert/run.sh"]
      volumeMounts:
      - name: config
        mountPath: /opt/config
      - name: rules
        mountPath: /opt/rules
      resources:
        limits:
          cpu: 512m
          memory: 512Mi
        requests:
          cpu: 50m
          memory: 256Mi
    volumes:
    - name: rules
      configMap:
        name: elastalert-rules
    - name: config
      configMap:
        name: elastalert-config
        items:
        - key: elastalert_config
          path: elastalert_config.yaml

应用部署:

$ kubectl apply -f elastalert.yaml
$ kubectl get pod -nlogging |grep elastalert

三、测试验证

部署测试pod

apiVersion: v1
kind: Pod
metadata:
name: counter
labels:
  logging: "true" # 一定要具有该标签才会被采集
  logIndex: "zhdya"  # 指定索引名称
spec:
containers:
  - name: count
    image: busybox
    args:
       [
        /bin/sh,
        -c,
        'i=0; while true; do echo "ERROR $i: $(date)"; i=$((i+1)); sleep 1; done',# 新增报错ERROR字段
       ]

Kinbana查看日志数据:

ELK EFLK日志平台基于ElastAlert的监控告警_python

查看钉钉告警:

ELK EFLK日志平台基于ElastAlert的监控告警_elasticsearch_02

四、企业场景下的多类型告警方案

4.1、多索引,多匹配

name: MultipleErrorLogs
type: frequency
num_events: 5    
timeframe:      
minutes: 10
#10分钟内,统计个数大于等于5个触发报警
index:
  - myapp-logs-*
  - otherapp-logs-*
filter:
- query:
  query_string:
    query: "error OR exception OR warning"
    default_operator: OR
- query:
  query_string:
    query: "something else"
    default_operator: OR
- query:
  query_string:
    query: "another error"
    default_operator: OR
alert:
- "email"

4.2、多个单索引,单匹配

name: ErrorAlert1
type: frequency
num_events: 1
timeframe:
  minutes: 5
index: my-index-1
filter:
- query:
    query_string:
      query: "error OR exception"
      default_operator: OR
alert:
- "email"

name: ErrorAlert2
type: frequency
num_events: 1
timeframe:
  minutes: 5
index: my-index-2
filter:
- query:
    query_string:
      query: "file not found OR invalid argument"
      default_operator: OR
alert:
- "email"

4.3、基于复杂场景下的configmap

apiVersion: v1
kind: ConfigMap
metadata:
  name: elastalert-config
  namespace: logging
  labels:
    app: elastalert
data:
  elastalert_config: |-            #  elastalert配置文件
    ---
    rules_folder: /opt/rules       # 指定规则的目录
    scan_subdirectories: false
    es_host: elasticsearch-master
    es_port: 9200
    run_every:                     # 多久从 ES 中查询一次
      minutes:  1
    buffer_time:              #向上翻30分钟查找
      minutes: 30
    writeback_index: elastalert    #创建索引名字
    use_ssl: False      #ssl不做认证
    verify_certs: True
    alert_time_limit:             # 失败重试限制
      minutes:  2400

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: elastalert-rules
  namespace:  logging
  labels:
    app: elastalert
data:
  rule_config.yaml: |-        # elastalert规则文件
     name: test-alert     # 规则名字,唯一值
     es_host: elasticsearch-master     #es地址,k8s的es
     es_port: 9200               #es端口
     #type: any                  #所有类型
     index: allapps-*           #要搜索的索引
     #策略规则,如果在10分钟内,匹配的个数大于等于5,那么就触发钉钉报警
     type: frequency
     num_events: 5
     timeframe:
       minutes: 10
     filter:
      - query:
         query_string:
           query: "\"获取AccessToken失败\""
     alert:
     - "elastalert_modules.dingtalk_alert.DingTalkAlerter"  #钉钉模块
     dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=cc67ce67cb556XXXXXXXda113c2d30f98e4647e704"  #钉钉地址
     dingtalk_sercurity_tpye: "sign"      #钉钉加签格式,感觉可以不要
     dingtalk_msgtype: "text"             #发消息内容
     dingtalk_secret: "SEC2dc9489ccac40XXXXXX4b066c6fd8fd54ffcaa834df2"  #钉钉加签
     alert_subject: "ERROR!!!"      #报警信息
     alert_text_type: alert_text_only
     alert_text: |  #和下面匹配key:value
      日志监控
      mess: {}
      pod-name: {}
     alert_text_args:
     - message
     - kubernetes.pod.name


  rule_kafka.yaml: |-        # elastalert规则文件
    name: kafka-alert     # 规则名字,唯一值
    es_host: elasticsearch     #es地址,k8s的es
    es_port: 9200               #es端口
    #type: any                  #所有类型
    index: dtk-go-tb-order-*           #要搜索的索引
    type: frequency
    num_events: 5
    timeframe:
      minutes: 10
    filter:
     - query:
        query_string:
          query: "\"写入Kafka消息彻底失败\""
    alert:
    - "elastalert_modules.dingtalk_alert.DingTalkAlerter"  #钉钉模块
    dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=cc67ce67cb55XXXXXX113c2d30ea521cf7e704"  #钉钉地址
    dingtalk_sercurity_tpye: "sign"      #钉钉加签格式,感觉可以不要
    dingtalk_msgtype: "text"             #发消息内容
    dingtalk_secret: "SEC2dc9489ccac40fb1XXXXXXX6fd8fd4ffcaa834df2"  #钉钉加签
    alert_subject: "ERROR!!!"      #报警信息
    alert_text_type: alert_text_only
    alert_text: |  #和下面匹配key:value
     日志监控
     mess: {}
     pod-name: {}
    alert_text_args:
    - message
    - kubernetes.pod.name

五、总结

  • 部署方式:改良传统docker运行方式,适配当前EFLK模式,运行在Kubernetes集群;
  • 规则定义:通过编写规则文件来定义要监控的指标或事件,并设置条件和阈值;
  • 告警方式:提升并优化告警方式为快消息类型,提升效率;
  • 调优和优化:可以根据企业实际情况进行参数调优。包括调整告警规则、优化查询性能、改进告警策略等;
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  9JCEeX0Eg8g4   2023年11月25日   37   0   0 ednpython
ETWZF7L1nsXD