Logstash+ElasticSearch处理mysql慢查询日志

遇到一个需求, 需要查询某些业务的慢查询日志. 结果DBA平台那边提供的慢查询日志不能解决实际的业务场景(上报的字段补全), 无奈, 自己挽起袖子上

参考了 这篇文章, 不过自己根据需求做了较多的变更

开始吧

1. 找到日志的位置

先确认是否开启了, 然后找到日志文件的位置

> show variables like '%slow%';
+---------------------+-------------------------------------+
| Variable_name       | Value                               |
+---------------------+-------------------------------------+
| log_slow_queries    | ON                                  |
| slow_launch_time    | 2                                   |
| slow_query_log      | ON                                  |
| slow_query_log_file | /data/mysqllog/20000/slow-query.log |
+---------------------+-------------------------------------+

2. 慢查询日志

格式基本是如下, 当然, 格式如果有差异, 需要根据具体格式进行小的修改

# Time: 160524  5:12:29
# User@Host: user_a[xxxx] @  [10.166.140.109]
# Query_time: 1.711086  Lock_time: 0.000040 Rows_sent: 385489  Rows_examined: 385489
use dbname;
SET timestamp=1464037949;
SELECT 1 from dbname;

3. 使用 logstash 采集

采集, 无非是用multiline进行多行解析

但是, 需要处理的几个问题

第一个是, 去除掉没用的信息

第二个, 慢查询sql, 是会反复出现的, 所以, 执行次数成了一个很重要的指标. 我们要做的, 就是降噪(将参数去掉, 涉及带引号的内容+数字), 将参数类信息过滤掉, 留下核心的sql, 然后计算出一个hash, 这样就可以在查询, 根据这个字段进行聚合. 这里用到了 mutate 以及 checksum

  # calculate unique hash
  mutate {
    add_field => {"sql_for_hash" => "%{sql}"}
  }
  mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
  }
  checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
  }

最后算出来的md5, 放入了logstash_checksum

第三个, 某些sql会非常大, 例如某些不规范的sql可能到几百M或是上G….会直接导致采集进程OOM, 所以, 处理时, 设定超过100k丢弃掉

第四个, 默认多行处理, 一条sql可能停留在采集端没有上报, 需要等到下一条sql进来, 这样是有问题的, 如果一直没有后续, 最后一条将不会进入引擎. 所以, 在配置中设定了超过5s自动上报

完整的logstash配置文件(具体使用可能需要根据自身日志格式做些小调整) 注意, 里面的pattern ALLWORD [\s\S]*

input {
  file {
    path => ["/data/mysqllog/20000/slow-query.log"]
    sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
    type => "mysql-slow-log"
    add_field => ["env", "PRODUCT"]
    codec => multiline {
      pattern => "^# User@Host:"
      negate => true
      what => previous
      max_bytes => "100kib"
      auto_flush_interval => 5
    }
  }
}
filter {
  if ("multiline_codec_max_bytes_reached" in [tags]) {
      drop {}
  }
  grok {
    # User@Host: logstash[logstash] @ localhost [127.0.0.1]
    # User@Host: logstash[logstash] @  [127.0.0.1]
    match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
  }
  grok {
    # Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970
    match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}"]
  }

  # Capture the time the query happened
  grok {
    match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
  }
  # if codec multiline parse failure
  if ("_grokparsefailure" in [tags]) {
      drop {}
  }
  date {
    match => [ "timestamp", "UNIX" ]
  }

  mutate {
    gsub => [
        "sql", "\nSET timestamp=\d+?;\n", "",
        "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
        "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
        "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",
        "sql", "\nTcp port:.+$", "",
        "sql", "\nTime .+$", ""
    ]
  }



  # calculate unique hash
  mutate {
    add_field => {"sql_for_hash" => "%{sql}"}
  }
  mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
  }
  checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
  }

  # Drop the captured timestamp field since it has been moved to the time of the event
  mutate {
    # TODO: remove the message field
    remove_field => ["timestamp", "message", "sql_for_hash"]
  }
}
output {
    #stdout{
    #    codec => rubydebug
    #}
    #if ("_grokparsefailure" not in [tags]) {
    #    stdout{
    #        codec => rubydebug
    #    }
    #}
    if ("_grokparsefailure" not in [tags]) {
        elasticsearch {
          hosts => ["192.168.1.1:9200"]
          index => "logstash-slowlog"
        }
    }
}

采集进去的内容

{
           "@timestamp" => "2016-05-23T21:12:59.000Z",
             "@version" => "1",
                 "tags" => [
        [0] "multiline"
    ],
                 "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log",
                 "host" => "Luna-mac-2.local",
                 "type" => "mysql-slow",
                  "env" => "PRODUCT",
                 "user" => "dba_bak_all_sel",
                   "ip" => "10.166.140.109",
             "duration" => 28.812601,
            "lock_wait" => 0.000132,
              "results" => 749414,
              "scanned" => 749414,
                  "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;",
    "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371"
}

4. 写查询

查询, 我们需要按logstash_checksum进行聚合, 然后按照次数由多到少降序展示, 同时, 每个logstash_checksum需要有一条具体的sql进行展示

通过 es 的 Top hits Aggregation 可以完美地解决这个查询需求

查询的query

body = {
    "from": 0,
    "size": 0,
    "query": {
        "filtered": {
            "query": {
                "match": {
                    "user": "test"
                }
            },
            "filter": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1d",
                        "lte": "now"
                    }
                }
            }
        }
    },
    "aggs": {
        "top_errors": {
            "terms": {
                "field": "logstash_checksum",
                "size": 20
            },
            "aggs": {
                "top_error_hits": {
                    "top_hits": {
                        "sort": [
                            {
                                "@timestamp":{
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": {
                            "include": [
                               "user" , "sql", "logstash_checksum", "@timestamp", "duration", "lock_wait", "results", "scanned"
                            ]
                        },
                        "size" : 1
                    }
                }
            }
        }
    }
}

跟这个写法相关的几个参考链接: Terms Aggregation / Elasticsearch filter document group by field

5. 渲染页面

python的后台, 使用sqlparse包, 将sql进行格式化(换行/缩进/大小写), 再往前端传. sqlparse

>>> sql = 'select * from foo where id in (select id from bar);'
>>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
SELECT *
FROM foo
WHERE id IN
  (SELECT id
   FROM bar);

然后在页面上, 使用js进行语法高亮 code-prettify


system

1344 Words

2016-05-24 00:00 +0000