2026年1月14日水曜日

AI エージェントを作ってみた感想

AI エージェントを作ってみた感想

これまでにBotはいくつか作っていました

https://blog.kakakikikeke.com/2025/10/no-life-no-bot.html

それもエージェントと言えばエージェントなのかもしれませんがもう少しエンジニアっぽい AI エージェントを作ってみたので感想やらつらつら残しておきます

作ったもの

  • 不正プロセス監視エージェント
  • 不正ログ監視エージェント
  • 株価予測エージェント

不正プロセス監視エージェントのソースコード

#!/usr/bin/env python3
import json
import os
import subprocess
import time
from collections import deque

from google import genai
from slack_sdk import WebClient

GEMINI_API_KEY = "xxx"
SLACK_TOKEN = "xoxb-xxx"
SLACK_CHANNEL = "#general"
CUSTOM_BASE_URL = "https://your-llm-domain/endpoint"

# バッチ処理の設定
BATCH_MODE = (
    os.getenv("BATCH_MODE", "true").lower() == "true"
)  # デフォルトはバッチモード
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "5"))  # 一度に判定するプロセス数
BATCH_TIMEOUT = int(os.getenv("BATCH_TIMEOUT", "10"))  # タイムアウト時間(秒)
MONITOR_INTERVAL = int(os.getenv("MONITOR_INTERVAL", "30"))  # プロセス取得間隔(秒)

print(f"Mode: {'BATCH' if BATCH_MODE else 'SINGLE'}")
print(
    f"BATCH_SIZE: {BATCH_SIZE}, BATCH_TIMEOUT: {BATCH_TIMEOUT}s, MONITOR_INTERVAL: {MONITOR_INTERVAL}s"
)

client = genai.Client(
    api_key=GEMINI_API_KEY,
    http_options=genai.types.HttpOptions(base_url=CUSTOM_BASE_URL),
)

slack = WebClient(token=SLACK_TOKEN)

# 既に通知済みのプロセスを追跡(重複通知を避けるため)
notified_processes = set()


# --- プロセス情報取得 ---
def get_processes():
    """
    ps コマンドでシステム上の全プロセスを取得する。
    戻り値: [{"pid": "...", "user": "...", "cpu": "...", "mem": "...", "cmd": "..."}, ...]
    """
    try:
        # aux フラグで詳細情報を取得
        cmd = ["ps", "aux"]
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )

        processes = []
        lines = result.stdout.strip().split("\n")

        # ヘッダースキップ
        for line in lines[1:]:
            parts = line.split()
            if len(parts) >= 11:
                processes.append(
                    {
                        "user": parts[0],
                        "pid": parts[1],
                        "cpu": parts[2],
                        "mem": parts[3],
                        "vsz": parts[4],
                        "rss": parts[5],
                        "cmd": " ".join(parts[10:]),
                    }
                )

        return processes

    except Exception as e:
        print(f"Error getting processes: {e}")
        return []


# --- LLM にプロセスの異常判定を依頼 ---
def analyze_process_single(process_info: dict) -> bool:
    """
    単一のプロセスを LLM で判定する。
    怪しいなら True を返す。
    """
    prompt = f"""
あなたは Linux サーバーのセキュリティ監視 AI です。
以下のプロセス情報が不審かどうか、以下の形式で答えてください。

プロセス情報:
- PID: {process_info['pid']}
- ユーザー: {process_info['user']}
- CPU使用率: {process_info['cpu']}%
- メモリ使用率: {process_info['mem']}%
- VSZ: {process_info['vsz']}
- RSS: {process_info['rss']}
- コマンド: {process_info['cmd']}

判断:
- 不審なら "SUSPICIOUS"
- そうでなければ "OK"

通常のシステムプロセス(kernel threads、systemd関連など)は OK と判定してください。
考察や理由は出力しないでください。
"""

    response = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=prompt,
    )

    if response.text is None:
        print("No response text from LLM")
        return False

    result_text = response.text.strip().upper()
    return "SUSPICIOUS" in result_text


def analyze_processes_batch(processes: list) -> dict:
    """
    複数のプロセスを一度に LLM で判定する。
    戻り値: {pid: bool} の辞書
    """
    if not processes:
        return {}

    # プロセス情報をまとめてプロンプト作成
    processes_text = "\n".join(
        [
            f"{i+1}. PID: {p['pid']}, User: {p['user']}, CPU: {p['cpu']}%, Mem: {p['mem']}%, Cmd: {p['cmd']}"
            for i, p in enumerate(processes)
        ]
    )

    prompt = f"""
あなたは Linux サーバーのセキュリティ監視 AI です。
以下のプロセス情報が不審かどうか、それぞれを判定してください。

プロセス一覧:
{processes_text}

判定結果を以下の JSON フォーマットで出力してください:
{{
  "1": "OK",
  "2": "SUSPICIOUS",
  "3": "OK",
  ...
}}

JSONのキーはプロセス番号、値は "SUSPICIOUS" または "OK" です。
通常のシステムプロセス(kernel threads、systemd関連など)は OK と判定してください。
"""

    response = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=prompt,
    )

    if response.text is None:
        print("No response text from LLM")
        return {p["pid"]: False for p in processes}

    # JSON を抽出して解析
    result_dict = {}
    try:
        # レスポンスから JSON を抽出
        json_str = response.text
        # JSONブロックの開始と終了を探す
        start_idx = json_str.find("{")
        end_idx = json_str.rfind("}") + 1
        if start_idx >= 0 and end_idx > start_idx:
            json_str = json_str[start_idx:end_idx]
            parsed = json.loads(json_str)

            # プロセスごとに判定結果をマッピング
            for idx, process in enumerate(processes, 1):
                result = parsed.get(str(idx), "OK").upper()
                result_dict[process["pid"]] = "SUSPICIOUS" in result
        else:
            # JSON が見つからない場合は全て False
            result_dict = {p["pid"]: False for p in processes}
    except json.JSONDecodeError as e:
        print(f"JSON parse error: {e}")
        result_dict = {p["pid"]: False for p in processes}

    return result_dict


# --- Slack 通知 ---
def notify(process_info: dict):
    message = f"""
:rotating_light: *Suspicious process detected!*
PID: {process_info['pid']}
User: {process_info['user']}
CPU: {process_info['cpu']}% | Mem: {process_info['mem']}%
Command: {process_info['cmd']}
"""
    slack.chat_postMessage(channel=SLACK_CHANNEL, text=message)


# --- フィルタリング関数 ---
def should_check_process(process_info: dict) -> bool:
    """
    監視対象外のプロセスをフィルタリング
    """
    cmd = process_info["cmd"]
    user = process_info["user"]

    # self プロセスは除外
    if "process_monitor_agent" in cmd:
        return False

    # root 関連の明らかに安全なプロセスは除外
    safe_keywords = [
        "kernel",
        "[",
        "]",
        "systemd",
        "sshd",
        "agetty",
        "bash",
        "sh",
        "grep",
        "ps",
        "vim",
        "nano",
        "sudo",
        "root",
        "ls -l",
        "sd-pam",
        "unbound",
        "cron",
        "sleep",
        "less",
        "/sbin/init",
        "canonical-livepatch",
        "snapd",
        "ubuntu-advantage/timer.py",
        "multipathd",
        "journalctl",
        "bpftrace",
        # 開発ツール関連
        "tmux",
        "mysqld",
        "redis-server",
        "code-server",
        "mosquitto",
        "ollama",
        "nginx",
        # postfix 関連
        "pickup -l -t unix -u -c",
        "tlsmgr -l -t unix -u -c",
        "qmgr -l -t unix -u",
        "/usr/lib/postfix/sbin/master",
        # docker 関連
        "dockerd",
        "docker-proxy",
        # apt 関連
        "apt-get",
        "packagekitd",
        "/usr/lib/update-notifier/apt-check",
        "tar -df alternatives.tar.0 -C /var/lib/dpkg alternatives",
        # EDR 関連
        "cbram",
        "cybereason-sensor",
        "cybereason-activeconsole",
    ]
    if any(keyword in cmd for keyword in safe_keywords):
        return False

    return True


# --- プロセス監視メイン処理 ---
def monitor_processes():
    """
    定期的にプロセスを取得して監視
    """
    print("Process monitor started.")

    if BATCH_MODE:
        # バッチモード
        monitor_processes_batch()
    else:
        # 単発モード
        monitor_processes_single()


def monitor_processes_single():
    """単発モード: 各プロセスを即座に判定"""
    while True:
        try:
            processes = get_processes()

            # フィルタリング
            filtered_processes = [p for p in processes if should_check_process(p)]

            for process in filtered_processes:
                try:
                    suspicious = analyze_process_single(process)
                    if suspicious:
                        pid = process["pid"]
                        if pid not in notified_processes:
                            print(f"[!] Suspicious process: {process}")
                            notify(process)
                            notified_processes.add(pid)
                    else:
                        print(f"[OK] {process['cmd']}")

                except Exception as e:
                    print(f"Error analyzing process: {e}")

            time.sleep(MONITOR_INTERVAL)

        except Exception as e:
            print(f"Error in monitor loop: {e}")
            time.sleep(MONITOR_INTERVAL)


def monitor_processes_batch():
    """バッチモード: プロセスをバッチでまとめて判定"""
    process_buffer = deque()
    last_batch_time = time.time()

    while True:
        try:
            current_time = time.time()
            processes = get_processes()

            # フィルタリング
            filtered_processes = [p for p in processes if should_check_process(p)]

            # バッファに追加
            for process in filtered_processes:
                process_buffer.append(process)

            # バッチ処理の条件: BATCH_SIZE に達したか、BATCH_TIMEOUT を超えたか
            should_process = len(process_buffer) >= BATCH_SIZE or (
                len(process_buffer) > 0
                and current_time - last_batch_time >= BATCH_TIMEOUT
            )

            if should_process or current_time - last_batch_time >= MONITOR_INTERVAL:
                try:
                    # バッファ内のプロセスを一度に処理
                    processes_to_process = list(process_buffer)
                    process_buffer.clear()
                    last_batch_time = current_time

                    if processes_to_process:
                        results = analyze_processes_batch(processes_to_process)

                        for process in processes_to_process:
                            pid = process["pid"]
                            is_suspicious = results.get(pid, False)

                            if is_suspicious:
                                if pid not in notified_processes:
                                    print(f"[!] Suspicious process: {process}")
                                    notify(process)
                                    notified_processes.add(pid)
                            else:
                                print(f"[OK] {process['cmd']}")

                except Exception as e:
                    print(f"Error analyzing processes: {e}")

            time.sleep(1)  # CPU負荷を減らすため1秒待機

        except Exception as e:
            print(f"Fatal error: {e}")
            time.sleep(MONITOR_INTERVAL)


if __name__ == "__main__":
    while True:
        try:
            monitor_processes()
        except Exception as e:
            print(f"Fatal error: {e}, restarting in 5 seconds...")
            time.sleep(5)

これを systemd 配下で動作させています

簡単に流れの紹介

  1. ps aux の結果を取得
  2. LLM (gemini) に投げて各プロセスが怪しいか判定してもらう
  3. 怪しい場合は Slack に通知

他のログ監視も同じような仕組みです
株価予測エージェントは上記の流れとだいぶ違うのは違うのですが LLM に過去の株価情報を渡して予測してもらうだけです

感想

以下不正プロセス監視エージェントを使ってみた感想や所感です

お金がかかるので呼び出し方を工夫しなければならない

  • LLM にわたす情報を複数行にまとめて API をコールする
  • 1行ずつだと時間もかかるしお金もかかる
  • ローカル LLM が一つの解決策だがそれなりのマシンスペックは必要

判断するまでの速度

  • API (ネットワーク) + LLM の推論の時間が必ずかかるので判断までに数秒かかる
  • 完全なリアルタイムを実現するのは難しい
  • 一瞬で消えてしまうプロセスをキャッチできないケースがある
  • 速度もローカル LLM で解決できそうだが結局高スペックマシンでもそれなりの推論時間がかかるので数ミリ秒レベルのレスポンス速度は期待できない

精度が曖昧

  • 本当にアウトな場合のテストができない
  • アウトじゃないけどアウトと判定してしまう
  • アウトじゃないプロセスは無視するような仕組みが必要になる (コード内参照)
  • それ(特定のプロセス無視/ホワイトリスト形式)でいいのかという問題もある (同一プロセス名だと攻撃されても検知できなくなる)
  • 前はセーフだったけど次はアウトにしちゃう
  • 「怪しい」という LLM への命令が曖昧すぎるが故の過剰検知とハレーション

そもそもAI エージェントの具体的なユースケースは

  • 人間には難しい「判断」や「解釈」が必要なタスク+状況に応じて行動を変える仕事
  • ログから「人間レベルの洞察」を出す
  • 毎日の情報を「読む → 取捨選択 → 解釈 → 提案」する
  • インシデント予兆検知
  • コードのレポート・改善案
  • などなど、とにかく得意なのは「判断」と「解釈」あと「生成」

(現状は)何かに特化した AI エージェントしか作れない

  • 本当にやりたいことは「何でも」やってくれる汎用 AI エージェント
  • でも現状はやりたいことだけをやってくれる特化 AI エージェントしか作れない
  • もっというと「やりたいこと」すら AI エージェントが見つけて勝手にやってほしい
  • 特化エージェントを組み合わせて汎用っぽく見せかける

AI を使わなくてもいい問題

  • 今回作成した AI エージェントのようなログ監視やプロセス監視の製品はある
  • 判断が静的 (特定の文字列を含む場合にアラート/CPUが90%以上になったらなど) だがそれで十分なケースが多い

そもそも「AI エージェント」の定義は

  • 広義には「人の代わりに作業をやってくれるツール」だと思っている
  • 自分は今回のようなプロセス版のエージェントも VSCode の Copilot Chat のようなエディタもエージェントだと思っている
  • エージェントにもいろいろなタイプがあるので言葉に惑わされてはいけない

既製品を使ってもいい

ブースティング

  • 複数の LLM に判断させてその結果で最終的な判断をする
  • エヴァンゲリオンのマギシステム的なイメージ
  • ただしお金が倍々に増えていくので注意

最後に

AI もあるので AI エージェント自体を作ることは非常に簡単ですが本当に有用なものを作るのは難しいと言うか無くても何とかなるというのが正直なところです

株価予測エージェントは評価機能もあるので評価の結果が良ければ実践投入してどうなるか試してみようかなと思います
売買も自動でできると更に良いかなと思っています

0 件のコメント:

コメントを投稿