← 返回首页
📊

ELK/EFK日志管道:Filebeat/Logstash架构

📂 architecture ⏱ 3 min 426 words

ELK/EFK日志管道:Filebeat/Logstash架构

ELK/EFK架构概览

ELK(Elasticsearch + Logstash + Kibana)和EFK(Elasticsearch + Fluentd + Kibana)是主流的日志收集和分析平台。Filebeat替代Logstash作为轻量级采集器。

ELK架构:
┌─────────────────────────────────────────────────┐
│                日志源                            │
│     应用日志 | 系统日志 | 容器日志 | 中间件日志    │
└──────────────────────┬──────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────┐
│           采集层(Filebeat/Fluentd)             │
│  轻量级 | 资源占用低 | 支持多种Input             │
└──────────────────────┬──────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────┐
│           处理层(Logstash/Fluentd)             │
│  解析 | 转换 | 过滤 | 富化                       │
└──────────────────────┬──────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────┐
│           存储层(Elasticsearch)                │
│  索引 | 搜索 | 聚合 | 分片 | 副本                 │
└──────────────────────┬──────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────┐
│           展示层(Kibana)                       │
│  Dashboard | Discover | Visualize | Alert       │
└─────────────────────────────────────────────────┘

Filebeat配置

基础配置

# filebeat.yml
filebeat.inputs:
  # 容器日志
  - type: container
    enabled: true
    paths:
      - '/var/lib/docker/containers/*/*.log'
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"
  
  # 应用日志
  - type: log
    enabled: true
    paths:
      - '/var/log/app/*.log'
    fields:
      service: myapp
      environment: production
    json.keys_under_root: true
    json.add_error_key: true

  # Nginx日志
  - type: log
    enabled: true
    paths:
      - '/var/log/nginx/access.log'
    fields:
      service: nginx
    fields_under_root: true

# 输出配置
output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "logs-%{[service]}-%{+yyyy.MM.dd}"
  username: "elastic"
  password: "${ELASTIC_PASSWORD}"

# 或输出到Logstash
output.logstash:
  hosts: ["logstash:5044"]

# 处理器
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
        - logs_path:
            logs_path: "/var/log/containers/"

Logstash配置

Pipeline配置

# logstash.conf
input {
  beats {
    port => 5044
  }
  
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-logs"]
    group_id => "logstash-consumers"
    codec => json
  }
}

filter {
  # 解析Nginx日志
  if [service] == "nginx" {
    grok {
      match => { 
        "message" => "%{COMBINEDAPACHELOG}" 
      }
    }
    geoip {
      source => "clientip"
    }
  }
  
  # 解析Java异常
  if [message] =~ /^\t/ {
    multiline {
      pattern => "^\\t"
      what => "previous"
    }
  }
  
  # 时间戳解析
  date {
    match => [ "timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
  }
  
  # 字段转换
  mutate {
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }
    remove_field => ["agent", "ecs"]
  }
  
  # 敏感信息脱敏
  mutate {
    gsub => [
      "message", "\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "****-****-****-****",
      "message", "\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "***@***.***"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{[service]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
  }
}

EFK(Fluentd)替代方案

# fluentd配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
data:
  fluent.conf: |
    <source>
      @type tail
      @id in_tail_container_logs
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kube.*
      read_from_head true
      <parse>
        @type json
        time_key time
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    
    <filter kube.**>
      @type kubernetes_metadata
    </filter>
    
    <match kube.**>
      @type elasticsearch
      host elasticsearch
      port 9200
      logstash_format true
      logstash_prefix logs
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        flush_thread_count 2
        flush_interval 5s
        retry_type exponential_backoff
        retry_forever true
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action drop_oldest_chunk
      </buffer>
    </match>

索引管理

// 索引生命周期管理(ILM)
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "30gb",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

最佳实践

  1. 轻量采集:使用Filebeat替代Logstash作为采集器,降低资源占用
  2. 结构化日志:应用输出JSON格式日志,避免复杂的Grok解析
  3. 索引策略:按日期和应用创建索引,配合ILM管理生命周期
  4. 采样高量日志:高QPS日志使用采样,避免存储爆炸