Log Collector Configuration Guide for Logstash and Filebeat

This document provides deployment configurations for Logstash as a standalone log collector and for Filebeat as a log shipper that forwards to Logstash. It covers a Docker Compose setup, Kubernetes deployment manifests, and multiline log parsing.

This article was published 744 days ago; some content may be outdated. If you have any questions, please leave a comment.

Log Format

1
2024-01-29 16:11:11.189 |INFO | 1.1.1.1|2345 | com.smart.service.receive.impl.ReceiveServiceImpl:903 | Capability>Total 04 steps | 6df2f14fca4b40f6be89b9ef19382c42adasfasf

Logstash Configuration Example

docker deployment

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
[root@master logstash]# cat docker-compose.yaml

# Docker Compose stack running a single standalone Logstash collector.
version: '3'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    container_name: logstash
    volumes:
      # Settings file that replaces the logstash.yml shipped in the image.
      - ./conf/logstash.yml:/usr/share/logstash/config/logstash.yml
      # Pipeline definitions; logstash.yml points path.config at this directory.
      - ./conf/conf.d:/usr/share/logstash/config/conf.d/
      # Host directory holding the application logs read from /opt/*.log.
      - ./logs:/opt
    ports:
      # Quoted so the HOST:CONTAINER mapping is always parsed as a string.
      - "5044:5044"

config files

logstash.yml

1
2
3
# Bind the Logstash HTTP API to all interfaces so it is reachable from outside
# the container. `api.http.host` is the current name of the deprecated
# `http.host` setting.
api.http.host: "0.0.0.0"
# Elasticsearch endpoint that receives Logstash monitoring data.
xpack.monitoring.elasticsearch.hosts: ["http://192.168.142.106:9200"]
# Load every *.conf file in conf.d as pipeline configuration.
path.config: /usr/share/logstash/config/conf.d/*.conf

Pipeline configuration (a `*.conf` file placed under `conf/conf.d/`)

# Tail the two application log files and join continuation lines (for example
# stack traces) onto the entry that precedes them.
input {
    # Regular application log.
    file {
        type => "info_log"
        path => "/opt/kaikai.log"
        # How often `path` is re-expanded to discover new files, expressed as
        # a multiple of stat_interval.
        discover_interval => 10
        # Applies only to files not seen before: start reading at the end.
        start_position => "end"
        # sincedb_path => "/usr/share/logstash/sincedb_kaikai"
        # start_position => "beginning"
        codec => multiline {
            # A new entry starts with a timestamp; every line that does NOT
            # start with one is appended to the previous entry.
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => "previous"
            # Without this the last entry stays buffered until another line
            # is written; emit it after 5 seconds without new data.
            auto_flush_interval => 5
        }
    }

    # Error log; read from the start when the file is seen for the first time.
    file {
        type => "error_log"
        path => "/opt/error.log"
        discover_interval => 10
        start_position => "beginning"
        codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => "previous"
            # Flush the buffered last entry after 5 seconds without new data.
            auto_flush_interval => 5
        }
    }
}

# Derive the index name from the source file and split each line into columns.
filter {

    # Capture the file's base name (without ".log") as `logfilename`; the
    # output section uses it to build the index name.
    grok {
        match => { "[log][file][path]" => "/(?<logfilename>[^/]+)\.log$" }
        # Dedicated failure tag, so a path that does not match is not mistaken
        # for a malformed log line by the fallback below.
        tag_on_failure => ["_logfilename_parsefailure"]
    }

    # Split the pipe-delimited line: time | level | ip | pid | source | content.
    grok {
        match => { "message" => "%{DATA:time}\|%{DATA:level}\|%{DATA:ip}\|%{DATA:pid}\|%{DATA:source}\|%{GREEDYDATA:content}" }
    }

    # The columns are padded with spaces around "|"; trim them so that values
    # such as `level` are stored without leading or trailing whitespace.
    mutate {
        strip => ["time", "level", "ip", "pid", "source", "content"]
    }

    # Lines that do not follow the column layout: keep the whole message as
    # `content` and classify the event as ERROR.
    if "_grokparsefailure" in [tags] {
        mutate {
            add_field => {
                "content" => "%{message}"
                "level" => "ERROR"
            }
        }
    }

}

# Send every event both to stdout (for inspection) and to Elasticsearch.
output {
    stdout {
        codec => rubydebug
    }

    elasticsearch {
        hosts => ["192.168.142.106:9200"]
        # One index per source file name and day, built from the `logfilename`
        # field set in the filter section.
        index => "%{logfilename}-%{+YYYY-MM-dd}"
    }
}

k8s deployment

logstash.yaml

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
# Logstash settings and pipeline definition, mounted file-by-file into the
# Logstash pods.
apiVersion: v1
kind: ConfigMap
metadata:
  name: log-file-config
data:
  logstash.yml: |
    # Bind the HTTP API to all interfaces; `api.http.host` is the current name
    # of the deprecated `http.host` setting.
    api.http.host: "0.0.0.0"
    xpack.monitoring.elasticsearch.hosts: ["http://192.168.142.106:9200"]
    path.config: /usr/share/logstash/config/conf.d/*.conf
  collect.conf: |
    # Receive events shipped by Filebeat over the Beats protocol.
    input {
        beats {
            port => 5044
        }
    }

    filter {

        # Capture the source file's base name (without ".log") as
        # `logfilename`; the output uses it to build the index name.
        grok {
            match => { "[log][file][path]" => ["/(?<logfilename>[^/]+)\.log$"] }
            # Dedicated failure tag, so a path that does not match is not
            # mistaken for a malformed log line by the fallback below.
            tag_on_failure => ["_logfilename_parsefailure"]
        }

        # Split the pipe-delimited line:
        # time | level | ip | pid | source | content.
        grok {
            match => { "message" => "%{DATA:time}\|%{DATA:level}\|%{DATA:ip}\|%{DATA:pid}\|%{DATA:source}\|%{GREEDYDATA:content}" }
        }

        # The columns are padded with spaces around "|"; trim them so values
        # are stored without leading or trailing whitespace.
        mutate {
            strip => ["time", "level", "ip", "pid", "source", "content"]
        }

        # Lines that do not follow the column layout: keep the whole message
        # as `content` and classify the event as ERROR.
        if "_grokparsefailure" in [tags] {
            mutate {
                add_field => {
                    "content" => "%{message}"
                    "level" => "ERROR"
                }
            }
        }

    }

    output {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => ["192.168.142.106:9200"]
            # One index per source file name and day.
            index => "%{logfilename}-%{+YYYY-MM-dd}"
        }
    }
---
# Logstash collector pods; every replica loads its settings and pipeline from
# the log-file-config ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
  labels:
    app: logstash
spec:
  replicas: 4
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
      annotations:
        appName: logstash
        appType: java
    spec:
      imagePullSecrets:
        - name: my-harbor
      volumes:
        # Both ConfigMap keys are projected as files of the same name.
        - name: logstash-config
          configMap:
            name: log-file-config
            items:
              - key: logstash.yml
                path: logstash.yml
              - key: collect.conf
                path: collect.conf
      containers:
        - name: logstash-logging
          image: registry.cn-beijing.aliyuncs.com/kaikai136/logstash:8.12.0
          volumeMounts:
            # subPath mounts a single file, leaving the rest of the target
            # directory from the image in place.
            - name: logstash-config
              mountPath: /usr/share/logstash/config/logstash.yml
              subPath: logstash.yml
            - name: logstash-config
              mountPath: /usr/share/logstash/config/conf.d/collect.conf
              subPath: collect.conf


---
# Exposes port 5044 of the Logstash pods inside the cluster and on node port
# 32467 of every node.
apiVersion: v1
kind: Service
metadata:
  name: logstash-svc
  labels:
    app: logstash-svc
spec:
  type: NodePort
  # Routes to the pods created by the logstash Deployment.
  selector:
    app: logstash
  ports:
    - name: http
      protocol: TCP
      port: 5044
      targetPort: 5044
      nodePort: 32467

filebeat

collector test

filebeat.yaml

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Filebeat configuration: two log inputs (info and error files) shipped to the
# Logstash service.
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    # The two inputs differ only in the file glob and the log_type field.
    filebeat.inputs:
      - type: log
        enabled: true
        paths:
          - /logs/*_info.log
        scan_frequency: 1s  # look for new files every second
        harvester_buffer_size: 32768  # increase harvester buffer size
        backoff_factor: 2
        ignore_older: 24h  # ignore files older than 24 hours
        close_inactive: 5m  # close harvester inactive for 5 minutes
        clean_inactive: 72h  # clean inactive harvester after 72 hours
        close_removed: true  # close harvester when file is removed
        clean_removed: true  # clean removed harvester
        close_eof: true  # close harvester when file reaches EOF
        # Lines that do not start with four digits (the year of the timestamp)
        # are appended to the preceding line.
        multiline.pattern: '^[0-9]{4}'
        multiline.negate: true
        multiline.match: after
        # NOTE(review): `var.convert_timezone` is a module (fileset) variable
        # and is not an option of the log input, so it was disabled here.
        # var.convert_timezone: true
        encoding: UTF-8  # set encoding
        fields:
          wisentIp: 0.0.0.0  # add custom field
          log_type: info_log
      - type: log
        enabled: true
        paths:
          - /logs/*_error.log
        scan_frequency: 1s  # look for new files every second
        harvester_buffer_size: 32768  # increase harvester buffer size
        backoff_factor: 2
        ignore_older: 24h  # ignore files older than 24 hours
        close_inactive: 5m  # close harvester inactive for 5 minutes
        clean_inactive: 72h  # clean inactive harvester after 72 hours
        close_removed: true  # close harvester when file is removed
        clean_removed: true  # clean removed harvester
        close_eof: true  # close harvester when file reaches EOF
        multiline.pattern: '^[0-9]{4}'
        multiline.negate: true
        multiline.match: after
        # NOTE(review): `var.convert_timezone` is a module (fileset) variable
        # and is not an option of the log input, so it was disabled here.
        # var.convert_timezone: true
        encoding: UTF-8  # set encoding
        fields:
          wisentIp: 0.0.0.0  # add custom field
          log_type: error_log

    queue.mem:
      events: 4096  # memory queue size
      flush.min_events: 2048  # minimum flush events
      flush.timeout: 1s  # flush timeout

    #queue.disk:
    #  max_size: 1024mb  # maximum disk usage
    #  segment_size: 10mb  # size of each segment
    #  max_retries: 3  # maximum retries

    filebeat.shutdown_timeout: 30s  # ensure enough time to process current events when shutting down Filebeat
    # NOTE(review): `throttle` does not appear to be a documented Filebeat
    # setting; disabled pending confirmation.
    # throttle: 5s

    # `logging.level` was previously declared twice (debug, then info); only
    # one declaration is kept. Set it to debug temporarily when troubleshooting.
    logging.level: info
    logging.to_files: true
    logging.files:
      path: /usr/share/filebeat/logs
      name: filebeat
      keepfiles: 7
      permissions: 0644

    output.logstash:
      hosts: ["logstash-svc.default.svc.cluster.local:5044"]
---

# Single Filebeat pod that reads log files from a host directory and ships
# them using the configuration in the filebeat-config ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: filebeat
  labels:
    app: filebeat
spec:
  replicas: 1
  selector:
    matchLabels:
      app: filebeat
  template:
    metadata:
      labels:
        app: filebeat
      annotations:
        appName: filebeat
        appType: java
    spec:
      imagePullSecrets:
        - name: my-harbor
      volumes:
        - name: filebeat-config
          configMap:
            name: filebeat-config
            items:
              - key: filebeat.yml
                path: filebeat.yml
        # Host directory containing the log files; created if it is missing.
        # NOTE(review): a hostPath is node-local, so this pod only sees logs
        # on the node it is scheduled on.
        - name: myhostpath
          hostPath:
            path: /opt/kaikai/file-logstash/filebeat_log
            type: DirectoryOrCreate
      containers:
        - name: filebeat-logging
          image: registry.cn-beijing.aliyuncs.com/kaikai136/filebeat:8.12.0
          volumeMounts:
            # Replace only filebeat.yml, leaving the rest of the image intact.
            - name: filebeat-config
              mountPath: /usr/share/filebeat/filebeat.yml
              subPath: filebeat.yml
            # Matches the /logs/*.log globs in filebeat.yml.
            - name: myhostpath
              mountPath: /logs
Facing the sea with spring blossoms.
Built with Hugo
Theme Stack designed by Jimmy