ELK简单入门 – Logstash配置文件
本文基于的版本如下:

这次主要介绍一下Logstash的配置文件,这关系到数据存储至Elasticsearch时的结构。
本文的数据来源为Kafka,所以不会涉及到其它的数据源。
使用到的插件为:input、filter、output。
配置文件的主体格式如下:
input {
#输入
}
filter {
#过滤、格式化数据
}
output {
#输出
}
首先,介绍一下input,配置如下:
input {
kafka {
bootstrap_servers => "192.168.149.101:9092"
auto_offset_reset => "latest"
topics => ["host"]
consumer_threads => 5
decorate_events => "true"
}
}
filter {
}
output {
}
参数详解如下:
1. bootstrap_servers:kafka的服务器地址,如果是集群,可以以逗号分隔,如:
192.168.149.101:9092,192.168.149.102:9092,192.168.149.103:9092
2. topics:要订阅的主题列表。
3. auto_offset_reset:它的值含义如下:
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
4. consumer_threads:消费线程数。
5. decorate_events:是否将当前topic、offset、group、partition等信息也带到message中。
其次,介绍一下filter。
在介绍filter配置之前,需要明确一下输入的消息格式,本文主要是分析日志,日志的输入格式如下:
<Property name="alarm_pattern">
%d{yyyy-MM-dd HH:mm:ss.SSS} %d{yyyy-MM-dd HH:00:00.000} 127.0.0.1 EXAMPLES_APP %-5level ALARM SYSTEM %logger{36} - %msg%n
</Property>
<Property name="audit_pattern">
%d{yyyy-MM-dd HH:mm:ss.SSS} %d{yyyy-MM-dd HH:00:00.000} 127.0.0.1 EXAMPLES_APP %-5level AUDIT %X{CREATE_USER_ID} %logger{36} - %msg%n
</Property>
其实就是:
触发时间 按小时分段 来源IP 来源应用 日志级别 日志种类 触发人 所属类 - 日志内容
如果没有filter,这部分日志都会作为内容(message)传输给Elasticsearch存储。而我们一般所需要的,应该是按照一定的格式转换为JSON传输给Elasticsearch。
所以解析上面的格式,就需要如下配置:
input {
}
filter {
grok{
match => { "message" => "%{TIMESTAMP_ISO8601:targettime1} %{TIMESTAMP_ISO8601:targettime2} %{IPV4:ip} %{WORD:appid} %{LOGLEVEL:level} %{WORD:logtype} %{WORD:createrid} %{JAVALOGMESSAGE:log}" }
add_tag => ["%{logtype}"]
}
date {
match => [ "targettime1" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ]
target => "targettime1"
}
date {
match => [ "targettime2" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ]
target => "targettime2"
}
mutate{
remove_field => ["message"]
}
}
output {
}
详细的解释如下:
1. grok: 通过正则表达式,将数据解析成结构化和可查询化的数据。
1) match中的message,就是从kafka传入的消息主体,即日志内容。其中的正则表达式是插件默认提供的,可以见:
https://github.com/logstash-plugins/logstash-patterns-core
2) %{TIMESTAMP_ISO8601:targettime1}的意思就是,将“触发时间”解析出来,并赋给了参数“targettime1”。
3) add_tag的意思是,增加一个标签,这个标签的值等于message解析后的logtype。
2. date:格式化指定日志字段,然后转存到指定的字段中。
3. mutate:对字段做处理 重命名、删除、替换和修改。在本文中,是将原始的message给删除掉了,因为解析完毕,不再需要原始的日志内容了。
最后,介绍一下output。
主要的配置如下:
input {
}
filter {
}
output {
if "AUDIT" in [tags] {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash.audit.%{+YYYY}"
}
}
if "ALARM" in [tags] {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash.alarm.%{+YYYY}"
}
}
}
详细的解释如下:
如果标签(这时候已经包含logtype的值)包含指定的字符串,则输出的该标签对应的elasticsearch中。
会在elasticsearch中,创建两个索引,如下:

以上,如有错误,欢迎指正。
