ELK/EFK日志管道:Filebeat/Logstash架构
ELK/EFK日志管道:Filebeat/Logstash架构
ELK/EFK架构概览
ELK(Elasticsearch + Logstash + Kibana)和EFK(Elasticsearch + Fluentd + Kibana)是主流的日志收集和分析平台。Filebeat替代Logstash作为轻量级采集器。
ELK架构:
┌─────────────────────────────────────────────────┐
│ 日志源 │
│ 应用日志 | 系统日志 | 容器日志 | 中间件日志 │
└──────────────────────┬──────────────────────────┘
│
┌──────────────────────▼──────────────────────────┐
│ 采集层(Filebeat/Fluentd) │
│ 轻量级 | 资源占用低 | 支持多种Input │
└──────────────────────┬──────────────────────────┘
│
┌──────────────────────▼──────────────────────────┐
│ 处理层(Logstash/Fluentd) │
│ 解析 | 转换 | 过滤 | 富化 │
└──────────────────────┬──────────────────────────┘
│
┌──────────────────────▼──────────────────────────┐
│ 存储层(Elasticsearch) │
│ 索引 | 搜索 | 聚合 | 分片 | 副本 │
└──────────────────────┬──────────────────────────┘
│
┌──────────────────────▼──────────────────────────┐
│ 展示层(Kibana) │
│ Dashboard | Discover | Visualize | Alert │
└─────────────────────────────────────────────────┘
Filebeat配置
基础配置
# filebeat.yml
filebeat.inputs:
# 容器日志
- type: container
enabled: true
paths:
- '/var/lib/docker/containers/*/*.log'
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
# 应用日志
- type: log
enabled: true
paths:
- '/var/log/app/*.log'
fields:
service: myapp
environment: production
json.keys_under_root: true
json.add_error_key: true
# Nginx日志
- type: log
enabled: true
paths:
- '/var/log/nginx/access.log'
fields:
service: nginx
fields_under_root: true
# 输出配置
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "logs-%{[service]}-%{+yyyy.MM.dd}"
username: "elastic"
password: "${ELASTIC_PASSWORD}"
# 或输出到Logstash
output.logstash:
hosts: ["logstash:5044"]
# 处理器
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
Logstash配置
Pipeline配置
# logstash.conf
input {
beats {
port => 5044
}
kafka {
bootstrap_servers => "kafka:9092"
topics => ["app-logs"]
group_id => "logstash-consumers"
codec => json
}
}
filter {
# 解析Nginx日志
if [service] == "nginx" {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
}
}
# 解析Java异常
if [message] =~ /^\t/ {
multiline {
pattern => "^\\t"
what => "previous"
}
}
# 时间戳解析
date {
match => [ "timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss" ]
target => "@timestamp"
}
# 字段转换
mutate {
convert => {
"response_time" => "float"
"status_code" => "integer"
}
remove_field => ["agent", "ecs"]
}
# 敏感信息脱敏
mutate {
gsub => [
"message", "\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "****-****-****-****",
"message", "\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "***@***.***"
]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logs-%{[service]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "${ELASTIC_PASSWORD}"
}
}
EFK(Fluentd)替代方案
# fluentd配置
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
data:
fluent.conf: |
<source>
@type tail
@id in_tail_container_logs
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag kube.*
read_from_head true
<parse>
@type json
time_key time
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<filter kube.**>
@type kubernetes_metadata
</filter>
<match kube.**>
@type elasticsearch
host elasticsearch
port 9200
logstash_format true
logstash_prefix logs
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
flush_thread_count 2
flush_interval 5s
retry_type exponential_backoff
retry_forever true
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action drop_oldest_chunk
</buffer>
</match>
索引管理
// 索引生命周期管理(ILM)
PUT _ilm/policy/logs-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "30gb",
"max_age": "1d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
最佳实践
- 轻量采集:使用Filebeat替代Logstash作为采集器,降低资源占用
- 结构化日志:应用输出JSON格式日志,避免复杂的Grok解析
- 索引策略:按日期和应用创建索引,配合ILM管理生命周期
- 采样高量日志:高QPS日志使用采样,避免存储爆炸