ELK日志架构

首先要明确一点的是,ELK需要以下多个工具组件来结合才能使用:

  1. Filebat 轻量级的日志收集工具

  2. Logstash,主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式

    一般分为两个版本, client 端我们会通过 Filebat 代替;而 server 端则还是使用 Logstash 方实现,并存放到ES之中进行记录。

  3. Elasticsearch,本身是一个开源的分布式搜索引擎

  4. Kibana,图形化界面协助分析、处理日志数据

ELK几种常见的架构模式

架构一

架构一.png

elk架构是最基础的elk初代状态,但是他的问题也非常明显。上图的是一个非常简单的ELK架构模型,也就是通过在服务节点上通过使用 Logstash 来收集各个服务产生的日志,然后再发送到ES之中进行存储,我们则可以通过Kibana来查询分析我们的ES架构。但是这种方式的缺陷也非常明显,需要在每一个服务上搞一个Logstash来收集并发送消息到我们的目标ES。

架构二

架构二.png

基于最初的ELK架构,很快就衍生出EFK架构,也就是将Logstash完全由Filebeat替换掉。但是这样其实也不太完美,更加完美的做法其实就是将Logstash Cluster作为中间层,组成EFLK。
在上图之中,为了降低性能的消耗的问题,我们将客户端手机数据的部分修改为 Filebeat,然后让Filebeat会将日志数据发送到Logstash上,由多台Logstash组成的Logstash Cluster将会把数据转发到 Elasticsearch 之中,并且Kibana提供针对ES数据的收集和处理工作,帮助我们做分析。
看起来这个架构已经相当好了,似乎没啥问题;但其实如果考虑上微服务在大数据量和请求量的冲击下,可能会大量数据日志导致Logstash Cluster之中的服务发生崩溃。这势必会导致我们的日志服务本身不稳定的问题,这种情况下,我们就需要考虑引入MQ队列的方式来优化。

架构三

架构三.png

就跟案例二的介绍说的一样,因为在正常情况下使用,架构二应该是能满足需要的,但问题在于我们大概率不是真的就一致都在平稳状态,面对请求爆发的时候我们原先的处理往往就不能满足需要了,在这种情况往往就需要加入Kafka来解决这个问题。上图的就是较为常见的架构模式。也就是ELFK + Kafka做缓冲的架构图。

ELK架构搭建

下面按照顺序先后安装 Kafka、Filebeat、Logstash 三大组件。

Logstash

还记得上面的介绍吗?Logstash在不需要filebeat的情况下其实也能完成工作,提供完整的日志采集服务将采集到的日志交给ES进行处理。所以在Logstash之中其实是包含了filebeat的工作的,以下先做一些简单的介绍。

logstash架构.png

Logstash会通过input线程找到目标的日志文件,然后通过日志文件本身的监听,将日志数据流交给filter进行处理,并将最终处理的结果交给output,显然的output不仅仅只会用es。

所以从整个流程的角度来看,我们Logstash的处理流程可以概括为以下的步骤:input -- > event --> filter event --> output。

  • input:通过input我们可以将数据从源之中不断采集
  • filter:数据采集、处理操作,可以将数据增加 / 删除其中的某些字段 (也就是对数据进行处理)
  • output:通过output可以将处理后的数据保存到其他的系统之中。
  • event:在管道之中流转的数据,可以通过使用Codes Plugins来对数据做编码和解码工作,比较流程的格式有 json 和 msgpack、plain(text)等

除了这几个基础的使用以外,Logstash还支持了一些input、filter、output等众多的相关插件 (提供针对不同的数据源相关的插件数据获取,包括有kafka topic、服务日志、mysql日志等),因为篇幅问题这里不会过多的赘述。

安装Logstash

# 下载 tar 包   (300+M,用wget准备下过年)
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.13.0-linux-x86_64.tar.gz

# 解压
tar xvf logstash-7.13.0-linux-x86_64.tar.gz

# 重命名
mv logstash-7.13.0 logstash

增加配置

与一般的中间件的配置有所不同的是 Logstash 的配置文件需要我们自己创建,名字也是自定义的,我们可以取任意名字,当然了为了规范一些,一般来说会使用 kafka-topic_name 或者是 pipeline 的名字。并且更为规范的话,一般来说会存放到以下两个位置:

  • /etc/logstash/conf.d --- Linux (或者 LOGSTASH/config)
  • {LOGSTASH_HOME}/config --- Windows

配置的文件,我们可以构建起来 kafka-topic_name.conf / .yml 或者 pipline.conf / .yml 又或者是 server.conf / .yml。至于内部的配置,可以参考提供的例子。

# 将以下内容写入到到 OrderServiceLog.conf 中
input {                                          # 使用kafka插件从kakfa的topic之中获取日志
    kafka {
        bootstrap_servers => ["localhost:9092"]           # kafka path
        topics => ["order_service"] 					# kafka topic 名 (可以指定多个 topic)
        codec => "json"								   # kafka 消息格式  (默认是 plain)
        auto_offset_reset => "latest"                     # kafka log文件偏移量  
        decorate_events=> "basic"					    # 是否要携带kafka的元数据
        group_id => "logstash01"					    # kafka消费组id,以group作为单位消费消息
    }
}

filter {
    grok {
        match => { "message" => "(?m)^\[(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]%{SPACE}\[(?<level>[A-Z]{4,5})\]" }                # 使用grok提取日志产生的时间和日志级别
    }
    mutate {								  # 实现日志级别的转换
        lowercase => [ "level" ]
        remove_field => ["beat"]
    }
}

output {
   elasticsearch {
        hosts => ["https://localhost:9211"]                 # 目标ES地址
        index =>  "%{[@metadata][kafka][topic]}_%{+YYYY-MM-dd}"
        user => "elastic" 							     # es账号和密码
        password => "123456"	
        ssl => true										# 是否使用ssl
        ssl_certificate_verification => false
        cacert => "/home/ubuntu/ES/logstash/config/es_ca.pem"	# SSL验证文件 如果没有SSL可以去掉该行
    }
}

上面已经基本介绍了大部分的参数的配置效果和内容,但显然的这还不够,下面将会补充一些信息

input的插件配置
  • auto_offset_reset:除了可以指定lastest以外,还可以配置为earliest和 none。

    • lastest:相当于从最后的消息开始消费
    • earliest:相当于从最初的消息开始消费 (如果之前消费过那么,这里的效果等同于重新消费--重放)
    • none:使用原先默认的offset,但是如果之前没有offset可以使用,这样配置就会报错
  • decorate_events:是否在转发消息的时候携带消息在es之中的元数据,默认的配置是 none,需要使用的时候需要配置为basic 或者 extended才会生效。

    如果要知道这两者的配置有什么区别需要知道kafka元数据的有哪些:

    在配置为 basic 之后,Kafka的元数据会包括有:

    • [@metadata][kafka][topic]: Kafka 主题名称
    • [@metadata][kafka][consumer_group]: 消费者组名称
    • [@metadata][kafka][partition]: 分区号
    • [@metadata][kafka][offset]: 消息在分区中的偏移量
    • [@metadata][kafka][key]: 消息的 key(如果存在)
    • [@metadata][kafka][timestamp]: 消息的时间戳

    而如果配置为extended的话,kakfa的元数据还会包含有以下的内容:

    • [@metadata][kafka][consumer][client_id]: 消费者客户端 ID
    • [@metadata][kafka][consumer][partition_assignment_strategy]: 分区分配策略
    • [@metadata][kafka][consumer][request_timeout_ms]: 请求超时时间(毫秒)
    • [@metadata][kafka][consumer][session_timeout_ms]: 会话超时时间(毫秒)
    • [@metadata][kafka][consumer][max_poll_interval_ms]: 最大轮询间隔(毫秒)
    • [@metadata][kafka][consumer][rebalance_timeout_ms]: 重平衡超时时间(毫秒)
    • [@metadata][kafka][consumer][committed_offset]: 已提交的偏移量
filter插件配置
filter {
    grok {
        match => { "message" => "(?m)^\[(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]%{SPACE}\[(?<level>[A-Z]{4,5})\]" }                # 使用grok提取日志产生的时间和日志级别
    }
    mutate {								  # 实现日志级别的转换
        lowercase => [ "level" ]
        remove_field => ["beat"]
    }
}

filter之中使用提取日志产生的时间和日志级别,并且通过mutate来将日志级别的内容做转换,并且取出掉所有beat相关的字段。

grok

在示例之中,我们将match的匹配正则其实匹配的是我们常见的日志格式。

[2022-04-07 12:07:50,335] [ERROR] [ActorSystemImpl] [default-dispatcher-4] - 登录异常,账号或者密码错误!

(?m)^\[(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]%{SPACE}\[(?<level>[A-Z]{4,5})\]

其中 (?m) 代表的是m条日志输出,而 (?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})显然匹配的是日期时间,%{SPACE}其实是区分大小写的识别,(?<level>[A-Z]{4,5})的实际匹配就是匹配4到5个大写字符

这里可以稍微补充一下 d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}
d{4} 代表的是四位连字符 (代表年份)
- 代表的是字符
\s 代表空白字符
显然的就可以知道,这个字符串匹配的是日期时间格式,精确到秒后三位 2023-04-15 10:30:45,123

如果想要了解更多,不妨可以参考一下官方文档之中的介绍:Grok filter plugin | Logstash Reference 8.15 | Elastic

mutate

mutate其实就是针对我们的匹配字段做一些修改操作,比如执行字段重命名、字段内容替换、大小写转换等功能问题。这个比较简单,如果想要了解更多可以参考官方文档。Mutate filter plugin | Logstash Reference 8.15 | Elastic

output插件配置

output插件其实基本就没啥好聊的了,文档本身介绍的就是ELK日志结构,在我们示例之中使用的更是EFLK架构,那毫无疑问日志最终是要被推送到ES上的,因此output指向的必然是ES。在上文示例之中就是以下的内容:

output {
   elasticsearch {
        hosts => ["https://localhost:9211"]
        index =>  "%{[@metadata][kafka][topic]}_%{+YYYY-MM-dd}"
        user => "elastic" 
        password => "elastic"
        ssl => true
        ssl_certificate_verification => false
        cacert => "/home/ubuntu/ES/logstash/config/es_ca.pem"
    }
}
  • hosts,ES 集群的地址,如果指定一个数组的话会负载到各个节点去。
  • index,指定保存数据的索引,其中我们使用了 %{[@metadata][kafka][topic]} 来获取数据是从哪个Kafka topic 来的,需要注意 Kafka input 插件需要 设置 decorate_events 为 basic 或者 extended 才可以生效。使用 %{+YYYY-MM-dd} 来获取当前日期。最终示例中索引的名字为 order_service_${date}。
  • ssl,开启使用 SSL 连接到 ES 集群。
  • ssl_certificate_verification,是否开启证书认证,这里设置 false。
  • cacert,指定链接到 ES 集群的 .cer 或者 .pem 证书。
启动logstash
nohup ./bin/logstash -f config/${udf_config_filename}.conf >> ./run.log 2>&1 &
// nohup ./bin/logstash -f config/kafkaLogEFLK.conf >> ./run.log 2>&1 &    // case use

Kibana (中间层)

我们可以先安装并配置Kafka (wget命令有点问题,建议是自己用windows下好,再搞过去)

# 使用 wget 下载   (实际上不推荐wget,慢的要死)
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz 
tar xvf kafka_2.12-3.8.0.tgz
mv kafka_2.12-3.8.0.tgz kafka

修改配置

kafka/config/server.properties 中加入以上配置即可,更多 kafka 调优配置你需要参考其官方文档。

image-20241014114245159

listeners=PLAINTEXT://localhost:9092

启动Kafka

创建一个脚本文件 start_kafka.sh,并填入以下的信息

function kill_all_app_by_name()
{
        pid=`ps -ef | grep $1 | grep -v "grep" | awk -F" " '{print $2}'`
        for i in $pid; do
                kill -9 $i
        done
}

kill_all_app_by_name "kafka.Kafka"
kill_all_app_by_name "QuorumPeerMain"


./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> ./logs/zk.log 2>&1 &

sleep 1

./bin/kafka-server-start.sh ./config/server.properties >> ./logs/kafka.log 2>&1 &

如果只是先做一个简单的测试,我们可以先创建一个简单的topic,供后续检查

// 创建出一个简单的Kafka服务
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic order_service

// 如果想要测试,还可以在终端之中启动生产者  (kafka路径下开启新的终端)
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_service
// 如果想要测试,还可以在终端之中启动消费者  (kafka路径下开启新的终端)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_service

Filebeat (服务端)

所谓的FIleBeats其实非常简单,就是单纯一个本地日志采集程序,将本地的日志结果采集起来最终交付给外部的类似于Kafka、Logstash、ES等外部服务。 (需要注意的是,其实Filebeat本身是ES开发的一个工具,它本身内部包含有多个beat组件,针对不同的场景都有做日志处理,只不过其中最出名的在Linux系统下跑的是Filebeat罢了)
如果对其他的beat感兴趣,可以参考es官方提供的beats的介绍页面
Beats:Elasticsearch 的数据采集器 | Elastic

filebeat.png

如同上图所示,其实Filebeat由两个部分组成,首先是Input和Harvester,Filebest会开启一个或者多个Input线程来获取本地日志文件,每当找到一个实际需要监听的日志文件,就会自动的开启一个Harvester来监听和读取这些日志文件的内容,并且将日志的内容实时的转发给 libbeat ,libbeat则通过聚合日志时间和发送聚合后的数据到指定的外部服务之中。
更加当然的,ES毕竟作为一个完全开源的技术(毕竟基于Lucene是Apache的),还提供了官方的文档支持你做自定义的处理。

安装Filebeat

跟Kafka类似的,我们可以通过 wget 下载 (这个可以用wget,因为不大)

# 使用 wget 下载
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.0-linux-x86_64.tar.gz

# 解压
tar xzvf filebeat-7.13.4-linux-x86_64.tar.gz

# 重命名
mv filebeat-7.13.4-linux-x86_64 filebeat

修改配置

filebeat的配置叫做filebeat.yml,但是需要注意的是,官方提供的示例过于完整了,一般来说用不到这么多的配置,我们可以先将官方提供的配置示例暂时保留起来(备份),然后先按照底下这个最简单的配置方式来配置。(备份完毕之后,可以先把官方配置全删了,直接粘贴即可)

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - server.log # 改变到你的日志路径 (case use: /usr/workSoftware/kafka/kafka.log)

  #fields:
  #  level: debug
  #  review: 1

  multiline.type: pattern
  multiline.pattern: ^\[
  multiline.negate: true
  multiline.match: after

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka server path"]      # kafka服务的地址

  # message topic selection + partitioning
  topic: 'kafka topic name'     # kafka用来暂存日志的 topic
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

其中有一段配置比较重要,这里着重介绍一下 (需要注意的是,filebeat的目的是区分出一个日志事件的开始和结束,并将其作为一个完整的事件交给外部的服务,因此需要以下的配置来帮助他识别出日志的关系)

  • multiline.type,默认值为 pattern,定义了使用何种方式聚合数据。pattern 是使用正则匹配,其他选项为 count,将会聚合指定行数的日志为一个事件。
  • multiline.pattern,指定匹配日志的正则表达式,支持的正则策略可以参考官方文档
  • multiline.negate,定义是否否定 multiline.pattern 定义的正则,默认是 false。
  • multiline.match,定义如何合并多行日志。可选项为 after 或者 before。

配置的效果就是:使用正则表达式(type: pattern),如果与正则(^[)不匹配(negate: true)的行会追加到先前的一行后面(match: after)。如果与正则匹配的话,前面所有追加的内容将成为一个完整的日志事件,随后将会被发送到 output 对应的服务中,而当前匹配行将成为一个新日志事件的起始行。

# 修改权限  (也可能不用修改,建议先试试直接启动)
chmod go-w filebeat.yml

# 前台启动
./filebeat -e

# 后台启动,日志在 logs 目录下
./filebeat -c filebeat.yml >> /dev/null 2>&1 &

最后提醒一下,到这里能这么顺利的前提是,你必须事先对ES集群做了安全验证等一系列处理,也就是配置cerebro、kibana、es集群密码、ca验证文件等,这个部分可以参考其他的文档,或者自己搜索也行,很常见。

简单使用ELK

为了更有意思一些,我监听的目标是kafka本身会生成kafka.log日志。当我在shell之中往kafka发送消息的时候,成功在cerebro中看到新生成的索引

image-20241015010237295

通过终端,我尝试发送了多条数据给消费者获取。而filebeat成功将kafka.log的数据采集交给kafka的order_serice topic,并交付给Logstash,最终提交了ES,也成功在cerebro中查看到新生的索引。

那问题来了,这也不方便啊...

这就需要我们使用 kibana 了

  1. 创建索引pattern,让kibana过滤出日志索引

    image-20241015011106407

    image-20241015011139948

    根据我们在 Logstash 自定义的配置文件之中编写的 index pattern 结合实际的 pattern 编写我们的日志索引的 patterns 即可

    image-20241015011915579

    因为我们日志的索引后续跟上了当天的时间,这里会自动提示是否使用@timestamp作为筛选,当然是需要的。

    image-20241015011958292

    然后我们就成功创建出需要的 index_pattern 了

    image-20241015012115134

    此时,我们就可以使用 kibana 的分析功能来根据日期来查看某一天的日志信息了

    image-20241015012238181

    image-20241015012343282

    就此,我们就可以通过搜索框 + 时间范围筛选 点击refersh实现日志查询。最后补充一点,我们当然也可以在此筛选其他日志或者其他index_pattern的相关数据,创建出 index_pattern 后,只需要在此切换即可。
    image-20241015012455660