WOWHoneypotのログを分析してみた - Logstash編

このブログは、数年前にN高等学校を卒業し株式会社ArmorisにやってきたアルバイトKaepiが書いています。

あるもりすぶろぐの内容は個人の意見です。

概要

今回はGCP(Google Cloud Platform)のVMインスタンス(無料枠)にWOWHoneypotを設置し、そのログをElasticsearchで分析します。
今回の分析にはドメインが必要なので、実践される方はドメインの取得とネームサーバーの変更を行ってください。

WOWHoneypotについて導入から分析まで全3回に分けて投稿します。
1. WOWHoneypotのログを分析してみた - 導入編
2. WOWHoneypotのログを分析してみた - Logstash編(今回)
3. WOWHoneypotのログを分析してみた - 分析編

目次

  1. 環境構築
  2. ログの同期
  3. Elasticsearchの用意
  4. Logstashでデータの加工
  5. まとめ

環境構築

Name Version
Elasticsearch 7.15.2

前回の記事でGCPVMインスタンス上にNginxとWOWHoneypotを構築しましたが、分析に使用するElasticsearch・Kibana・Logstashはコストとセキュリティを考慮して別のマシンで構築します。
今回の解説では

としています。

ログの同期

WOWHoneypotのアクセスログVMインスタンスに記録されていくので、分析するために手元のUbuntuにログファイルをコピーする必要があります。
手動で移すのは大変なので、rsyncを利用してVMインスタンスからUbuntuアクセスログを同期する作業を自動化します。

ssh

ローカルからリモートのログを同期したいので、UbuntuからVMインスタンスssh接続できるようにします。

$ ssh-keygen

このコマンドを実行するとキーペアの保存場所とパスフレーズについて入力が求められまが、どちらも入力せずにEnterを押します。
そうすると/home/ユーザー名/.sshのフォルダの中にid_rsa(秘密鍵)id_rsa.pub(公開鍵)が生成されるので、前回と同じ方法でGCPのコンソールから公開鍵を登録します。

rsync

rsyncとはLinux系のOSにデフォルトでインストールされているコマンドのことで、コンピューター間でファイル・ディレクトリを同期することができます。
ただのコピーではなくファイル・ディレクトリが更新されたときの差分だけを取得することができるので、ログの同期などで役立ちます。

まずWOWHoneypotのログを保存するディレクトリを用意します。

$ sudo mkdir /opt/data/wowhoneypot
$ sudo chmod 777 /opt/data/wowhoneypot

ディレクトリを用意したら、以下のコマンドを実行してログが同期されるか確認します。
[]で囲われている箇所については適宜読み替えてください。

$ rsync -av --inplace -e ssh [user1]@[host]:/home/[user2]/wowhoneypot/log /opt/data/wowhoneypot

このコマンドを実行することで、VMインスタンス/home/user/wowhoneypot/logのログがUbuntu/opt/data/wowhoneypot/logにコピーされます。

ただし、rsyncはファイル・ディレクトリの更新を検知して自動で同期してくれるわけではないので、定期的にrsyncを実行する必要があります。
そのため先程のrsyncコマンドをcronを利用して定期的に実行するようにします。

$ crontab -e

エディターに何を使うか聞かれるので普段使うエディターを選択します。
編集画面を開いたら一番下に以下を追記してください。

*/5 * * * * rsync -av --inplace -e ssh [user1]@[host]:/home/[user2]/wowhoneypot/log /opt/data/wowhoneypot

左側では実行する周期、右側では実行するコマンドを指定しています。
*/5 * * * *は実行する間隔を設定していて、5分毎に実行されるようになっています。

補足

rsyncで同期したログがLogstashを通すと増殖する問題が発生したので、その原因と解決法をまとめておきます。
まず原因はrsyncの挙動にあり、コピー先に一時ファイルをコピー後、置き換えることで同期をしていますが、この置き換えという動作によって、Logstash側ですでに処理したログでもファイルの更新があったということで再度処理されてしまいます。
少しややこしいので例えを出すと

  1. rsyncで1から10までのログがコピーされる
  2. Logstashで1から10までのログが処理される
  3. 5分後、rsyncが5から15までのログをコピーする
  4. Logstashで5から15までのログを処理される
  5. 5から10までのログが2個になる

少し実態は違うかもしれませんが、おおよそこのようなことが起きてしまいます。
そのため解決策として、rsync--inplaceオプションを付けました。
これにより、rsyncの挙動が直接上書きコピーするようになるので、Logstash側で新規のログだけを処理するようになります。

Elasticsearchの用意

インストール

Elasticsearch+Kibanaの構築は前シリーズのフィッシングサイトの調査をしてみたで解説しているので、こちらの記事を参考にしてください。

Logstashはaptで簡単にインストールすることができます。

$ sudo apt install logstash

Elasticsearchの設定

Indexの作成

KibanaのDev Toolからインデックスを作成します。

PUT /wowhoneypot?pretty

このままでもLogstashから投入されるデータは可視化できますが、geoipで取得した座標を地図に表示することができないので、緯度と経度のオブジェクトをgeo_pointの型にする必要があります。
(何もしないとgeoip.location.latとgeoip.location.lonがそれぞれFloatとして扱われてしまう)

Mapping

特定のフィールドの型を明示的に設定するときはmappingを使用します。

PUT /wowhoneypot/_mapping
{
  "properties": {
    "geoip": {
      "properties": {
        "location": {
          "type": "geo_point"
        }
      }
    }
  }
}

これで座標として認識されるようになります。

Index Templateの活用

また、Index Templatesを使用する方法もあります。
Stack ManagementからIndex ManagementのタブのIndex Templatesを選択して、Create templateで作成できます。
名前とテンプレートを適応するIndexを設定して、MappingsのタブからAdd fieldでフィールドを作成、geoip(Type:Object)とその下にlocation(Type:Geo-point)の2つを追加します。
Mappingと結果は変わりませんが、複数のIndexでgeoipを利用したいときはIndex Templateを使ったほうが効率的です。

前回のシリーズでは事前にすべてのフィールドをmappingで設定していましたが、設定していないフィールドは自動で型が設定されるので必要な箇所だけ設定しても動作します。
その他注意点として、Mappingは後から変更できないので、Indexを作成してデータを投入する前にMappingを作成する必要があります。

Logstashでデータの加工

Logstashはログなどのデータを加工して、Elasticsearchに投入するツールです。

フィルターの設定

サンプルデータ

[2022-01-25 13:34:30+0900] 127.0.0.1 34.145.70.227:80 "GET /owa/auth/x.js HTTP/1.1" 200 False R0VUIC9vd2EvYXV0aC94LmpzIEhUVFAvMS4xCkhvc3Q6IDM0LjE0NS43MC4yMjcKWC1SZWFsLUlQOiAxOTIuMjQxLjIxMS4xODkKWC1Gb3J3YXJkZWQtRm9yOiAxOTIuMjQxLjIxMS4xODkKVXNlci1BZ2VudDogTW96aWxsYS81LjAgemdyYWIvMC54CkFjY2VwdDogKi8qCkNvb2tpZTogWC1Bbm9uUmVzb3VyY2U9dHJ1ZTsgWC1Bbm9uUmVzb3VyY2UtQmFja2VuZD1sb2NhbGhvc3QvZWNwL2RlZmF1bHQuZmx0P34zOyBYLUJFUmVzb3VyY2U9bG9jYWxob3N0L293YS9hdXRoL2xvZ29uLmFzcHg/fjM7CkFjY2VwdC1FbmNvZGluZzogZ3ppcAoK

WOWHoneypotが出力するアクセスログはこのようになっています。
これを分析しやすいように要素ごとに切り分けてElasticsearchに投入する必要があるので、logstashのコンフィグを書いていきます。

$ sudo vim /etc/logstash/conf.d/logstash.conf
input {
    file {
        mode => "tail"
        path => ["/opt/data/wowhoneypot/log/access_log"]
        sincedb_path => "/opt/data/wowhoneypot/sincedb"
        start_position => "beginning"
        codec => plain {
            charset => "UTF-8"
        }
    }
}

filter {
    grok { 
        match => ["message", "\A\[%{TIMESTAMP_ISO8601:timestamp}\]%{SPACE}%{IP:src_ip}%{SPACE}%{IPORHOST:http.hostname}:%{NUMBER:dest_port}%{SPACE}\"%{WORD:http.http_method}%{SPACE}(?:%{URIPATHPARAM:http.url}|%{URI:http.url})%{SPACE}%{DATA:http.protocol}\"%{SPACE}%{NUMBER:http.http_status}%{SPACE}%{WORD:mrrid}%{SPACE}%{GREEDYDATA:http.request_b64}"]
        remove_field => ["src_ip"]
    }
    date {
        match => [ "timestamp", "yyyy-MM-dd HH:mm:ssZ" ]
        remove_field => ["timestamp"]
    }
    ruby {
        init => "require 'base64'"
        code => "event.set('http.request', Base64.decode64(event.get('http.request_b64')))"
    }
    grok {
        match => ["http.request", "%{WORD:method} (?:%{URIPATHPARAM:url}|%{URI:url})%{DATA:protocol}\nHost: (?:%{IPORHOST:http.host}|%{IP:http.host})\nX-Real-IP: %{IP:src_ip}\nX-Forwarded-For: %{IP:ip}\n(?:Content-Length: %{WORD:length}\n)?User-Agent: %{GREEDYDATA:data}"]
        remove_field => ["method", "url", "protocol", "host", "length"]
    }
    mutate { gsub => ["data", "\n", "::"] }
    mutate { split => {"data" => "::"} }
    mutate { add_field => {"user-agent" => "%{[data][0]}"} }
    ruby {
        code => '
        if event.get("http.http_method") == "POST"
            event.set("body", event.get("data")[event.get("data").length-1])
        end
        '
    }
    geoip {
        source => "ip"
        target => "geoip"
        database => ["/etc/logstash/geoip/GeoLite2-City.mmdb"]
        fields => ["location", "country_name", "city_name"]
    }
    mutate {
        remove_field => ["message", "http.request_b64", "http.host", "data", "ip", "host"]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "wowhoneypot"
    }
    stdout { codec => rubydebug }
}

今回はこのような設定を用意したので、上から順に解説していきます。

1. input

このブロックでは主に処理するログファイルの扱いを設定しています。

  • mode
    ファイルの読み取りモードを指定しています。
    Tailに設定するとログが追加されたときに動作するようになります。
  • path
    読み込むログファイルのパスです。
  • sincedb_path
    どこまでログを処理したかを記録するファイルのパスです。 これを設定すると、ログが追加されたときに追加された分のログを処理できるようになります。
  • start_position
    beginningにすることで、ファイルの先頭から読み込むようになります。
  • codec
    読み込むファイルの文字コードを指定しています。

2. filter

inputから受け取ったデータをどのように切り分けるかを設定するブロックです。

  • grok
    あまり人間が読みやすいものではありませんが、実際に受け取ったログを切り分ける大半の作業はgrokが行っています。
    仕組みとしては正規表現のようなもので、処理するデータに当てはまるようにパターンを記入しています。
    このコードで以下のフィールドに切り分けています。

    次の行でsrc_ipフィールドを削除していますが、これはwowhoneypotが受け取った接続元のipがリバースプロキシによって127.0.0.1になってしまっているためです。

  • date
    grokで日時のフィールドをtimestampとして切り出しましたが、これとは別にLogstashが処理した時間を記録する@timestampというフィールドが追加されてしまいます。
    なので、@timestamptimestampのデータを上書きして、timestampのフィールドを削除するという作業を行っています。

  • ruby
    ログの後半にあるbase64エンコードされたhttp requestをデコードしています。
  • grok
    2回目のgrokです。
    上の行でデコードしたhttp requestからさらに情報を切り分けています。

    • HTTPメソッド
    • URLパスパラメーター
    • HTTPプロトコル
    • ホストIP
    • 接続元IP
    • Content-Length
    • UserAgent
    • (POSTだった場合) Body

    最初のgrokで切り出した要素と重なるところがあるので、不要なフィールドは削除しています。

  • mutate
    mutateは様々な処理を行えるブロックです。
    大半の情報はgrokで切り出すことができますが、それだけでは対応できないところもあるので補助的な処理をここに記入しています。
  • geoip ipアドレスからおおよその所在地を調べることができる機能です。
    使用するには事前にデータベースをダウンロードしておく必要があります。(使用したのはMaxMindのGeoLite2-City)

3. output

最後に、処理したデータをどこに出力するかを設定します。

  • elasticsearch
    名前の通りelasticsearchにデータを投入するための設定です。
    Elasticsearchに接続するためのhostsと、投入先のindexを指定しています。
  • stdout
    実際に運用する際には不要ですが、filterのテストをする際に出力結果を確認するために標準出力も設定しています。

Logstashの起動

作成したlogstash.confをsystemdが読み込めるように権限を変更します。

$ sudo chmod 644 /etc/logstash/conf.d/logstash.conf

access_logが更新されたらElasticsearchに投入したいので、Logstashもsystemdを使用して常時起動させておきます。
デフォルトで/etc/systemd/system/logstash.serviceが生成されていますが、少し使いづらいので1行書き換えます。

$ sudo vim /etc/systemd/system/logstash.service
[Unit]
Description=logstash

[Service]
Type=simple
User=logstash
Group=logstash
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
Restart=always
WorkingDirectory=/
Nice=19
LimitNOFILE=16384

# When stopping, how long to wait before giving up and sending SIGKILL?
# Keep in mind that SIGKILL on a process can cause data loss.
TimeoutStopSec=infinity

[Install]
WantedBy=multi-user.target

変更後の内容は以上です。 ExecStartの行をExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.confに置き換えています。

補足

Logstashを実行するときの権限問題で詰まった部分があったので補足をしておきます。
logstash.confの動作テストをするときにsudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.confなど管理者権限で一度でも起動してしまうと、実行に必要なファイルがroot以外開けない状態で生成されてしまうので、logstash.serviceのUserとGroupをrootにする必要が出てしまいます。
Logstashの起動に戻ります。

$ sudo systemctl daemon-reload
$ sudo systemctl restart logstash

serviceを書き換えたので、daemon-reloadとサービスの再起動をする必要があります。
ブラウザからKibanaを開いて、データが追加されていたらLogstash編は終了です。

まとめ

今回はGCPVMインスタンスに構築したWOWHoneypotから取得したアクセスログを、Elasticsearch+Kibanaで可視化・分析できるようにrsyncとLogstashの構築と設定をしました。
rsyncとcronを使ってファイルの同期を行ったり、systemdを使ったソフトウェアのサービス化など、データの分析以外でも役に立つ知識なので、手段の一つとして知っておくと便利だと思います。
次回は蓄積したデータを分析して、発見した傾向などを紹介します。