このブログは、数年前にN高等学校を卒業し株式会社ArmorisにやってきたアルバイトKaepiが書いています。
あるもりすぶろぐの内容は個人の意見です。
概要
今回はGCP(Google Cloud Platform)のVMインスタンス(無料枠)にWOWHoneypotを設置し、そのログをElasticsearchで分析します。
今回の分析にはドメインが必要なので、実践される方はドメインの取得とネームサーバーの変更を行ってください。
WOWHoneypotについて導入から分析まで全3回に分けて投稿します。
1. WOWHoneypotのログを分析してみた - 導入編
2. WOWHoneypotのログを分析してみた - Logstash編(今回)
3. WOWHoneypotのログを分析してみた - 分析編
目次
- 環境構築
- ログの同期
- Elasticsearchの用意
- Logstashでデータの加工
- まとめ
環境構築
Name | Version |
---|---|
Elasticsearch | 7.15.2 |
前回の記事でGCPのVMインスタンス上にNginxとWOWHoneypotを構築しましたが、分析に使用するElasticsearch・Kibana・Logstashはコストとセキュリティを考慮して別のマシンで構築します。
今回の解説では
としています。
ログの同期
WOWHoneypotのアクセスログがVMインスタンスに記録されていくので、分析するために手元のUbuntuにログファイルをコピーする必要があります。
手動で移すのは大変なので、rsyncを利用してVMインスタンスからUbuntuにアクセスログを同期する作業を自動化します。
ssh
ローカルからリモートのログを同期したいので、UbuntuからVMインスタンスにssh接続できるようにします。
$ ssh-keygen
このコマンドを実行するとキーペアの保存場所とパスフレーズについて入力が求められまが、どちらも入力せずにEnterを押します。
そうすると/home/ユーザー名/.ssh
のフォルダの中にid_rsa(秘密鍵)
とid_rsa.pub(公開鍵)
が生成されるので、前回と同じ方法でGCPのコンソールから公開鍵を登録します。
rsync
rsyncとはLinux系のOSにデフォルトでインストールされているコマンドのことで、コンピューター間でファイル・ディレクトリを同期することができます。
ただのコピーではなくファイル・ディレクトリが更新されたときの差分だけを取得することができるので、ログの同期などで役立ちます。
まずWOWHoneypotのログを保存するディレクトリを用意します。
$ sudo mkdir /opt/data/wowhoneypot $ sudo chmod 777 /opt/data/wowhoneypot
ディレクトリを用意したら、以下のコマンドを実行してログが同期されるか確認します。
[]
で囲われている箇所については適宜読み替えてください。
$ rsync -av --inplace -e ssh [user1]@[host]:/home/[user2]/wowhoneypot/log /opt/data/wowhoneypot
このコマンドを実行することで、VMインスタンスの/home/user/wowhoneypot/log
のログがUbuntuの/opt/data/wowhoneypot/log
にコピーされます。
ただし、rsyncはファイル・ディレクトリの更新を検知して自動で同期してくれるわけではないので、定期的にrsyncを実行する必要があります。
そのため先程のrsyncコマンドをcronを利用して定期的に実行するようにします。
$ crontab -e
エディターに何を使うか聞かれるので普段使うエディターを選択します。
編集画面を開いたら一番下に以下を追記してください。
*/5 * * * * rsync -av --inplace -e ssh [user1]@[host]:/home/[user2]/wowhoneypot/log /opt/data/wowhoneypot
左側では実行する周期、右側では実行するコマンドを指定しています。
*/5 * * * *
は実行する間隔を設定していて、5分毎に実行されるようになっています。
補足
rsyncで同期したログがLogstashを通すと増殖する問題が発生したので、その原因と解決法をまとめておきます。
まず原因はrsyncの挙動にあり、コピー先に一時ファイルをコピー後、置き換えることで同期をしていますが、この置き換えという動作によって、Logstash側ですでに処理したログでもファイルの更新があったということで再度処理されてしまいます。
少しややこしいので例えを出すと
- rsyncで1から10までのログがコピーされる
- Logstashで1から10までのログが処理される
- 5分後、rsyncが5から15までのログをコピーする
- Logstashで5から15までのログを処理される
- 5から10までのログが2個になる
少し実態は違うかもしれませんが、おおよそこのようなことが起きてしまいます。
そのため解決策として、rsyncに--inplace
オプションを付けました。
これにより、rsyncの挙動が直接上書きコピーするようになるので、Logstash側で新規のログだけを処理するようになります。
Elasticsearchの用意
インストール
Elasticsearch+Kibanaの構築は前シリーズのフィッシングサイトの調査をしてみた
で解説しているので、こちらの記事を参考にしてください。
Logstashはaptで簡単にインストールすることができます。
$ sudo apt install logstash
Elasticsearchの設定
Indexの作成
KibanaのDev Toolからインデックスを作成します。
PUT /wowhoneypot?pretty
このままでもLogstashから投入されるデータは可視化できますが、geoipで取得した座標を地図に表示することができないので、緯度と経度のオブジェクトをgeo_point
の型にする必要があります。
(何もしないとgeoip.location.latとgeoip.location.lonがそれぞれFloatとして扱われてしまう)
Mapping
特定のフィールドの型を明示的に設定するときはmappingを使用します。
PUT /wowhoneypot/_mapping { "properties": { "geoip": { "properties": { "location": { "type": "geo_point" } } } } }
これで座標として認識されるようになります。
Index Templateの活用
また、Index Templatesを使用する方法もあります。
Stack ManagementからIndex ManagementのタブのIndex Templatesを選択して、Create templateで作成できます。
名前とテンプレートを適応するIndexを設定して、MappingsのタブからAdd fieldでフィールドを作成、geoip(Type:Object)とその下にlocation(Type:Geo-point)の2つを追加します。
Mappingと結果は変わりませんが、複数のIndexでgeoipを利用したいときはIndex Templateを使ったほうが効率的です。
前回のシリーズでは事前にすべてのフィールドをmappingで設定していましたが、設定していないフィールドは自動で型が設定されるので必要な箇所だけ設定しても動作します。
その他注意点として、Mappingは後から変更できないので、Indexを作成してデータを投入する前にMappingを作成する必要があります。
Logstashでデータの加工
Logstashはログなどのデータを加工して、Elasticsearchに投入するツールです。
フィルターの設定
サンプルデータ
[2022-01-25 13:34:30+0900] 127.0.0.1 34.145.70.227:80 "GET /owa/auth/x.js HTTP/1.1" 200 False R0VUIC9vd2EvYXV0aC94LmpzIEhUVFAvMS4xCkhvc3Q6IDM0LjE0NS43MC4yMjcKWC1SZWFsLUlQOiAxOTIuMjQxLjIxMS4xODkKWC1Gb3J3YXJkZWQtRm9yOiAxOTIuMjQxLjIxMS4xODkKVXNlci1BZ2VudDogTW96aWxsYS81LjAgemdyYWIvMC54CkFjY2VwdDogKi8qCkNvb2tpZTogWC1Bbm9uUmVzb3VyY2U9dHJ1ZTsgWC1Bbm9uUmVzb3VyY2UtQmFja2VuZD1sb2NhbGhvc3QvZWNwL2RlZmF1bHQuZmx0P34zOyBYLUJFUmVzb3VyY2U9bG9jYWxob3N0L293YS9hdXRoL2xvZ29uLmFzcHg/fjM7CkFjY2VwdC1FbmNvZGluZzogZ3ppcAoK
WOWHoneypotが出力するアクセスログはこのようになっています。
これを分析しやすいように要素ごとに切り分けてElasticsearchに投入する必要があるので、logstashのコンフィグを書いていきます。
$ sudo vim /etc/logstash/conf.d/logstash.conf
input { file { mode => "tail" path => ["/opt/data/wowhoneypot/log/access_log"] sincedb_path => "/opt/data/wowhoneypot/sincedb" start_position => "beginning" codec => plain { charset => "UTF-8" } } } filter { grok { match => ["message", "\A\[%{TIMESTAMP_ISO8601:timestamp}\]%{SPACE}%{IP:src_ip}%{SPACE}%{IPORHOST:http.hostname}:%{NUMBER:dest_port}%{SPACE}\"%{WORD:http.http_method}%{SPACE}(?:%{URIPATHPARAM:http.url}|%{URI:http.url})%{SPACE}%{DATA:http.protocol}\"%{SPACE}%{NUMBER:http.http_status}%{SPACE}%{WORD:mrrid}%{SPACE}%{GREEDYDATA:http.request_b64}"] remove_field => ["src_ip"] } date { match => [ "timestamp", "yyyy-MM-dd HH:mm:ssZ" ] remove_field => ["timestamp"] } ruby { init => "require 'base64'" code => "event.set('http.request', Base64.decode64(event.get('http.request_b64')))" } grok { match => ["http.request", "%{WORD:method} (?:%{URIPATHPARAM:url}|%{URI:url})%{DATA:protocol}\nHost: (?:%{IPORHOST:http.host}|%{IP:http.host})\nX-Real-IP: %{IP:src_ip}\nX-Forwarded-For: %{IP:ip}\n(?:Content-Length: %{WORD:length}\n)?User-Agent: %{GREEDYDATA:data}"] remove_field => ["method", "url", "protocol", "host", "length"] } mutate { gsub => ["data", "\n", "::"] } mutate { split => {"data" => "::"} } mutate { add_field => {"user-agent" => "%{[data][0]}"} } ruby { code => ' if event.get("http.http_method") == "POST" event.set("body", event.get("data")[event.get("data").length-1]) end ' } geoip { source => "ip" target => "geoip" database => ["/etc/logstash/geoip/GeoLite2-City.mmdb"] fields => ["location", "country_name", "city_name"] } mutate { remove_field => ["message", "http.request_b64", "http.host", "data", "ip", "host"] } } output { elasticsearch { hosts => ["localhost:9200"] index => "wowhoneypot" } stdout { codec => rubydebug } }
今回はこのような設定を用意したので、上から順に解説していきます。
1. input
このブロックでは主に処理するログファイルの扱いを設定しています。
- mode
ファイルの読み取りモードを指定しています。
Tailに設定するとログが追加されたときに動作するようになります。 - path
読み込むログファイルのパスです。 - sincedb_path
どこまでログを処理したかを記録するファイルのパスです。 これを設定すると、ログが追加されたときに追加された分のログを処理できるようになります。 - start_position
beginningにすることで、ファイルの先頭から読み込むようになります。 - codec
読み込むファイルの文字コードを指定しています。
2. filter
inputから受け取ったデータをどのように切り分けるかを設定するブロックです。
grok
あまり人間が読みやすいものではありませんが、実際に受け取ったログを切り分ける大半の作業はgrokが行っています。
仕組みとしては正規表現のようなもので、処理するデータに当てはまるようにパターンを記入しています。
このコードで以下のフィールドに切り分けています。次の行で
src_ip
フィールドを削除していますが、これはwowhoneypotが受け取った接続元のipがリバースプロキシによって127.0.0.1
になってしまっているためです。date
grokで日時のフィールドをtimestamp
として切り出しましたが、これとは別にLogstashが処理した時間を記録する@timestamp
というフィールドが追加されてしまいます。
なので、@timestamp
にtimestamp
のデータを上書きして、timestamp
のフィールドを削除するという作業を行っています。- ruby
ログの後半にあるbase64でエンコードされたhttp requestをデコードしています。 grok
2回目のgrokです。
上の行でデコードしたhttp requestからさらに情報を切り分けています。- HTTPメソッド
- URLパスパラメーター
- HTTPプロトコル
- ホストIP
- 接続元IP
- Content-Length
- UserAgent
- (POSTだった場合) Body
最初のgrokで切り出した要素と重なるところがあるので、不要なフィールドは削除しています。
- mutate
mutateは様々な処理を行えるブロックです。
大半の情報はgrokで切り出すことができますが、それだけでは対応できないところもあるので補助的な処理をここに記入しています。 - geoip
ipアドレスからおおよその所在地を調べることができる機能です。
使用するには事前にデータベースをダウンロードしておく必要があります。(使用したのはMaxMindのGeoLite2-City)
3. output
最後に、処理したデータをどこに出力するかを設定します。
- elasticsearch
名前の通りelasticsearchにデータを投入するための設定です。
Elasticsearchに接続するためのhostsと、投入先のindexを指定しています。 - stdout
実際に運用する際には不要ですが、filterのテストをする際に出力結果を確認するために標準出力も設定しています。
Logstashの起動
作成したlogstash.conf
をsystemdが読み込めるように権限を変更します。
$ sudo chmod 644 /etc/logstash/conf.d/logstash.conf
access_logが更新されたらElasticsearchに投入したいので、Logstashもsystemdを使用して常時起動させておきます。
デフォルトで/etc/systemd/system/logstash.service
が生成されていますが、少し使いづらいので1行書き換えます。
$ sudo vim /etc/systemd/system/logstash.service
[Unit] Description=logstash [Service] Type=simple User=logstash Group=logstash # Load env vars from /etc/default/ and /etc/sysconfig/ if they exist. # Prefixing the path with '-' makes it try to load, but if the file doesn't # exist, it continues onward. EnvironmentFile=-/etc/default/logstash EnvironmentFile=-/etc/sysconfig/logstash ExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf Restart=always WorkingDirectory=/ Nice=19 LimitNOFILE=16384 # When stopping, how long to wait before giving up and sending SIGKILL? # Keep in mind that SIGKILL on a process can cause data loss. TimeoutStopSec=infinity [Install] WantedBy=multi-user.target
変更後の内容は以上です。
ExecStart
の行をExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
に置き換えています。
補足
Logstashを実行するときの権限問題で詰まった部分があったので補足をしておきます。
logstash.confの動作テストをするときにsudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
など管理者権限で一度でも起動してしまうと、実行に必要なファイルがroot以外開けない状態で生成されてしまうので、logstash.serviceのUserとGroupをrootにする必要が出てしまいます。
Logstashの起動に戻ります。
$ sudo systemctl daemon-reload $ sudo systemctl restart logstash
serviceを書き換えたので、daemon-reloadとサービスの再起動をする必要があります。
ブラウザからKibanaを開いて、データが追加されていたらLogstash編は終了です。
まとめ
今回はGCPのVMインスタンスに構築したWOWHoneypotから取得したアクセスログを、Elasticsearch+Kibanaで可視化・分析できるようにrsyncとLogstashの構築と設定をしました。
rsyncとcronを使ってファイルの同期を行ったり、systemdを使ったソフトウェアのサービス化など、データの分析以外でも役に立つ知識なので、手段の一つとして知っておくと便利だと思います。
次回は蓄積したデータを分析して、発見した傾向などを紹介します。