#input {
# stdin {}
#}
# 从filebeat接受数据
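input {
  # Receive events shipped by Filebeat over the Beats protocol.
  # The original text used typographic quotes (“ ”), which the Logstash config
  # parser does not accept as string delimiters; ASCII quotes restored.
  beats {
    port => 5044
    # Binds on every interface. If this host is reachable from networks you do
    # not control, bind to a specific address and enable TLS on the listener
    # (ssl => true with ssl_certificate / ssl_key) so shippers are authenticated
    # and log data is not sent in clear text.
    host => "0.0.0.0"
  }
}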
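filter {
  # Debug switch: while this metadata flag is set, the output stage prints every
  # event to stdout and nothing is written to Elasticsearch (see output {}).
  # Metadata fields are not indexed, so the flag never reaches the index.
  mutate { add_field => { "[@metadata][debug]" => true } }

  grok {
    # Parse one nginx access-log line. The pattern is single-quoted because it
    # contains literal double quotes around referrer/connection; the original
    # left the pattern unquoted, which is not valid config syntax.
    # FORWORD is a custom pattern and must be supplied via patterns_dir,
    # otherwise every event is tagged _grokparsefailure.
    match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
  }

  # Preserve the Beats ingest time (@timestamp) in @read_timestamp before the
  # date filter below replaces @timestamp with the time taken from the log line.
  # The original passed @read_timestamp unquoted, i.e. a nil Ruby instance
  # variable, so no field was ever set.
  ruby {
    # NOTE(review): localtime already applies the host's zone; adding 8*60*60 s
    # on top double-shifts when the host is itself in UTC+8 — confirm intent.
    code => "event.set('@read_timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }

  # Set @timestamp from the nginx request time, e.g. 20/May/2015:21:05:56 +0000.
  date {
    locale => "en"
    match => [ "[@metadata][webtime]", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }

  # bytes arrives as a string; store it as a number.
  mutate { convert => { "bytes" => "integer" } }

  # Behind a CDN, http_x_forwarded_for is a comma-separated chain and the
  # author takes the first entry as the client IP. That first entry is written
  # by the client and can be forged unless the CDN overwrites the header, so
  # the resulting geoip data must not be used for access decisions.
  if [http_x_forwarded_for] =~ ", " {
    ruby {
      code => "event.set('http_x_forwarded_for', event.get('http_x_forwarded_for').split(',')[0])"
    }
  }

  # Resolve the IP to a location; keep only coordinates, country, city, region.
  geoip {
    source => "http_x_forwarded_for"
    fields => [ "location", "country_name", "city_name", "region_name" ]
  }

  # Expand the user-agent string into browser / OS details.
  useragent {
    source => "agent"
    target => "useragent"
  }

  # Index prefix = basename of the log file reported by Filebeat. Guard against
  # a missing source field: the original called split on nil, which raised and
  # tagged the event _rubyexception, leaving the index name unresolved.
  # NOTE(review): Filebeat 7+ reports the path as [log][file][path] rather than
  # source — confirm against the shipper version in use.
  ruby {
    code => "
      src = event.get('source')
      event.set('[@metadata][index_pre]', src ? src.split('/')[-1] : 'unknown')
    "
  }

  # Index suffix = @timestamp formatted as 2019.04.23 in the host's local zone.
  ruby {
    code => "event.set('[@metadata][index_day]', event.get('@timestamp').time.localtime.strftime('%Y.%m.%d'))"
  }

  # Default index name. The original wrote @[metadata][...], which is not a
  # valid field reference; the metadata namespace is [@metadata][...].
  mutate {
    add_field => { "[@metadata][index]" => "%{[@metadata][index_pre]}_%{[@metadata][index_day]}" }
  }

  # The original also carried commented-out experiments that rewrote the
  # cookies field into JSON (mutate gsub -> json_encode -> json); they are
  # omitted here because none of them was active.

  # Events whose grok parse failed go to a separate *_failure index and keep
  # the raw message for inspection; all other events drop message.
  if "_grokparsefailure" in [tags] {
    mutate {
      replace => { "[@metadata][index]" => "%{[@metadata][index_pre]}_failure_%{[@metadata][index_day]}" }
    }
  } else {
    mutate { remove_field => [ "message" ] }
  }
}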
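output {
  if [@metadata][debug] {
    # Debug mode: pretty-print every event, metadata included, to stdout.
    stdout { codec => rubydebug { metadata => true } }
  } else {
    # One dot per event as a progress indicator.
    stdout { codec => dots }

    # Ship to Elasticsearch, index name chosen in the filter stage.
    elasticsearch {
      # Plain HTTP with no credentials: anyone who can reach this address can
      # read and write the cluster. If it is not on an isolated network, enable
      # ssl with cacert and set user / password.
      hosts => [ "192.168.15.160:9200" ]
      index => "%{[@metadata][index]}"
      # document_type is deprecated in Elasticsearch 7 and rejected by 8;
      # remove it when upgrading.
      document_type => "doc"
    }
  }
}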