A-A+

ELK简单入门 – Logstash配置文件

2020年10月11日 技术, 默认 暂无评论 阅读 524 次

本文基于的版本如下:

这次主要介绍一下Logstash的配置文件,这关系到数据存储至Elasticsearch时的结构。

本文的数据来源为Kafka,所以不会涉及到其它的数据源。

使用到的插件为:input、filter、output。

配置文件的主体格式如下:

input {    
#输入
}

filter {
#过滤、格式化数据
}

output { 
#输出
}

首先,介绍一下input,配置如下:

input {    
    kafka {
        bootstrap_servers => "192.168.149.101:9092"
        auto_offset_reset => "latest"
        topics => ["host"]
        consumer_threads => 5
        decorate_events => "true"
    }
}

filter {

}

output { 

}

参数详解如下:

1. bootstrap_servers:kafka的服务器地址,如果是集群,可以以逗号分隔,如:

192.168.149.101:9092,192.168.149.102:9092,192.168.149.103:9092 

2. topics:要订阅的主题列表。

3. auto_offset_reset:它的值含义如下:

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

4. consumer_threads:消费线程数。

5. decorate_events:是否将当前topic、offset、group、partition等信息也带到message中。

其次,介绍一下filter。

在介绍filter配置之前,需要明确一下输入的消息格式,本文主要是分析日志,日志的输入格式如下:

<Property name="alarm_pattern">
    %d{yyyy-MM-dd HH:mm:ss.SSS} %d{yyyy-MM-dd HH:00:00.000127.0.0.1 EXAMPLES_APP %-5level ALARM SYSTEM %logger{36} - %msg%n
</Property>
<Property name="audit_pattern">
    %d{yyyy-MM-dd HH:mm:ss.SSS} %d{yyyy-MM-dd HH:00:00.000127.0.0.1 EXAMPLES_APP %-5level AUDIT %X{CREATE_USER_ID} %logger{36} - %msg%n
</Property>

其实就是:

触发时间 按小时分段 来源IP 来源应用 日志级别 日志种类 触发人 所属类 - 日志内容 

如果没有filter,这部分日志都会作为内容(message)传输给Elasticsearch存储。而我们一般所需要的,应该是按照一定的格式转换为JSON传输给Elasticsearch。

所以解析上面的格式,就需要如下配置:

input {    

}

filter {
    grok{
            match => { "message" => "%{TIMESTAMP_ISO8601:targettime1} %{TIMESTAMP_ISO8601:targettime2} %{IPV4:ip} %{WORD:appid} %{LOGLEVEL:level} %{WORD:logtype} %{WORD:createrid} %{JAVALOGMESSAGE:log}" }
            add_tag => ["%{logtype}"]
    }
    date {
        match => [ "targettime1" , "yyyy-MM-dd HH:mm:ss,S""ISO8601" ]
        target => "targettime1"
    }
    date {
        match => [ "targettime2" , "yyyy-MM-dd HH:mm:ss,S""ISO8601" ]
        target => "targettime2"
    }
    mutate{
        remove_field => ["message"]
    }
}

output { 

}

详细的解释如下:

1. grok: 通过正则表达式,将数据解析成结构化和可查询化的数据。  

    1) match中的message,就是从kafka传入的消息主体,即日志内容。其中的正则表达式是插件默认提供的,可以见:

https://github.com/logstash-plugins/logstash-patterns-core

   2) %{TIMESTAMP_ISO8601:targettime1}的意思就是,将“触发时间”解析出来,并赋给了参数“targettime1”。

   3) add_tag的意思是,增加一个标签,这个标签的值等于message解析后的logtype。     

2. date:格式化指定日志字段,然后转存到指定的字段中。

3. mutate:对字段做处理 重命名、删除、替换和修改。在本文中,是将原始的message给删除掉了,因为解析完毕,不再需要原始的日志内容了。

最后,介绍一下output。

主要的配置如下:

input {    

}

filter {

}

output { 
    if "AUDIT" in [tags] {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash.audit.%{+YYYY}"
        }
    }
    if "ALARM" in [tags] {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash.alarm.%{+YYYY}"
        }
    }
}

详细的解释如下:

如果标签(这时候已经包含logtype的值)包含指定的字符串,则输出的该标签对应的elasticsearch中。

会在elasticsearch中,创建两个索引,如下:

以上,如有错误,欢迎指正。

觉的不错?可以关注我的公众号↑↑↑
标签:

给我留言

Copyright © 字痕随行 保留所有权利.   Theme  Ality 京ICP备14039894号

用户登录

分享到: