log-analysis

elk分析

服务端    Kibana
服务端或者独立 JDK  ElasticSearch  redis2.6数据队列 Logstash匹配分析数据
客户端 JDK   Logstash(导入数据用安装一样分离开减轻客户端压力)
下载地址 https://www.elastic.co/downloads
https://mirrors.tuna.tsinghua.edu.cn/ELK/  国内源

ElasticSearch和Logstash依赖于JDK所以需要安装JDK
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

redis 源码编译安装2.6以上版本 

Logstash安装
http://www.open-open.com/lib/view/open1473661753307.html
Logstash默认的对外服务的端口是9292
rpm -ivh logstash-2.0.0-1.noarch.rpm
测试/opt/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
输入hello world


vim /etc/logstash/conf.d/agent.conf

input {
   file {
     type => "ugo_nginx_access"   ##日志文件类型,自定义好区分类似于分组这种概念
     path => "/export1/log/access_20150407+00.log"  ##日志文件路径
   }
   file {
     type => "nginx_access"
     path => "/usr/local/nginx/logs/python-access.log"
   }
}
output {
  #将收集到的日志存储到了redis中
  redis {
    host => "103.41.54.16"   
    port => 6379
    data_type => "list"
    key => "logstash"
  }
}

启动 /opt/logstash/bin/logstash agent -f /usr/local/logstash/conf/agent.conf   有时候服务启动没有数据

server
grok匹配测试  http://grokdebug.herokuapp.com/

/opt/logstash/bin/logstash agent -f /usr/local/logstash/conf/fserver.conf
input {
    redis {
        host => "127.0.0.1"
        port => "6379"
        data_type => "list"
        key => "logstash"
        type => "redis-input"
    }
}

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}" }   nginx日志匹配
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/opt/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }

    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
      convert => [ "response","integer" ]
      convert => [ "bytes","integer" ]
      replace => { "type" => "nginx_access" }
      remove_field => "message"
    }

    date {
      match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z"]

    }
    mutate {
      remove_field => "timestamp"

    }


}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        manage_template => true   可以使用模板匹配地区,geoip,
        # http://blog.csdn.net/yanggd1987/article/details/50469113
        index => "logstash-nginx-access-%{+YYYY.MM.dd}"
    }
    stdout {codec => rubydebug}
}


 ##这个命令的意义就是,删除 logstash 2015年4月的所有文件。
curl -XDELETE 'http://10.1.1.99:9200/logstash-2015.04.*

ElasticSearch 安装
ElasticSearch默认的对外服务的HTTP端口是9200,节点间交互的TCP端口是9300。
rpm -ivh elasticsearch-2.0.0.rpm
vim /etc/elasticsearch/elasticsearch.yml 添加
node.name: node-1
network.host: 0.0.0.0
path.data: /data/elasticsearch/
http.port: 9200

mkdir -pv /data/elasticsearch
chown -R elasticsearch.elasticsearch /data/elasticsearch/
/etc/init.d/elasticsearch start

测试ElasticSearch服务是否正常,预期返回200的状态码:
curl -X GET http://localhost:9200
head插件
wget https://github.com/mobz/elasticsearch-head/archive/master.zip
unzip master.zip
mv elasticsearch-head-master/ /usr/share/elasticsearch/plugins/head/
http://112.126.80.182:9200/_plugin/head/

初始化数据集,为了保证上报事件的时间戳被正常识别为日期格式
curl -XPUT http://localhost:9200/logstash-qos -d '       索引地址固定的,需要先map,在导入数据
{
 "mappings" : {
  "_default_" : {
   "properties" : {
    "timestamp":{"type":"date"}
   }
  }
 }
}';


Kibana安装
tar -zxf kibana-4.2.0-linux-x64.tar.gz
vim ./kibana/config/kibana.yml 修改
elasticsearch.url: http://192.168.1.23:9200
运行./kibana/bin/kibana

kibana 分析报错http://elasticsearch.cn/question/232





扩展
http://www.99ya.net/archives/523.html亿万级日志数据实时分析平台

logstatsh http

https://www.elastic.co/blog/introducing-logstash-input-http-plugin

How do I use this plugin?

By default it will bind the webserver to all hosts ("0.0.0.0") and open the TCP port 8080 but it's
 possible configure these settings:

input {
  http {
    host => "127.0.0.1" # default: 0.0.0.0
    port => 31311 # default: 8080
  }
}
That's all you need!

What about security?
You can configure basic authentication by setting a username and password. All requests done to Logstash
will then have to set the right credentials or receive a 401 response. Only correctly authenticated requests
 will produce an event inside of Logstash. For SSL, it is necessary to specify the path to a Java Keystore 
 that contains the certificate that clients use to validate the server. Here's an example:

input {
   port => 3332
   user => myuser
   password => "$tr0ngP4ssWD!"
   ssl => on
   keystore => "/tmp/mykeystore.jks"
   keystore_password => "keystore_pass"
}


OK, now show me this plugin in action!

Step 1 - starting Logstash with http input:

bin/logstash -e "input { http { } } output { stdout { codec => rubydebug} }"
Step 2 - That's it!

To test it, let's issue two requests:

% curl -XPUT 'http://127.0.0.1:8080/twitter/tweet/1' -d 'hello'                               
% curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:8080/twitter/tweet/1' -d '{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}'
Result in Logstash:

{
       "message" => "hello",
      "@version" => "1",
    "@timestamp" => "2015-05-29T14:49:00.392Z",
       "headers" => {
           "content_type" => "application/x-www-form-urlencoded",
         "request_method" => "PUT",
           "request_path" => "/twitter/tweet/1",
            "request_uri" => "/twitter/tweet/1",
           "http_version" => "HTTP/1.1",
        "http_user_agent" => "curl/7.37.1",
              "http_host" => "127.0.0.1:8080",
            "http_accept" => "*/*",
         "content_length" => "5"
    }
}
{
          "user" => "kimchy",
     "post_date" => "2009-11-15T14:12:12",
       "message" => "trying out Elasticsearch",
      "@version" => "1",
    "@timestamp" => "2015-05-29T14:49:04.105Z",
       "headers" => {
           "content_type" => "application/json",
         "request_method" => "PUT",
           "request_path" => "/twitter/tweet/1",
            "request_uri" => "/twitter/tweet/1",
           "http_version" => "HTTP/1.1",
        "http_user_agent" => "curl/7.37.1",
              "http_host" => "127.0.0.1:8080",
            "http_accept" => "*/*",
         "content_length" => "110"
    }
}
You can see that in the second request, since the content-type was application/json, the body was deserialized 
and expanded to the event root (notice the fields "user", "post_date" and "message").

Show me more concrete examples of how to use it!

Because, real world examples make everything clearer!

Elastic Watcher Integration
In this section, well show you how to integrate Elastic Watcher -- the new Elasticsearch plugin for alerting
 and notification -- with Logstash. Sending notifications to Logstash via this input provides you a powerful
  toolset to further transform notifications and use Logstashs rich collection of outputs.

Imagine that you have indices with Apache logs, and now we want to get a periodic update of how many requests
 are resulting in a 404 (Not Found) response.

The required steps for this are:

Installing Watcher
Creating a new notification on Watcher that every minute reports the number of events that have a 404 response 
status
Start Logstash with the HTTP input
Send data to Elasticsearch and watch updates on Logstash
Here we go!

1. Installing Watcher
cd elasticsearch-1.5.2
bin/plugin -i elasticsearch/watcher/latest
bin/plugin -i elasticsearch/license/latest
bin/elasticsearch # restart the server
2. Creating a watch
The Watcher plugin for elasticsearch provides an API to create and manipulate scheduled tasks, or "watches". 
A Watch will query the data in the elasticsearch cluster according to its schedule, look for certain scenarios 
(like the presence of an error event) and execute actions. Examples of actions are sending an email, writing a
 document to an index, calling an outside HTTP endpoint, and more..

For this test, I created a simple watch that:

every minute
counts number of HTTP requests that resulted in a 404
posts result to http://localhost:8080
This is the resulting JSON document I need to send to Watcher:

{
  "trigger" : {
    "schedule" : { "cron" : "0 0/1 * * * ?" }
  },
  "input" : {
    "search" : {
      "request" : {
        "indices" : [
          "logstash*"
          ],
        "body" : {
          "query" : {
            "term": { "response": 404 }
          }
        }
      }
    }
  },
  "actions" : {
    "my_webhook" : {
      "webhook" : {
        "auth" : {
          "basic" : {
            "username" : "guest",
            "password" : "guest"
          }
        },
        "method" : "POST",
        "host" : "127.0.0.1",
        "port" : 8080,
        "path": "/{{ctx.watch_id}}",
        "body" : "{{ctx.payload.hits.total}}"
      }
    }
  }
}
To install this watch you need to create it in Elasticsearch by executing a PUT request:

curl -XPUT 'http://localhost:9200/_watcher/watch/my-watch' -d @create_webhook.json
3. Logstash setup
wget http://download.elastic.co/logstash/logstash/logstash-1.5.2.tar.gz
tar -zxf logstash-1.5.2.tar.gz
cd logstash-1.5.2
bin/logstash -e "input { http { } } output { stdout { codec => rubydebug} }"
4. Results
After launching an ingestion process in another terminal, Logstash starts receiving 1 notification per 
minute in the form of a HTTP POST:

% bin/logstash -e "input { http { } } output { stdout { codec => rubydebug} }"    
Logstash startup completed
{
       "message" => "330",
      "@version" => "1",
    "@timestamp" => "2015-06-02T12:53:00.037Z",
       "headers" => {
               "content_type" => "application/x-www-form-urlencoded",
             "request_method" => "POST",
               "request_path" => "/my-watch",
                "request_uri" => "/my-watch?",
               "http_version" => "HTTP/1.1",
         "http_authorization" => "Basic Z3Vlc3Q6Z3Vlc3Q=",
        "http_accept_charset" => "UTF-8",
         "http_cache_control" => "no-cache",
                "http_pragma" => "no-cache",
            "http_user_agent" => "Java/1.8.0_20",
                  "http_host" => "127.0.0.1:8080",
                "http_accept" => "text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2",
            "http_connection" => "keep-alive",
             "content_length" => "12"
    }
}
{
       "message" => "3103",
      "@version" => "1",
    "@timestamp" => "2015-06-02T12:54:00.030Z",
       "headers" => {
               "content_type" => "application/x-www-form-urlencoded",
             "request_method" => "POST",
               "request_path" => "/my-watch",
                "request_uri" => "/my-watch?",
               "http_version" => "HTTP/1.1",
         "http_authorization" => "Basic Z3Vlc3Q6Z3Vlc3Q=",
        "http_accept_charset" => "UTF-8",
         "http_cache_control" => "no-cache",
                "http_pragma" => "no-cache",
            "http_user_agent" => "Java/1.8.0_20",
                  "http_host" => "127.0.0.1:8080",
                "http_accept" => "text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2",
            "http_connection" => "keep-alive",
             "content_length" => "13"
    }
}
{
       "message" => "6071",
      "@version" => "1",
    "@timestamp" => "2015-06-02T12:55:00.031Z",
       "headers" => {
               "content_type" => "application/x-www-form-urlencoded",
             "request_method" => "POST",
               "request_path" => "/my-watch",
                "request_uri" => "/my-watch?",
               "http_version" => "HTTP/1.1",
         "http_authorization" => "Basic Z3Vlc3Q6Z3Vlc3Q=",
        "http_accept_charset" => "UTF-8",
         "http_cache_control" => "no-cache",
                "http_pragma" => "no-cache",
            "http_user_agent" => "Java/1.8.0_20",
                  "http_host" => "127.0.0.1:8080",
                "http_accept" => "text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2",
            "http_connection" => "keep-alive",
             "content_length" => "13"
    }
}

A more complex example
Now that we know how to trigger notification events from Watcher, we can leverage the plugin ecosystem in
 Logstash to escalate notifications depending in a certain criteria. This following config will:

continuously update the number of 404 requests in statsd
if the count reaches 10000 then send a message to HipChat, or
if reaches 40000, notify PagerDuty.
input {
  http { }
}
filter {
  if [headers][request_path] == "/my-watch" {
    mutate { convert => ["message", "integer" ] }
  }
}
output {
  if [headers][request_path] == "/my-watch" {
    if [message] > 40000 { # way too many, notify pagerduty
      pagerduty {
        description => "%{host} - Apache: Very high number of 404"
        details => {
          "timestamp" => "%{@timestamp}"
          "message" => "%{message}"
        }
        service_key => "apikeyforlogstashservice"
        incident_key => "logstash/apacheservice"
      }
    } else if [message] > 10000 {    # unusual amount, notify devs in hipchat
      hipchat {
         from => "logstash"
         room_id => "dev"
         token => "[api key]"
         format => "Very high number of 404 requests: %{message}"
      }
    }
    # always update count of 404 in statsd
    statsd { gauge => [ "http.status.404", "%{message}" ] }
  }
}

geoip

wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz

elasticsearch

所有索引 curl -XGET 'http://localhost:9200/_cat/indices/*?v'
删除索引 curl -XDELETE 'http://127.0.0.1:9200/winlogbeat-2016*'
        curl -XDELETE 'http://127.0.0.1:9200/winlogbeat-2017.07.20'

查看线程资源  curl http://127.0.0.1:9200/_nodes/hot_threads
集群状态      curl -XGET 'http://localhost:9200/_cluster/stats?human&pretty'
进程状态      curl -XGET http://127.0.0.1:9200/_nodes/stats/thread_pool?pretty
集群参数      curl -XGET http://127.0.0.1:9200/_cluster/settings
索引参数      curl -XGET http://127.0.0.1:9200/logstash-nginx-access-2017.10.26/_settings

集群配置
egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml 
cluster.name: es1 #集群名字 要一致
node.name: node-1  # 节点名字,不能重复
network.host: 0.0.0.0
path.data: /data/elasticsearch/
http.port: 9200
discovery.zen.ping.multicast.enabled: false  ##关闭组播
discovery.zen.ping.unicast.hosts: ["10.51.48.109", "10.171.32.72"] # #单播发现地址 自己的和其他节点的

logstash grop正则

grok匹配测试 http://grokdebug.herokuapp.com/

Example

下面是日志的样子

55.3.244.1 GET /index.html 15824 0.043

正则的例子

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

配置文件里是怎么写得?

input {

file {

path => “/var/log/http.log”

}

}

filter {

grok {

match => [ "message", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" ]

}

}

解析后,是个什么样子?

client: 55.3.244.1

method: GET

request: /index.html

bytes: 15824

duration: 0.043

自定义正则

(?<field_name>the pattern here)

(?<queue_id>[0-9A-F]{10,11})

当然你也可以把众多的正则,放在一个集中文件里面。

# in ./patterns/postfix

POSTFIX_QUEUEID [0-9A-F]{10,11}

filter {

grok {

patterns_dir => “./patterns”

match => [ "message", "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" ]

}

}

############

logstash已经自带了不少的正则,如果想偷懒的话,可以在内置正则里借用下。

USERNAME [a-zA-Z0-9._-]+

USER %{USERNAME}

INT (?:[+-]?(?:[0-9]+))

BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:.[0-9]+)?)|(?:.[0-9]+)))

NUMBER (?:%{BASE10NUM})

BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))

BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:.[0-9A-Fa-f]*)?)|(?:.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b

NONNEGINT \b(?:[0-9]+)\b

WORD \b\w+\b

NOTSPACE \S+

SPACE \s*

DATA .*?

GREEDYDATA .*

QUOTEDSTRING (?>(?<!\)(?>”(?>.|[^\"]+)+”|”"|(?>’(?>.|[^\']+)+’)|”|(?>(?>.|[^\]+)+)|`))

UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking

MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})

CISCOMAC (?:(?:[A-Fa-f0-9]{4}.){2}[A-Fa-f0-9]{4})

WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})

COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})

IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?

IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])

IP (?:%{IPV6}|%{IPV4})

HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(.?|\b)

HOST %{HOSTNAME}

IPORHOST (?:%{HOSTNAME}|%{IP})

HOSTPORT (?:%{IPORHOST=~/./}:%{POSINT})

# paths

PATH (?:%{UNIXPATH}|%{WINPATH})

UNIXPATH (?>/(?>[\w_%!$@:.,-]+|.)*)+

TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))

WINPATH (?>[A-Za-z]+:|\)(?:\[^\?*]*)+

URIPROTO [A-Za-z]+(+[A-Za-z+]+)?

URIHOST %{IPORHOST}(?::%{POSINT:port})?

# uripath comes loosely from RFC1738, but mostly from what Firefox

# doesn’t turn into %XX

URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_-]*)+

#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?

URIPARAM \?[A-Za-z0-9$.+!*’|(){},~@#%&/=:;_?-\[\]]*

URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?

URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December

MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b

MONTHNUM (?:0?[1-9]|1[0-2])

MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc…

DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?

YEAR (?>\d\d){1,2}

HOUR (?:2[0123]|[01]?[0-9])

MINUTE (?:[0-5][0-9])

# ’60′ is a leap second in most time standards and thus is valid.

SECOND (?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)

TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])

# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)

DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}

DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}

ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))

ISO8601_SECOND (?:%{SECOND}|60)

TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?

DATE %{DATE_US}|%{DATE_EU}

DATESTAMP %{DATE}[- ]%{TIME}

TZ (?:[PMCE][SD]T|UTC)

DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}

DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}

# Syslog Dates: Month Day HH:MM:SS

SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}

PROG (?:[\w._/%-]+)

SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?

SYSLOGHOST %{IPORHOST}

SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>

HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts

QS %{QUOTEDSTRING}

# Log formats

SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] “(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})” %{NUMBER:response} (?:%{NUMBER:bytes}|-)

COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels

LOGLEVEL ([A-a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

awstats

http://blog.csdn.net/wanglipo/article/details/18080819
wget https://prdownloads.sourceforge.net/awstats/awstats-7.5.tar.gz
tar -zxf awstats-7.5.tar.gz 
mv awstats-7.5 /usr/local/awstats
cd /usr/local/awstats/tools/
mkdir -pv /var/lib/awstats
chmod 777 /var/lib/awstats
perl awstats_configure.pl
----- AWStats awstats_configure 1.0 (build 1.9) (c) Laurent Destailleur -----
This tool will help you to configure AWStats to analyze statistics for
one web server. You can try to use it to let it do all that is possible
in AWStats setup, however following the step by step manual setup
documentation (docs/index.html) is often a better idea. Above all if:
- You are not an administrator user,
- You want to analyze downloaded log files without web server,
- You want to analyze mail or ftp log files instead of web log files,
- You need to analyze load balanced servers log files,
- You want to 'understand' all possible ways to use AWStats...
Read the AWStats documentation (docs/index.html).

-----> Running OS detected: Linux, BSD or Unix

-----> Check for web server install

Enter full config file path of your Web server.
Example: /etc/httpd/httpd.conf
Example: /usr/local/apache2/conf/httpd.conf
Example:c:\Programfiles\apachegroup\apache\conf\httpd.conf
Config file path ('none' to skip web server setup):
>/etc/httpd/conf/httpd.conf  根据自己的httpd服务安装的具体路径填写
-----> Check and complete web server config file
'/etc/httpd/conf/httpd.conf'
 Add 'Alias /awstatsclasses "/usr/local/awstats/wwwroot/classes/"'
 Add 'Alias /awstatscss
"/usr/local/awstats/wwwroot/css/"'
 Add 'Alias /awstatsicons
"/usr/local/awstats/wwwroot/icon/"'
 Add 'ScriptAlias /awstats/ "/usr/local/awstats/wwwroot/cgi-bin/"'
 Add '<Directory>' directive
 AWStats directives added to Apache config file.

-----> Update model config file '/usr/local/awstats/wwwroot/cgi-bin/awstats.model.conf'
 File awstats.model.conf updated.

-----> Need to create a new config file ?
Do you want me to build a new AWStats config/profile
file (required if first install) [y/N] ?y

-----> Define config file name to create
What is the name of your web site or profile analysis ?
Example: www.mysite.com
Example: demo
Your web site, virtual server or profile name:
>lingling 可以是任意的名字,也可以是完整的域名格式,只是为了区分你要分析的那份日志的来源的网站,自己注意不要混淆就好。

-----> Define config file path
In which directory do you plan to store your config file(s) ?
Default: /etc/awstats
Directory path to store config file(s) (Enter for default):
>
默认的awstats生成的配置文件目录,根据喜好可以更改。
-----> Create config file '/etc/awstats/awstats.lingling.conf'
 Config file /etc/awstats/awstats.lingling.conf created.

-----> Restart Web server with '/sbin/service httpd restart'
Stopping httpd:                                     [OK]
Starting httpd:                                       [OK]

-----> Add update process inside a scheduler
Sorry, configure.pl does not support automatic add to cron yet.
You can do it manually by adding the following command to your cron:
/usr/local/awstats/wwwroot/cgi-bin/awstats.pl -update -config=lingling
Or if you have several config files and prefer having only one command:
/usr/local/awstats/tools/awstats_updateall.pl now
Press ENTER to continue...

A SIMPLE config file has been created: /etc/awstats/awstats.lingling.conf
You should have a look inside to check and change manually main parameters.
You can then manually update your statistics for 'lingling' with command:
> perl awstats.pl -update -config=lingling
You can also read your statistics for 'lingling' with URL:
> http://localhost/awstats/awstats.pl?config=lingling

Press ENTER to finish...


1、由于httpdlog文件默认是/var/log/httpd/access.log
所以要修改/etc/awstats/awstats.lingling.conf文件里的LogFile
LogFile="/var/log/httpd/mylog.log"改为LogFile="/var/log/httpd/access_log"
或者LogFile="var/log/access_log.%YYYY-0%MM-0%DD-0.log"
2、然后,手动更新一下:
# cd /usr/local/awstats/wwwroot/cgi-bin/
# perl awstats.pl –update –config=lingling
3、打开浏览器,用awstats分析日志:
http://10.100.10.11/awstats/awstats.pl?config=lingling

4、可以将更新的命令作为执行计划,使其每天执行一次,方便分析前一天的日。
# crontab –e
10 1 * * * /usr/local/awstats/wwwroot/cgi-bin/awstats.pl -update -config=lingling > /dev/null 2&>1

三、用awstats分析tomcat的访问日志
1、要分析tomcat的日志,就要首先了解其日志格式。
并比较与httpd的访问日志格式有什么不同之处,然后就可以参照awstats分析httpd日志的格式来定义awstats分析tomcat的日志。
我的tomcat服务器上定义的访问日志格式如下:
<Valve className="org.apache.catalina.valves.
AccessLogValve" directory="logs"
prefix="localhost_access_log." suffix=".txt"
       pattern="%h %l %u %t &quot;%r&quot; %s %b" />
%...a: 远程IP地址  
%...A: 本地IP地址  
%...B: 已发送的字节数,不包含HTTP  
%...b: CLF格式的已发送字节数量,不包含HTTP头。  
例如当没有发送数据时,写入‘-’而不是0  
%e: 环境变量FOOBAR的内容  
%...f: 文件名字  
%...h: 远程主机  
%...H 请求的协议  
%i: Foobar的内容,发送给服务器的请求的标头行。  
%...l: 远程登录名字(来自identd,如提供的话)  
%...m 请求的方法  
%n: 来自另外一个模块的注解“Foobar”的内容  
%o: Foobar的内容,应答的标头行  
%...p: 服务器响应请求时使用的端口  
%...P: 响应请求的子进程ID  
%...q 查询字符串(如果存在查询字符串,则包含“?”后面的  
部分;否则,它是一个空字符串。)  
%...r: 请求的第一行  
%...s: 状态。对于进行内部重定向的请求,这是指*原来*请求  
的状态。如果用%...>s,则是指后来的请求。  
%...t: 以公共日志时间格式表示的时间(或称为标准英文格式)  
%t: 以指定格式format表示的时间  
%...T: 为响应请求而耗费的时间,以秒计  
%...u: 远程用户(来自auth;如果返回状态(%s)是401则可能是伪造的)  
%...U: 用户所请求的URL路径  
%...v: 响应请求的服务器的ServerName  
%...V: 依照UseCanonicalName设置得到的服务器名字  
最后的tomcat的访问日志内容如下:
203.156.200.162 - - [29/Aug/2012:11:16:58 +0800] "GET /front/magazine/getContent.htm?contentId=124504 HTTP/1.1" 
200 20001
2、由于我的tomcat服务器是在其他机器上,所以我将tomcat的服务日志copy到本机的/var/log/httpd/下即可。
copy的文件是:localhost_access_log.2012-08-29.txt
3、配置awstats分析此日志(tomcat 的域名并不是httpd的虚拟主机,所以没有写进httpd.conf文件里面)
# cd /usr/local/awstats/tools
# perl awstats_configure.pl
----- AWStats awstats_configure 1.0 (build 1.9) (c) Laurent Destailleur -----
This tool will help you to configure AWStats to analyze statistics for
one web server. You can try to use it to let it do all that is possible
in AWStats setup, however following the step by step manual setup
documentation (docs/index.html) is often a better idea. Above all if:
- You are not an administrator user,
- You want to analyze downloaded log files without web server,
- You want to analyze mail or ftp log files instead of web log files,
- You need to analyze load balanced servers log files,
- You want to 'understand' all possible ways to use AWStats...
Read the AWStats documentation (docs/index.html).

-----> Running OS detected: Linux, BSD or Unix

-----> Check for web server install

Enter full config file path of your Web server.
Example: /etc/httpd/httpd.conf
Example: /usr/local/apache2/conf/httpd.conf
Example: c:\Program files\apache group\apache\conf\httpd.conf
Config file path ('none' to skip web server setup):
>none
Your web server config file(s) could not be found.
You will need to setup your web server manually to declare AWStats
script as a CGI, if you want to build reports dynamically.
See AWStats setup documentation (file docs/index.html)

-----> Update model config file '/usr/local/awstats/wwwroot/cgi-bin/awstats.model.conf'
 File awstats.model.conf updated.

-----> Need to create a new config file ?
Do you want me to build a new AWStats config/profile
file (required if first install) [y/N] ? y

-----> Define config file name to create
What is the name of your web site or profile analysis ?
Example: www.mysite.com
Example: demo
Your web site, virtual server or profile name:
>buoqu.com
-----> Define config file path
In which directory do you plan to store your config file(s) ?
Default: /etc/awstats
Directory path to store config file(s) (Enter for default):
>

-----> Create config file '/etc/awstats/awstats.buoqu.com.conf'
 Config file /etc/awstats/awstats.buoqu.com.conf created.

-----> Add update process inside a scheduler
Sorry, configure.pl does not support automatic add to cron yet.
You can do it manually by adding the following command to your cron:
/usr/local/awstats/wwwroot/cgi-bin/awstats.pl -update -config=buoqu.com
Or if you have several config files and prefer having only one command:
/usr/local/awstats/tools/awstats_updateall.pl now
Press ENTER to continue...


A SIMPLE config file has been created: /etc/awstats/awstats.buoqu.com.conf
You should have a look inside to check and change manually main parameters.
You can then manually update your statistics for 'buoqu.com' with command:
> perl awstats.pl -update -config=buoqu.com
You can also build static report pages for 'buoqu.com' with command:
> perl awstats.pl -output=pagetype -config=buoqu.com

Press ENTER to finish...
4、修改要分析日志文件
# vim /etc/awstats/awstats.buoqu.com.conf
LogFile="/var/log/httpd/mylog.log"
改为
LogFile="/usr/local/awstats/tools/logresolvemerge.pl /usr/local/awstats/flashlog/china/localhost_access_log.%
YYYY-24-%MM-24-%DD-24.txt /usr/local/awstats/flashlog/usa/localhost_access_log.%YYYY-24-%MM-24-%DD-24.txt |"
5、重启httpd服务,并分析日志
# service httpd restart
# cd /usr/local/awstats/wwwroot/cgi-bin
# perl awstats.pl -update -config=buoqu.com

Setup ('/etc/awstats/awstats.buoqu.com.conf' file, web server or permissions) may be wrong.
Check config file, permissions and AWStats documentation (in 'docs' directory).
出错:日志格式不匹配。
解决:这个时候,就知道我为什么要先了解怎么定义tomcat的日志格式了。
修改文件/etc/awstats/awstats.buoqu.com.conf
# vim /etc/awstats/awstats.buoqu.com.conf
LogFormat = 1
LogFormat ="%host %other %logname %time1 %methodurl %code %bytesd"   自定义格式

# perl awstats.pl -update -config=buoqu.com
6、打开网址查看分析结果:

http://10.100.10.11/awstats/awstats.pl?config=buoqu.com


7、手动执行命令可写入crontab
①、如果,想在分析页面上直接刷新,可以开启AllowToUpdateStatsFromBrowser=1,默认情况下是关闭的。
②、若是想每个页面上都直接有“立即更新”的按钮,而不想每次都手动的修改配置文件的话,可以再awstats的基本配置文件里修改。
# cd /usr/local/awstats/wwwroot/cgi-bin
# vim awstats.model.conf
AllowToUpdateStatsFromBrowser=0改为AllowToUpdateStatsFromBrowser=1即可。
这样,以后的网页都可以直接点击刷新的。
注意:每次修改配置文件后要重启httpd服务
③、若是要在浏览器上直接刷新,那么apache用户就要有对数据文件操作的权限
# chown apache.apache –R /var/lib/awstats
# chmod 755 /var/log/httpd
效果如图:

四、添加一些插件,使awstats看起来更人性化和直观化。
IP显示地区
wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
gunzip GeoIP.dat.gz
gunzip GeoLiteCity.dat.gz
mv *.dat /opt/
yum install GeoIP perl-Geo-IP
perl -MCPAN -e "install Geo::IP::PurePerl"

vim /etc/awstats/awstats.www.test.com.conf
将以下语句的#注释去掉:
#LoadPlugin="tooltips"      在html报告中增加一些提示信息
#LoadPlugin="decodeutfkeys" 处理搜索引擎UTF8编码的关键字

#LoadPlugin="geoip GEOIP_STANDARD /pathto/GeoIP.dat" #1429行
#LoadPlugin="geoip_city_maxmind GEOIP_STANDARD /pathto/GeoIPCity.dat" #1438行
修改为:
LoadPlugin="geoip GEOIP_STANDARD /var/geoip/GeoIP.dat"
LoadPlugin="geoip_city_maxmind GEOIP_STANDARD /var/geoip/GeoLiteCity.dat"

 使用QQ纯真版IP
(1) awstatswwwroot下的plugin目录里下载
cd /usr/local/awstats/wwwroot/cgi-bin/plugins
# yum安装时目录为:/usr/share/awstats/wwwroot/cgi-bin/plugins ,没有则建立
wget http://www.haiyun.me/download/qqwry.pl
wget http://www.haiyun.me/download/qqhostinfo.pm
获取最新的IP信息的方式为:到update.cz88.net中下在windows安装版,之后在安装目录里有qqwry.dat,既是最新的数据。
(2)上传qqwryawstatswwwroot下的plugin目录里.
(3) 修改插件配置
#修改qqwry.pl内IP数据目录:
my $ipfile="${DIR}/plugins/qqwry.dat";
(4) 添加插件到awstats
#修改awstats配置加载扩展:
LoadPlugin="qqhostinfo"
删除旧的统计数据库  rm -rf /var/lib/awstats/*
重新生成一下数据库     /usr/local/awstats/wwwroot/cgi-bin/awstats.pl -update -config=
www.test.com

-------------------------------------------------
#!/usr/bin/env bash
rsync -avz /var/lib/awstats/ /usr/local/awstats/var-lib-awstats
find /usr/local/awstats/flashlog/ -mtime +1 -exec rm -f {} \;

Date=$(date +%Y-%m-%d -d "1 day ago")
#101.200.131.163 flash china
rsync -avz '-e ssh -p 27554' 10.44.28.154:/usr/local/tomcat/logs/localhost_access_log.$Date.txt
 /usr/local/awstats/flashlog/china/
rsync -avz '-e ssh -p 27554' 47.88.7.159:/usr/local/tomcat/logs/localhost_access_log.$Date.txt 
 /usr/local/awstats/flashlog/usa/

/usr/local/awstats/wwwroot/cgi-bin/awstats.pl -update -config=flash
# http://monitor.3mang.com/awstats/awstats.pl?config=flash

-----------------------------------------------------------
#awstats log   定时任务
0 1 * * * /bin/bash /usr/local/awstats/scplog.sh > /usr/local/awstats/scplog.log 2&>1

awstats分析多个日志
分析单个日志:
LogFile="/usr/local/nginx/logs/host.access.log"
分析多个日志:
1)分开写
LogFile="/usr/local/awstats/tools/logresolvemerge.pl /usr/local/nginx/logs/231.pcstars_access.log 
/usr/local/nginx/logs/232.pcstars_access.log /usr/local/nginx/logs/233.pcstars_access.log
 /usr/local/nginx/logs/234.pcstars_access.log /usr/local/nginx/logs/mg.pcstars_access.log|"
2)以匹配模式:
LogFile="/usr/local/awstats/tools/logresolvemerge.pl /usr/local/nginx/logs/*.pcstars_access.log|"
说明:使用 awstats 内建的工具logresolvemerge.pl 来合并日志,记的后面加一个"|",表示匹配你要一起合并分析的日志
完成awstats配置文件的设置之后,需要更新记录:
 /usr/local/awstats/tools/awstats_updateall.pl now

 /usr/local/awstats/wwwroot/cgi-bin/awstats.pl -update -config=www.nginx.log -configdir="/etc/awstats"

访问分析

PIWIK
https://piwik.org/


Google Analytics
https://developers.google.com/analytics/?hl=zh-cn

es_date

elasticsearch原生支持date类型,结合该类型和Kibana可以做出漂亮有用的图表。这里简单记录下使用的方法。

使用date类型可以用如下两种方式:

使用毫秒的时间戳,直接将毫秒值传入即可。

传入格式化的字符串,默认是ISO 8601标准,例如2015-02-27T00:07Z(零时区)、2015-02-27T08:07+08:00(东八区),这两个时间实际是同一个,只是时区不同,关于时间戳,可以参见我之前的文章。另外还可以自定义时间格式,参见es的文档。但个人不建议使用自定义格式,设置不当容易遇到时区问题。在php中获取ISO 8601标准的时间很简单,date('c',time())即可。

elasticsearch默认会自动识别date类型,如果想关闭该功能,修改mapping的设置'date_detection' => false即可 。

elasticsearch原生支持date类型,json格式通过字符来表示date类型。所以在用json提交日期至elasticsearch的时候,es会隐式转换,把es认为是date类型的字符串直接转为date类型。至于什么样的字符串es会认为可以转换成date类型,参考elasticsearch官网介绍https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html。

date类型是包含时区信息的,如果我们没有在json代表日期的字符串中显式指定时区,对es来说没什么问题,但是如果通过kibana显示es里的数据时,就会出现问题,数据的时间会晚8个小时。因为kibana从es里读取的date类型数据,没有时区信息,kibana会默认当作0时区来解析,但是kibana在通过浏览器展示的时候,会通过js获取当前客户端机器所在的时区,也就是东八区,所以kibana会把从es得到的日期数据减去8小时。这里就会导致kibana经常遇到的“数据时间延迟8小时”的问题。

所以最佳实践方案就是:我们在往es提交日期数据的时候,直接提交带有时区信息的日期字符串,如:“2016-07-15T12:58:17.136+0800”。

logstash区分数据

场景,logstash采集2个数据源ABA数据源的入kafkaB数据源的都入es。

语法

Input{ 
  file { path => a  type => A }
  file { path => b  type => B }
}

output {
  if [type] == "A" {
    kafka {...}
  }
  if [type] == "B" {
    es {...}
  }
}
我的实现的场景:

bin/logstash -e 'input{
   file{
    type => "normal"
    path => "/data/log/test/abc*.log"
    start_position => beginning
    exclude => "*abc*.log"
  }

  file{
    type => "error"
    path => "/data/log/test/*error*.log"
    start_position => beginning
  }
}
output{
if [type] == "error" {
    kafka{
       bootstrap_servers => "127.0.0.1:9092,127.0.0.1:9092"
       topic_id => "loga"
      }
}
    stdout{codec=>rubydebug}
}'

winlogbeat

配置 winlogbeat.yml 配置ignore_older,否则会导入系统里很久之前的数据

winlogbeat.event_logs:
    - name: Application
      ignore_older: 48h
      provider:
          - Application Error
          - Application Hang
          - Windows Error Reporting
          - EMET
    - name: Security
      level: critical, error, warning
      event_id: 4624, 4625, 4700-4800, -4735
      ignore_older: 48h
    - name: System
      level: critical, error, warning
      ignore_older: 48h
    - name: Microsoft-Windows-Windows Defender/Operational
      include_xml: true
      ignore_older: 48h

导入es模板 scripts\import_dashboards.exe -es http://192.168.33.60:9200

安装服务

powershell管理员运行 .\install-service-winlogbeat.ps1

执行报错scripts is disabled on this system 需要先执行 Set-ExecutionPolicy RemoteSigned解除限制

net start/stop winlogbeat

命令行运行 .\winlogbeat.exe -c .\winlogbeat.yml

filebeat例子

https://www.elastic.co/guide/en/logstash/current/logstash-config-for-filebeat-modules.html#parsing-system

Apache 2 Logs

MySQL Logs

Nginx Logs

System Logs

Apache 2 Access Logsedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/apache2/access.log*

  • /var/log/apache2/other_vhosts_access.log*

exclude_files: [".gz$"]

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?",

"%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] }

remove_field => "message"

}

mutate {

add_field => { "read_timestamp" => "%{@timestamp}" }

}

date {

match => [ "[apache2][access][time]", "dd/MMM/YYYY:H:m:s Z" ]

remove_field => "[apache2][access][time]"

}

useragent {

source => "[apache2][access][agent]"

target => "[apache2][access][user_agent]"

remove_field => "[apache2][access][agent]"

}

geoip {

source => "[apache2][access][remote_ip]"

target => "[apache2][access][geoip]"

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

Apache 2 Error Logsedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/apache2/error.log*

exclude_files: [".gz$"]

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",

"\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] }

pattern_definitions => {

"APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"

}

remove_field => "message"

}

mutate {

rename => { "[apache2][error][message1]" => "[apache2][error][message]" }

}

date {

match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]

remove_field => "[apache2][error][timestamp]"

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

MySQL Logsedit

Here are some configuration examples for shipping and parsing MySQL error and slowlog logs.

MySQL Error Logsedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/mysql/error.log*

  • /var/log/mysqld.log*

exclude_files: [".gz$"]

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",

"%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",

"%{GREEDYDATA:[mysql][error][message2]}"] }

pattern_definitions => {

"LOCALDATETIME" => "[0-9]+ %{TIME}"

}

remove_field => "message"

}

mutate {

rename => { "[mysql][error][message1]" => "[mysql][error][message]" }

}

mutate {

rename => { "[mysql][error][message2]" => "[mysql][error][message]" }

}

date {

match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]

remove_field => "[apache2][access][time]"

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

MySQL Slowlogedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/mysql/mysql-slow.log*

  • /var/lib/mysql/hostname-slow.log

exclude_files: [".gz$"]

multiline:

pattern: "^# User@Host: "

negate: true

match: after

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @ %{HOSTNAME:[mysql][slowlog][host]} \[(IP:[mysql][slowlog][ip])?\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] }

pattern_definitions => {

"GREEDYMULTILINE" => "(.|\n)*"

}

remove_field => "message"

}

date {

match => [ "[mysql][slowlog][timestamp]", "UNIX" ]

}

mutate {

gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](.[0-9]+)?$", ""]

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

Nginx Logsedit

Here are some configuration examples for shipping and parsing Nginx access and error logs.

Nginx Access Logsedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/nginx/access.log*

exclude_files: [".gz$"]

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }

remove_field => "message"

}

mutate {

rename => { "@timestamp" => "read_timestamp" }

}

date {

match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]

remove_field => "[nginx][access][time]"

}

useragent {

source => "[nginx][access][agent]"

target => "[nginx][access][user_agent]"

remove_field => "[nginx][access][agent]"

}

geoip {

source => "[nginx][access][remote_ip]"

target => "[nginx][access][geoip]"

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

Nginx Error Logsedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/nginx/error.log*

exclude_files: [".gz$"]

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }

remove_field => "message"

}

mutate {

rename => { "@timestamp" => "read_timestamp" }

}

date {

match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]

remove_field => "[nginx][error][time]"

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

System Logsedit

Here are some configuration examples for shipping and parsing system logs.

System Authorization Logsedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/auth.log*

  • /var/log/secure*

exclude_files: [".gz$"]

multiline:

pattern: "^\s"

match: after

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",

"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",

"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",

"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",

"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",

"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",

"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }

pattern_definitions => {

"GREEDYMULTILINE"=> "(.|\n)*"

}

remove_field => "message"

}

date {

match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]

}

geoip {

source => "[system][auth][ssh][ip]"

target => "[system][auth][ssh][geoip]"

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

Syslogedit

Example Filebeat config:

filebeat.prospectors:

  • input_type: log

paths:

  • /var/log/messages*

  • /var/log/syslog*

exclude_files: [".gz$"]

multiline:

pattern: "^\s"

match: after

output.logstash:

hosts: ["localhost:5044"]

Example Logstash pipeline config:

input {

beats {

# The port to listen on for filebeat connections.

port => 5044

# The IP address to listen for filebeat connections.

host => "0.0.0.0"

}

}

filter {

grok {

match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }

pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }

remove_field => "message"

}

date {

match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]

}

}

output {

elasticsearch {

hosts => localhost

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"

document_type => "%{[@metadata][type]}"

}

}

elk优化

refresh_interval for indexing
Elasticsearch 是一个近实时搜索引擎。它实际上是每 1 秒钟刷新一次数据。对于日志分析应用,我们用不着这么实时,
所以 logstash 自带的模板修改成了 5 秒钟。你还可以根据需要继续放大这个刷新间隔以提高数据写入性能。
修改 refresh_interval ,那么只需要新写一个tmpl.json:
{
  "order" : 1,
  "template" : "logstash-*",
  "settings" : {
    "index.refresh_interval" : "30s",
    "index.number_of_replicas" : 0     #关闭副本
  }
}
然后运行 curl -XPUT http://localhost:9200/_template/template_newid -d '@/root/tmpl.json' 即可。
logstash 默认的模板, order0idlogstash,通过 logstash/outputs/elasticsearch 的配置选项 template_name 修改。
新模板要比order

logstash5 root启动

vim /etc/logstash/startup.options

LS_USER=root

/usr/share/logstash/bin/system-install 重新生成启动脚本

重启logstash

es集群重启

elasticsearch集群,有时候可能需要修改配置,增加硬盘,扩展内存等操作,需要对节点进行维护升级。但是业务不能停,
如果直接kill掉节点,可能导致数据丢失。而且集群会认为该节点挂掉了,就开始转移数据,当重启之后,它又会恢复数据,
如果你当前的数据量已经很大了,这是很耗费机器和网络资源的。

官方提供的安全重启集群节点的方法:

第一步:先暂停集群的shard自动均衡。

curl -XPUT http://127.0.0.1:9200/_cluster/settings -d'
{
"transient" : {
"cluster.routing.allocation.enable" : "none"
}
}'
第二步:shutdown你要升级的节点

curl -XPOST http://127.0.0.1:9200/_cluster/nodes/_local/_shutdown
elasticsearch  2.0版本之后移除了shutdown接口,所以用pid直接停止即可。

第三步:升级重启该节点,并确认该节点重新加入到了集群中

第四步:重复2-3步,升级重启其它要升级的节点。

第五步:重启启动集群的shard均衡

curl -XPUT http://127.0.0.1:9200/_cluster/settings -d'
{
"transient" : {
"cluster.routing.allocation.enable" : "all"
}
}'
到此整个集群安全升级并且重启结束。

logio

yum install npm -y
npm install -g log.io  安装

cd /root/.log.io
cat log_server.conf  模板文件  web_server.conf(当前目录下)
exports.config = {
  host: '0.0.0.0',
  port: 28777,
  auth: {
    user: "admin",
    pass: "xxxx"
  }
}

cat harvester.conf
exports.config = {
  nodeName: "application_server",
  logStreams: {
    tomcat_pingtai: [
      "/usr/local/tomcat_pingtai/logs/catalina.out",
    ],
    tomcat_pingtaitest: [
      "/usr/local/tomcat_pingtaitest/logs/catalina.out",
    ],
    tomcat_cuishou: [
      "/usr/local/tomcat_cuishou/logs/catalina.out",
    ],
    tomcat_cuishoutest: [
      "/usr/local/tomcat_cuishoutest/logs/catalina.out",
    ],
    tomcat_blacklist: [
      "/usr/local/tomcat_blacklist/logs/catalina.out",
    ]
  },
  server: {
    host: '0.0.0.0',
    port: 28777
  }
}


nohup log.io-server  & Launch server
nohup log.io-harvester  & Start log harvester 

浏览器打开 http://localhost:28778

nginx 代理
/etc/nginx/conf.d/default.conf 
location /logio {
        proxy_pass http://192.168.1.2:28778/;
        auth_basic "secret";
        auth_basic_user_file /etc/nginx/passwd.db;
    }
    location /socket.io {
        proxy_pass http://192.168.1.2:28778/socket.io;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

htpasswd -c /etc/nginx/passwd.db admin
chmod 400 /etc/nginx/passwd.db
chown nginx.nginx /etc/nginx/passwd.db

/etc/init.d/nginx reload