Logstash 配置

一、基础日志配置

# Kafka input: read JSON-encoded events from the "elk" topic.
input {
    kafka {
        bootstrap_servers => "xxx.xxx.xxx.xxx:9092"
        topics            => ["elk"]
        client_id         => "elk"
        codec             => "json"
    }
}



filter {
    # Audit log parsing rule.
    # NOTE: the grok pattern must stay on ONE physical line. A line break
    # inside the quoted string becomes part of the regular expression and
    # splits the capture name (previously "lo<newline>gin_id"), so the
    # pattern cannot work as intended.
    if [source] == "/data/logs/audit.log" {
        grok {
            match => {
                "message" => "%{SYSLOGTIMESTAMP:run_time} %{WORD:audit_host} EEO_AUDIT\[%{NUMBER:audit_id}\]: \s*LOGINID=%{NUMBER:login_id}\s*USER=%{WORD:run_user}\s*PID=%{NUMBER:pid}\s*PWD=%{UNIXPATH:run_pwd}\s*CMD=%{GREEDYDATA:run_cmd}"
            }
        }
    }

    # Cron (scheduled task) log parsing rule.
    if [source] == "/var/log/cron" {
        grok {
            match => {
                "message" => "%{SYSLOGBASE} \(%{USER:cron_user}\) %{CRON_ACTION:cron_action} \(%{DATA:cron_message}\)"
            }
        }
    }

}


output {

    # Route each event to a daily Elasticsearch index chosen by the file the
    # event was read from ([source]). The paths are mutually exclusive, so a
    # single if / else-if chain selects at most one destination per event.

    # Audit log
    if [source] == "/data/logs/audit.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "auth_elk_%{+YYYY.MM.dd}"
        }
        # Console debugging
        #stdout { codec => rubydebug }
    }
    # System log
    else if [source] == "/var/log/messages" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "messages_elk_%{+YYYY.MM.dd}"
        }
    }
    # Cron (scheduled task) log
    else if [source] == "/var/log/cron" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "crond_elk_%{+YYYY.MM.dd}"
        }
    }
    # Hardware (dmesg) log
    else if [source] == "/var/log/dmesg" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "dmesg_elk_%{+YYYY.MM.dd}"
        }
    }
    # Security log
    else if [source] == "/var/log/secure" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "secure_elk_%{+YYYY.MM.dd}"
        }
    }
}

二、服务日志

1、nginx

# Kafka input: read JSON-encoded events from the "elk" topic.
input {
    kafka {
        bootstrap_servers => "xxx.xxx.xxx.xxx:9092"
        topics            => ["elk"]
        client_id         => "elk"
        codec             => "json"
    }
}

filter {
    # nginx access log parsing rule.
    if [source] == "/var/log/nginx/www.test.com_user.access.log"
       or [source] == "/var/log/nginx/www.test.com_inner.access.log"
       or [source] == "/var/log/nginx/www.test.com_ecdn.access.log"
       or [source] == "/var/log/nginx/www.test.com_tcdn.access.log"
       or [source] == "/var/log/nginx/www.test.com_acdn.access.log"
       or [source] == "/var/log/nginx/m_eeo_cn.access.log"
    {
        grok {
            match => {
                "message" => '\[%{HTTPDATE:timestamp}\] (%{IPV4:xreal_ip}|-) (%{IPV4:remote_ip}|-) (?:%{IPV4:upstream_ip}\:%{IPORHOST:upstream_port}|-) %{WORD:http_method} %{NUMBER:http_code} (?:%{NUMBER:http_bytes}|-) HTTP/%{NUMBER:http_version} %{WORD:http_type}://%{NOTSPACE:http_hostname} "%{NOTSPACE:http_uri}" "(?:%{NOTSPACE:http_refer}|-)" (%{QS:agent}|-) (?:%{BASE10NUM:request_time}|-)\s*(?:%{BASE10NUM:upstream_response_time}|-) (?:%{NUMBER:upstream_status}|-) (?:%{WORD:upstream_cache_status}|-)'
            }
            #remove_field => ['type','_id','input_type','beat','version','offset','beats_input_codec_plain_applied']
        }

        # "timestamp" is captured with HTTPDATE (e.g. 10/Oct/2000:13:55:36 -0700),
        # so the date pattern has to describe that layout; the previous
        # "yyyy-MM-dd HH:mm:ss,SSS" pattern could never match it.
        date {
            match  => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        }

        # Only run the GeoIP lookup when the field exists and is not the "-"
        # placeholder (a missing field also satisfies a bare != "-" test).
        # NOTE(review): the grok above captures xreal_ip / remote_ip, not
        # source_ip. Presumably source_ip is supplied by the log shipper;
        # confirm, otherwise this lookup never runs.
        if [source_ip] and [source_ip] != "-" {
            geoip {
                source   => "source_ip"
                target   => "geoip"
                database => "/etc/logstash/GeoLite2-City.mmdb"
                # Two add_field entries with the same key build a
                # [longitude, latitude] array.
                add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
            }
        }

        mutate {
            # Split the URI on "?action=" so element 1 holds the action name.
            split   => ["http_uri", "?action="]
            convert => {
                "http_code"            => "integer"
                "http_bytes"           => "integer"
                "request_time"         => "float"
                "[geoip][coordinates]" => "float"
            }
        }

        # Nested elements must be referenced as %{[field][index]}; the former
        # "%{http_uri[1]}" is not a valid field reference. Only set "api" when
        # the URI actually contained "?action=".
        if [http_uri][1] {
            mutate {
                add_field => { "api" => "%{[http_uri][1]}" }
            }
        }
    }

    # nginx error log parsing rule.
    if [source] == "/var/log/nginx/www.test.com_user.error.log"
       or [source] == "/var/log/nginx/www.test.com_inner.error.log"
       or [source] == "/var/log/nginx/www.test.com_ecdn.error.log"
       or [source] == "/var/log/nginx/www.test.com_tcdn.error.log"
       or [source] == "/var/log/nginx/www.test.com_acdn.error.log"
       or [source] == "/var/log/nginx/m_eeo_cn.error.log"
    {
        grok {
            match => {
                "message" => '%{YEAR:year}/%{NUMBER:month}/%{NUMBER:day} %{TIME:time} \[%{WORD:loglevel}\] %{GREEDYDATA:info}'
            }
            #remove_field => ['type','_id','input_type','beat','version','offset','beats_input_codec_plain_applied']
        }

    }
}
output {

    # Each nginx log file is shipped to its own daily Elasticsearch index,
    # selected by the file path stored in [source].

    # Access logs
    if [source] == "/var/log/nginx/www.test.com_user.access.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_user.access_elk_%{+YYYY.MM.dd}"
        }
        # Console debugging
        #stdout { codec => rubydebug }
    }

    if [source] == "/var/log/nginx/www.test.com_inner.access.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_inner.access_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_ecdn.access.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_ecdn.access_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_tcdn.access.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_tcdn.access_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_acdn.access.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_acdn.access_elk_%{+YYYY.MM.dd}"
        }
    }

    # The filter stage parses m_eeo_cn.access.log as well; without this
    # branch those events match no output and are dropped.
    if [source] == "/var/log/nginx/m_eeo_cn.access.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "m_eeo_cn.access_elk_%{+YYYY.MM.dd}"
        }
    }

    ######################################################################
    # Error logs
    if [source] == "/var/log/nginx/www.test.com_user.error.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_user.error_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_inner.error.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_inner.error_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_ecdn.error.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_ecdn.error_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_tcdn.error.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_tcdn.error_elk_%{+YYYY.MM.dd}"
        }
    }

    if [source] == "/var/log/nginx/www.test.com_acdn.error.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "www.test.com_acdn.error_elk_%{+YYYY.MM.dd}"
        }
    }

    # Same as above for the m_eeo_cn error log, which the filter stage parses.
    if [source] == "/var/log/nginx/m_eeo_cn.error.log" {
        elasticsearch {
            hosts => ["xxx.xxx.xxx.xxx:9200"]
            index => "m_eeo_cn.error_elk_%{+YYYY.MM.dd}"
        }
    }

}

2、php

# Kafka input: read JSON-encoded events from the "elk" topic.
input {
    kafka {
        bootstrap_servers => "xxx.xxx.xxx.xxx:9092"
        topics            => ["elk"]
        client_id         => "elk"
        codec             => "json"
    }
}

filter {
    # php-fpm log parsing rule.
    if [source] == "/var/log/php-fpm/php-fpm.log" {
        grok {
            match => {
                "message" => "\[%{WORD:day}-%{WORD:month}-%{WORD:year} %{TIME:time}\] %{LOGLEVEL:loglevel}:%{GREEDYDATA:info}"
            }
            # Reassemble the captured parts into one "timestamp" field; this
            # is only applied when the grok match succeeds.
            add_field => {
                "timestamp" => "%{day}-%{month}-%{year} %{time}"
            }
        }

        # Use the log line's own time as the event time; previously the
        # "timestamp" field was built but never parsed.
        # NOTE(review): assumes php-fpm writes dates like "11-Feb-2020 10:00:00"
        # in the Logstash host's timezone; add a `timezone` option if not.
        date {
            match  => ["timestamp", "dd-MMM-yyyy HH:mm:ss"]
            target => "@timestamp"
        }
    }
}
output {
    # Ship php-fpm events to a daily Elasticsearch index.
    if [source] == "/var/log/php-fpm/php-fpm.log" {
        elasticsearch {
            index => "php-fpm_elk_%{+YYYY.MM.dd}"
            hosts => ["xxx.xxx.xxx.xxx:9200"]
        }
    }
}

3、mysql

4、squid

三、业务日志

results matching ""

    No results matching ""