Slackでサーバの死活監視を行う

Incoming Webhookを利用してSlackのチャンネルへサーバから通知を飛ばして、死活監視を行ったのでそれについてまとめます。
実行結果は以下のようになります。

f:id:kisaragi211:20200504135431p:plain]

準備

自身専用のワークスペースとサーバ管理用のチャンネルがあると良いともいます。
チャンネルへ通知を飛ばすためにはWebhookURLというものが必要になります。
そのためSlackチャンネル内のAppの部分でIncoming Webhookと検索して追加します。
その後設定の画面にいくとWebHookURLが書いてある行があるのでコピーしておきます。

コード作成

公式のセットアップ手順でいくとcurlを使ってSlackへメッセージを送信していますが、今回私はPythonを使ってメッセージを飛ばすことにします。
公式のコマンドは最終的に以下のようになっています。

curl -X POST --data-urlencode "payload={\"channel\": \"#server\", \"username\": \"webhookbot\", \"text\": \"これは webhookbot という名のボットから #server に投稿されています。\", \"icon_emoji\": \":ghost:\"}" {WebhookURL}

これはjson形式のデータをPOSTしているだけなのでもちろんPythonでもできます。
以下が私が作成したプログラムです。

#!/usr/bin/python3

import requests, json

WEBHOOK_URL = "{WebhookURL}"
CHANNEL = "#server"
USERNAME = "webhookbot"

def doPost(url, channel, username, text):
    payload = json.dumps({
        "channel": channel,
        "username": username,
        "text": text,
    })

    requests.post(url, payload)

if __name__ == '__main__':
    text = "サーバは動作しています\n"

    doPost(WEBHOOK_URL, CHANNEL, USERNAME, text)

WBHOOK_URLには自身のURLを設定してください。
コードの内容としては単純なものでスクリプトをそのままPythonに置き換えただけです。
jsonライブラリでjson形式にしたデータをrequest.post関数を使ってPOSTしています。
これによってcurlでpayloadをPOSTした時と同じ動作になります。

このスクリプトをcronで定期的にサーバで動作させることで死活監視が行えます。
その際通知がひどいことになるので専用チャンネルを作ってミュートをしておくと良いと思います。

私は上記のコードに加えてログインチェックを行う処理も追加して動作させています。
以下が+αしたコードになります。

#!/usr/bin/python3

import requests, json, subprocess, ipaddress

ALLOW_IP_LIST = ['x.x.x.x/x']
WEBHOOK_URL = "{WebhookURL}"
CHANNEL = "#server"
USERNAME = "webhookbot"

def check_login():
    ret = '----- check_login -----\n'
    deallow_ip_flag = True
    cmd = 'grep Accepted /var/log/auth.log | tail -10'
    proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)

    for line in proc.stdout.decode('utf-8').split('\n'):
        if not line:
            ret += 'all ok\n'
            break
        line = line.split()
        month = line[0]
        day = line[1]
        time = line[2]
        user = line[8]
        ip = ipaddress.ip_address(line[10])
        deallow_ip_flag = True
        for nw in ALLOW_IP_LIST:
            nw = ipaddress.ip_network(nw)
            if ip in nw:
                deallow_ip_flag = False
        if deallow_ip_flag:
            ret += '{}-{}-{} {} {}\n'.format(month, day, time, user, ip)
            break
    return [deallow_ip_flag, ret]

def doPost(url, channel, username, text):
    payload = json.dumps({
        "channel": channel,
        "username": username,
        "text": text,
    })

    requests.post(url, payload)

if __name__ == '__main__':
    text = "サーバは動作しています\n"
    ret = check_login()

    if ret[0]:
        text = '<!channel>\n' + text

    text += ret[1]

    doPost(WEBHOOK_URL, CHANNEL, USERNAME, text)

動作について説明します。
これはauth.logのログイン成功時の行を抽出してその中に想定IP以外のIPが存在するかを確認します。
ALLOW_IP_LISTは自宅のIPや携帯のIPなど自身が使用するであろう想定のIPを書いておきます。
成功時のログを見て、想定IPしかなかった場合はall ok というメッセージをチャンネルへ送信します。
もし想定していないIPのログインが確認された場合、{日付・時間・ログインユーザ名・IP}という簡易ログをチャンネルへのメンションを付加して送信します。
チャンネルへのメンションは"<!channnel>"で行うことができます。
チャンネルへメンションするとミュートにしている場合でも、通知の欄が赤くなるので通常よりは異常に気付きやすくなると思います。

まとめ

これらのソースはgithub上で管理していて今後も機能を少しずつ充実させていこうと思っています。

github.com

少しでも参考になれば良いと思います。

+αのログインチェックは中々お粗末なものではあるので信頼性には欠けますが、死活監視のついでに行う処理としては良いのでないかと思います。