公司不同服务的不同 log 散落在不同的机器上,管理查看非常困难,于是考虑使用 ELK 来收集管理 log,ElasicsearchKibana 稍微学了下基本用没有什么问题,但是 Logstash 却苦苦 get 不到它的点在哪,于是,在某一个深夜下决心好好去研究下它的使用方法…

本文就详细介绍下 logstash 的用法,文中的例子是使用 logstash 收集 nginx log,然后输出到 elasticsearch 中。

Logstash 基本概念

先看一张图来说明 Logstash 的数据流向(图中没有介绍 codec 的过程,一般来说,不用 codec 你完全可以使用基本功能来完成大部分工作):

logstash 数据流向

途中虚线框中的部分就是 logstash 做的事情,如果不去了解内部机制 logstash 就像是一个提供了 输入输出接口的黑盒,把我们的各种 log 收集起来(连接到 input 接口),logstash 做一些处理然后发送给连接到 output 接口的输入(可以是 file、stdout、elasticsearch),这个过程被官方定义为 pipline。下面咱们看一下黑盒的内部结构,目前咱们只介绍三部分:

  • input
  • filter
  • output

input

所谓输入,就是我们要收集的各种 log,这部分可以是 syslog、file、tcp 甚至是 stdin。

我们想收集的是 nginx 的 log,一般 nginx log 我们都重定向到一个 log 文件中,所以,这里 input 我们选择 file。

完整的 input list 请参考:Logstash Input Plugin

在配置文件中可以这么配置 input:

input {
    file {
        path => ["/var/log/nginx/api_access_log"]
        type => "dataapi"
        start_position => "beginning"
    }

    file {
        path => ["/var/log/nginx/internal_api_access_log"]
        type => "internalapi"
        start_position => "beginning"
    }
}

可以看到是支持多个文件的,可以根据不同的 type 来区分不同的文件。

filter

因为我们要处理不同服务的不同 log 然后统一输出到一个地方,不同服务的 log 的存储格式是不一样的,所以不同的 log 需要统一处理,格式化为一种标准的格式(比如 json)然后再统一输出。这个转化就需要我们定义 filter 来完成。

常用 filter 有 grok、kv、date 等,完整的 filter 列表请参考:Logstash Filter Plugin

下面介绍几个常见的 filter:

* grok(这也是我当时始终不能理解的部分)

可以理解为 grep,实际就是用正则的方式去匹配我们的不同 log,然后用分组把不同的信息提取出来,看我的例子:

我们的 nginx log format 为:

log_format api '$http_remoteip - - [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"';

需要说明的一点是,因为我们使用了阿里云的负载均衡,如果用 remote_addr 获取客户端 IP 的到的是负载均衡的 IP,所以我们使用了 remoteip 这个 header 内容,具体获取这个变量的方法就是前面加 http_,所以是 http_remoteip

具体的格式分析:

所以到了 access_log 中的日志是这样的:

218.108.146.138 - - [08/Apr/2017:16:20:19 +0800] "GET /search?_id=20xxx2&_token=test_token&limit=10&q=%E8%88%9E%E8%B1%A1%E4%BA%91&start=0&type=all HTTP/1.0" 200 1094 "http://data.xxxx.com/" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.110 Safari/537.36"

OK,我们只要根据上面的内容写正则表达式就可以了,相信这难不倒大部分程序员,下面我贴出我的:

%{IPORHOST:clientip} - - \[%{HTTPDATE:request_time}\] \"(?:%{WORD:method} %{URIPATH:url}(?:%{URIPARAM:params})?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\"

呃…… 说好的正则呢?为什么是这些啊,好的这也是我当时不解的地方,我解释下这个地方。

观察上面的 log,都是一些 IP、DATE、UA、URL 等等的字符串(有些是数字),抛开 nignx,相信 apache 的访问日志基本上也都会是这些东西,那么问题来了,每次我们接入一个 nginx 服务要写一大堆正则,下次我接入了 apache,同样的东西我可能回重复写一大堆,heh,bad small,要复用对不对,这些 logstash 早就想到了,所以,官方把常用的正则放到了变量里方便我们使用,具体可参考:Logstash Grok Patterns

这时候再看上面提到的匹配字符串,最开始是客户端 IP,logstash 给我们一个变量叫 IPORHOST,在 pattern 列表中查看定义为:

IPORHOST (?:%{HOSTNAME}|%{IP})

实际是两个变量的拼接,用到了正则中的 |,在文档中找提到的两个变量为:

IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])

IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)

实际就是三个正则:匹配 IPV6、IPV4 和 HOSTNAME,我们只需要引用变量就好,不用自己手写正则,很方便有没有。

grok 表达式(可以叫做 pattern)的语法是这样的:

%{PATTERN_NAME:capture_name:data_type}
  • PATTERN_NAME:上面提到的的正则的变量
  • capture_name:要把内容提取到哪个 field 中,可以理解为正则的分组名
  • data_type:提取出的内容的数据类型,默认为字符串,常用的还有 intfloat

当然了,上面的变量你完全可以选择不实用而使用自己手写的正则比如把某个像 123.456 这样的值提取到 request_time 则个 field 中):

(?<request_time>\d+(?:\.\d+)?)

那么 grok 怎么配置呢?看下面:

filter {
    grok {
        match => {
            "message" => "%{IPORHOST:clientip} - - \[%{HTTPDATE:request_time}\] \"(?:%{WORD:method} %{URIPATH:url}(?:%{URIPARAM:params})?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\""
        }
    }
}

上面就是我的配置文件。那么怎么验证我们的 pattern 对不对呢?有个 logstash pattern debug 工具可以帮助我们。

提取的内容打印出来(在 output 介绍中会介绍怎么打印到屏幕上)是类似这样的:

{
          "agent" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8",
         "method" => "GET",
         "_token" => "test_token",
        "message" => "100.97.90.0 - - [02/Apr/2017:22:47:32 +0800] \"GET /project/weibo?_id=25xxx67&_token=test_token&id=K_ITJUZI_PROJECT:29767 HTTP/1.0\" 200 503 \"http://data.fellowplus.com/\" \"Mo
zilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8\"",
         "params" => "?_id=25xx67&_token=test_token&id=xxxxx",
            "url" => "/project/weibo",
           "tags" => [],
           "path" => "/var/log/nginx/api_access_log",
       "referrer" => "http://data.xxxx.com/",
     "@timestamp" => 2017-04-06T03:37:18.215Z,
       "response" => "200",
          "bytes" => "503",
       "clientip" => "100.97.90.0",
       "@version" => "1",
           "host" => "iZ232o4s4u6Z",
    "httpversion" => "1.0",
            "_id" => "25xxx67",
             "id" => "xxxxx",
      "request_time" => "02/Apr/2017:22:47:32 +0800"
}

* kv

先看一个问题:

可以看到把我们的 URL 和 参数分别提取到了不同的变量中:

{
    "params" => "?_id=25xx67&_token=test_token&id=xxxxx",
    "url" => "/project/weibo",
}

但是我想做一个用户 ID 的统计,根据 _id 去统计访问量,这时候我们就需要把 _id 提取到一个 field 中,一种方式是修改上面的正则,从 request 中提取,这样的问题是假如后面你不能穷举请求中到底有多少参数,所以 logstash 给了我们另外一个 filter:kv。

kv 看名字就知道是 key-value 的意思,跟我们的参数一样啊,先看代码:

filter {
   kv {
        source => "params"
        field_split => "&"
        trimkey => "\?"
        prefix => "arg_"
    }
}
  • source:要处理哪个 field 中的内容
  • field_split:字符串要用什么字符拆分,url param 当然要用 & 拆分啊
  • trimkey:看上面 params 字符串中是包含 ? 的,这时候会把 _id 的内容提取到 ?_id 中,随意我们需要 trim,后面的字符就是我们要清理的字符,trimkey 是每个 key 都做这样的处理
  • prefix:elastic 中 _id 是一个关键字,所以我们不能使用 \_id 这个 field,那么加个前缀就好了呗,所以 prefix = 'arg\_' 后,我们的 _id 就变成了 arg__id,每个 key 都会做这样的处理

* date

再看另外一个问题:

如果把你的提取内容打印出来你会发现有两个变量:

{
    ...
    "@timestamp" => 2017-04-06T03:37:18.215Z,
    "request_time" => "02/Apr/2017:22:47:32 +0800",
    ...
}

@timestamp 这个 field 是日期类型,但是是提取时间而不是请求时间,request_time 是请求时间但是是字符串类型,不是 date 类型,elasticsearch 是不能当时间序列来处理的,所以,不对啊老师……

别急,下面介绍另外一个 filter:date,先看配置配置:

filter {
    date {
            match => [ "request_time" , "dd/MMM/yyyy:HH:mm:ss +0800" ]
            timezone => "Asia/Shanghai"
    }
}

这时,你就会发现 @timestamp 变成了 request_time 的值,而且是 date 类型,而且格式是按我们指定的格式显示的。

* geoip

看配置文件:

filter {
    geoip {
        source => "clientip"
    }
}
  • source:仍然是我们要处理的 field 名字

这个其实就是根据 IP 获取地点的 filter,处理后的结果:

{
       "message" => "183.60.92.253",
      "@version" => "1",
    "@timestamp" => "2014-08-07T10:32:55.610Z",
          "host" => "raochenlindeMacBook-Air.local",
         "geoip" => {
                      "ip" => "183.60.92.253",
           "country_code2" => "CN",
           "country_code3" => "CHN",
            "country_name" => "China",
          "continent_code" => "AS",
             "region_name" => "30",
               "city_name" => "Guangzhou",
                "latitude" => 23.11670000000001,
               "longitude" => 113.25,
                "timezone" => "Asia/Chongqing",
        "real_region_name" => "Guangdong",
                "location" => [
            [0] 113.25,
            [1] 23.11670000000001
        ]
    }
}

output

终于到输出了,先看配置:

output {
    stdout {codec => rubydebug}
    elasticsearch {
        hosts => ["xxx.xxx.xxx.xxx:9200"]
        index => "logstash-dataapi"
        document_type => "%{type}"
        flush_size => 20000
        idle_flush_time => 10
    }
}

和 input 一样,output 也可以配置多个,上面的配置是打印到屏幕同时输入到 elasticsearch 中。

elasticsearch 中有几个参数需要说明一下:

  • hosts:就是 elasticsearch 的 host 和 port
  • index:要存储的 index
  • document_type:还记得 input 中配置的 type 吗?我们配置了两个 log 文件,我们需要让他们都输入到一个 index 中,但是 要靠 _type 来区分,所以,这个地方配置我们的 type 就好了,写成:%{type}
  • flush_size: 代表不是每次都去请求 elasticsearch,而是先攒着,攒到配置中的条数然后发送给 elasticsearch
  • idle_flush_time:这个是配合上面使用的,如果在这个配置的时间内,没有攒够需要的条数,仍然会向 elasticsearch 发送数据

好了,介绍完了,下面贴一下完整的配置文件:

input {
    file {
        path => ["/var/log/nginx/api_access_log"]
        type => "dataapi"
        start_position => "beginning"
    }

    file {
        path => ["/var/log/nginx/internal_api_access_log"]
        type => "internalapi"
        start_position => "beginning"
    }
}

filter {
    grok {
        match => {
            "message" => "%{IPORHOST:clientip} - - \[%{HTTPDATE:request_time}\] \"(?:%{WORD:method} %{URIPATH:url}(?:%{URIPARAM:params})?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\""
        }
    }

    date {
        match => [ "request_time" , "dd/MMM/yyyy:HH:mm:ss +0800" ]
        timezone => "Asia/Shanghai"
    }

    kv {
        source => "params"
        field_split => "&"
        trimkey => "\?"
        prefix => "arg_"
    }

    geoip {
        source => "clientip"
    }
}

output {
    # stdout {codec => rubydebug}
    elasticsearch {
        hosts => ["xxx.xxx.xxx.xxx:9200"]
        index => "logstash-dataapi"
        document_type => "%{type}"
        flush_size => 20000
        idle_flush_time => 10
    }
}

我们的整体分布是这样的:

一般一台机器启动一个 logstash 服务,input 配置不同的 log。

logstash 分布图

Enjoy it。

参考