Amazon Bedrock Knowledge Base同期を自動化するAPI開発記録 – Kiroと一緒に作った開発体験

AWS

はじめに

Amazon Bedrockを使ったRAG(Retrieval-Augmented Generation)システムを運用していると、Knowledge Baseのデータ同期が課題になることがあります。今回、手動で行っていた同期作業を自動化するため、AI開発アシスタント「Kiro」と一緒に専用のAPIを開発しました。

この記事では、Kiroとのペアプログラミング体験を含めて、開発の背景から実装、そして運用まで、実際に遭遇した問題とその解決方法を詳しく解説します。

開発の背景と課題

既存の運用フロー

私たちのシステムでは、以下のような運用を行っていました:

  1. 夜間バッチでS3にファイルアップロード
  • ローカルマシンでbatファイルやPythonスクリプトを実行
  • 大量のドキュメントを定期的にS3バケットに配置
  1. 手動でKnowledge Base同期
  • AWSコンソールにログイン
  • Bedrock Knowledge Baseの画面で手動同期を実行
  • 同期完了まで待機

課題

  • 手動作業の負荷: 毎回コンソールにアクセスして同期を実行
  • タイミングの制約: 夜間アップロード後、翌朝まで同期されない
  • スケーラビリティの問題: 複数のKnowledge Baseや頻繁な更新に対応困難
  • 自動化の欠如: CI/CDパイプラインに組み込めない

解決策:同期API の開発 – Kiroとの協働開発

Kiroとの開発プロセス

今回の開発では、AI開発アシスタント「Kiro」と一緒にペアプログラミングを行いました。Kiroは以下のような支援を提供してくれました:

  • 要件定義の整理: 曖昧な要求を具体的な仕様に落とし込み
  • アーキテクチャ設計: ベストプラクティスに基づいた設計提案
  • コード実装: 実際のPythonコードの生成と最適化
  • トラブルシューティング: エラー解析と解決策の提案
  • ドキュメント作成: 包括的な運用ドキュメントの生成

設計方針

Kiroと議論しながら、以下の方針でAPIを設計しました:

  1. RESTful API: 標準的なHTTP APIとして実装
  2. サーバーレス: AWS Lambda + API Gatewayで運用コストを最小化
  3. セキュア: 適切なIAM権限とAPI Key認証
  4. 監視可能: 構造化ログとCloudWatch統合
  5. エラーハンドリング: 詳細なエラー情報と適切なHTTPステータス

アーキテクチャ

[クライアント] → [API Gateway] → [Lambda] → [Bedrock API]
                      ↓
                [CloudWatch Logs]

Kiroは、このシンプルなアーキテクチャが運用コストと複雑さのバランスを取る最適解であることを提案してくれました。

実装の詳細

API仕様

エンドポイント: POST /sync

リクエスト形式:

{
  "knowledgeBaseId": "XXXXXXXXXX",
  "dataSourceId": "YYYYYYYYYY"
}

成功レスポンス:

{
  "status": "success",
  "jobId": "HDVT5KDVYH",
  "knowledgeBaseId": "XXXXXXXXXX",
  "timestamp": "2025-09-15T01:39:33.027825Z",
  "message": "同期ジョブが正常に開始されました"
}

主要コンポーネント

1. Lambda関数のメインハンドラー

"""
Amazon Bedrock Knowledge Base 同期API
Lambda関数のメインハンドラー
"""

import json
import logging
import os
import sys
from typing import Dict, Any

# Lambda環境でのモジュール検索パスを設定
if '/var/task' not in sys.path:
    sys.path.insert(0, '/var/task')

try:
    from validation import validate_sync_request
    from bedrock_service import BedrockService
    from response_handler import ResponseHandler
    from request_tracker import RequestTrackerManager
    from exceptions import (
        ValidationError,
        BedrockSyncError,
        BedrockServiceUnavailableError,
        TimeoutError
    )
except ImportError as e:
    # インポートエラーの詳細をログに出力
    print(f"Import error: {e}")
    print(f"Python path: {sys.path}")
    print(f"Current working directory: {os.getcwd()}")
    raise

# ログ設定
logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda関数のメインハンドラー

    Args:
        event: API Gatewayからのイベントデータ
        context: Lambda実行コンテキスト

    Returns:
        API Gatewayレスポンス形式の辞書
    """
    # リクエストトラッカーの初期化
    request_id = context.aws_request_id
    tracker = RequestTrackerManager.create_tracker(request_id)

    try:
        # 1. リクエスト検証
        with tracker.track_operation("request_validation"):
            sync_request = validate_sync_request(event)
            tracker.log_request_received(sync_request, client_info)
            tracker.log_validation_success()

        # 2. Bedrock API呼び出し
        with tracker.track_operation("bedrock_api_call"):
            bedrock_service = BedrockService()

            job_id = bedrock_service.start_ingestion_job(
                knowledge_base_id=sync_request.knowledge_base_id,
                data_source_id=sync_request.data_source_id,
                request_id=request_id
            )

            tracker.log_bedrock_call_success(job_id, {'job_id': job_id})

        # 3. 成功レスポンスの生成
        response = ResponseHandler.create_success_response(
            job_id=job_id,
            knowledge_base_id=sync_request.knowledge_base_id,
            message="同期ジョブが正常に開始されました"
        )

        return response

    except ValidationError as e:
        # バリデーションエラーの処理
        tracker.log_error(e, "validation", "リクエストバリデーションエラー")
        response = ResponseHandler.create_validation_error_response(
            error_code=e.error_code,
            message=e.message
        )
        return response

    except BedrockSyncError as e:
        # Bedrock関連エラーの処理
        tracker.log_error(e, "bedrock_api", "Bedrock APIエラー")
        response = ResponseHandler.create_error_response(e)
        return response

2. バリデーション機能

class RequestValidator:
    """リクエスト検証クラス"""

    # JSONスキーマ定義
    SYNC_REQUEST_SCHEMA = {
        "type": "object",
        "properties": {
            "knowledgeBaseId": {
                "type": "string",
                "pattern": r"^[A-Z0-9]{10}$",  # Bedrock Knowledge Base ID形式
                "minLength": 10,
                "maxLength": 10
            },
            "dataSourceId": {
                "type": "string", 
                "pattern": r"^[A-Z0-9]{10}$",  # Bedrock Data Source ID形式
                "minLength": 10,
                "maxLength": 10
            }
        },
        "required": ["knowledgeBaseId", "dataSourceId"],
        "additionalProperties": False
    }

    @staticmethod
    def parse_json(json_string: str) -> Dict[str, Any]:
        """
        JSON文字列を解析

        Args:
            json_string: JSON文字列

        Returns:
            解析されたデータ

        Raises:
            ValidationError: JSON解析に失敗した場合
        """
        try:
            if not json_string or json_string.strip() == "":
                raise ValidationError(
                    "リクエストボディが空です",
                    "EMPTY_REQUEST_BODY"
                )

            data = json.loads(json_string)

            if not isinstance(data, dict):
                raise ValidationError(
                    "リクエストボディはJSONオブジェクトである必要があります",
                    "INVALID_REQUEST_FORMAT"
                )

            return data

        except json.JSONDecodeError as e:
            logger.error(f"JSON解析エラー: {str(e)}")
            raise ValidationError(
                "無効なJSON形式です",
                "INVALID_JSON_FORMAT"
            )

3. Bedrockサービス統合

class BedrockService:
    """
    Amazon Bedrock APIとの統合を管理するサービスクラス
    """

    def __init__(self, region_name: Optional[str] = None, max_retries: int = 3):
        """
        BedrockServiceを初期化

        Args:
            region_name: AWSリージョン名(省略時は環境変数から取得)
            max_retries: API呼び出しの最大リトライ回数
        """
        self.region_name = region_name
        self._client = None
        self._error_handler = BedrockErrorHandler(max_retries=max_retries)
        self._initialize_client()

    def _initialize_client(self) -> None:
        """
        Bedrockクライアントを初期化

        Raises:
            BedrockServiceUnavailableError: クライアント初期化に失敗した場合
        """
        try:
            client_config = {}
            if self.region_name:
                client_config['region_name'] = self.region_name

            self._client = boto3.client('bedrock-agent', **client_config)

            # クライアントの動作確認(軽量なAPI呼び出し)
            self._validate_client_connection()

            logger.info(f"Bedrockクライアントを初期化しました (リージョン: {self._client.meta.region_name})")

        except Exception as e:
            logger.error(f"Bedrockクライアントの初期化に失敗: {str(e)}")
            raise BedrockServiceUnavailableError(
                details={'error': str(e), 'region': self.region_name}
            )

    def start_ingestion_job(self, knowledge_base_id: str, data_source_id: str, request_id: str) -> str:
        """
        Knowledge Base同期ジョブを開始

        Args:
            knowledge_base_id: Knowledge Base ID
            data_source_id: Data Source ID
            request_id: リクエストID(ログ用)

        Returns:
            開始されたジョブのID

        Raises:
            BedrockSyncError: 同期ジョブの開始に失敗した場合
        """
        try:
            logger.info(f"同期ジョブ開始: KB={knowledge_base_id}, DS={data_source_id}")

            response = self._client.start_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId=data_source_id
            )

            job_id = response['ingestionJob']['ingestionJobId']
            logger.info(f"同期ジョブが正常に開始されました: job_id={job_id}")

            return job_id

        except ClientError as e:
            logger.error(f"Bedrock API呼び出しエラー: {str(e)}")
            raise map_boto3_error_to_custom_error(e)
        except Exception as e:
            logger.error(f"予期しないエラー: {str(e)}")
            raise BedrockSyncError(f"同期ジョブの開始に失敗しました: {str(e)}")

4. レスポンスハンドラー

class ResponseHandler:
    """レスポンス生成ハンドラークラス"""

    @staticmethod
    def create_success_response(
        job_id: str,
        knowledge_base_id: str,
        message: str = "同期ジョブが正常に開始されました"
    ) -> Dict[str, Any]:
        """
        成功レスポンスを作成

        Args:
            job_id: BedrockジョブID
            knowledge_base_id: Knowledge Base ID
            message: 成功メッセージ

        Returns:
            API Gateway形式の成功レスポンス
        """
        try:
            response_data = SyncResponse.create_success(job_id, knowledge_base_id, message)

            logger.info(f"成功レスポンス作成: job_id={job_id}, kb_id={knowledge_base_id}")

            return ResponseHandler._create_api_response(
                status_code=HTTPStatusCodes.OK,
                body=response_data.to_dict()
            )

        except Exception as e:
            logger.error(f"成功レスポンス作成中にエラー: {str(e)}")
            # フォールバック用の基本的な成功レスポンス
            return ResponseHandler._create_api_response(
                status_code=HTTPStatusCodes.OK,
                body={
                    "status": "success",
                    "jobId": job_id,
                    "knowledgeBaseId": knowledge_base_id,
                    "message": message,
                    "timestamp": ResponseHandler._get_current_timestamp()
                }
            )

    @staticmethod
    def _create_api_response(
        status_code: int,
        body: Dict[str, Any],
        additional_headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        API Gateway形式のレスポンスを作成

        Args:
            status_code: HTTPステータスコード
            body: レスポンスボディ
            additional_headers: 追加ヘッダー

        Returns:
            API Gateway形式のレスポンス
        """
        headers = ResponseHandler._get_default_headers()

        if additional_headers:
            headers.update(additional_headers)

        return {
            'statusCode': status_code,
            'headers': headers,
            'body': json.dumps(body, ensure_ascii=False)
        }

    @staticmethod
    def _get_default_headers() -> Dict[str, str]:
        """
        デフォルトヘッダーを取得

        Returns:
            デフォルトヘッダー辞書
        """
        return {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type, Authorization'
        }

5. 構造化ログの例

{
    "timestamp": "2025-09-15T01:39:33.027825Z",
    "level": "INFO",
    "request_id": "12345678-1234-1234-1234-123456789012",
    "action": "bedrock_call_success",
    "knowledge_base_id": "XXXXXXXXXX",
    "data_source_id": "YYYYYYYYYY",
    "details": {
        "job_id": "HDVT5KDVYH",
        "operation_time_seconds": 1.089
    }
}

開発中に遭遇した問題と解決 – Kiroとのトラブルシューティング

開発過程では様々な問題に遭遇しましたが、Kiroの支援により効率的に解決できました。

1. Runtime.ImportModuleError – Kiroによる迅速な問題特定

問題: Lambda関数で相対インポートエラーが発生

Runtime.ImportModuleError: Unable to import module 'lambda_function': 
attempted relative import with no known parent package

原因:

  • Python 3.13を使用
  • 相対インポート(from .module import)を使用

解決策:

  • ランタイムをPython 3.11に変更
  • 相対インポートを絶対インポートに修正
# 修正前(エラーになる)
from .bedrock_service import BedrockService
from .validation import validate_sync_request
from .response_handler import ResponseHandler

# 修正後(正常動作)
from bedrock_service import BedrockService
from validation import validate_sync_request
from response_handler import ResponseHandler

# さらに、Lambda環境でのモジュール検索パスを設定
import sys
if '/var/task' not in sys.path:
    sys.path.insert(0, '/var/task')

try:
    from validation import validate_sync_request
    from bedrock_service import BedrockService
    from response_handler import ResponseHandler
    # ... 他のインポート
except ImportError as e:
    # インポートエラーの詳細をログに出力
    print(f"Import error: {e}")
    print(f"Python path: {sys.path}")
    print(f"Current working directory: {os.getcwd()}")
    raise

2. UUID形式の誤解

問題: Bedrock Knowledge Base IDをUUID形式として検証していた

# 間違った検証パターン(UUIDを想定)
SYNC_REQUEST_SCHEMA = {
    "properties": {
        "knowledgeBaseId": {
            "type": "string",
            "pattern": r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
        }
    }
}

実際のID形式: XXXXXXXXXX(10文字の英数字)

解決策:

# 正しい検証パターン(Bedrock ID形式)
SYNC_REQUEST_SCHEMA = {
    "type": "object",
    "properties": {
        "knowledgeBaseId": {
            "type": "string",
            "pattern": r"^[A-Z0-9]{10}$",  # Bedrock Knowledge Base ID形式
            "minLength": 10,
            "maxLength": 10
        },
        "dataSourceId": {
            "type": "string", 
            "pattern": r"^[A-Z0-9]{10}$",  # Bedrock Data Source ID形式
            "minLength": 10,
            "maxLength": 10
        }
    },
    "required": ["knowledgeBaseId", "dataSourceId"],
    "additionalProperties": False
}

# エラーメッセージも更新
if not re.match(pattern, value, re.IGNORECASE):
    if field_name in ["knowledgeBaseId", "dataSourceId"]:
        raise ValidationError(
            f"パラメータ '{field_name}' は有効なBedrock ID形式である必要があります(10文字の英数字)",
            "INVALID_BEDROCK_ID_FORMAT"
        )

3. IAM権限の最適化

問題: 最小権限では動作しない

最初に試した権限:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:StartIngestionJob",
        "bedrock:GetKnowledgeBase",
        "bedrock:GetDataSource"
      ],
      "Resource": "*"
    }
  ]
}

エラー: BedrockAccessDeniedError - Bedrockサービスへのアクセスが拒否されました

解決策: Bedrockの内部処理で追加のAPIが必要

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "BedrockFullAccess",
      "Effect": "Allow",
      "Action": ["bedrock:*"],
      "Resource": "*"
    },
    {
      "Sid": "CloudWatchLogsAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

学んだこと: Knowledge Base操作には、ドキュメントに記載されていない内部的なAPI呼び出しが含まれるため、bedrock:* の広い権限が必要でした。

Kiroの貢献: 各問題に対して、Kiroは段階的なアプローチを提案し、最小権限から始めて徐々に権限を拡張する方法を教えてくれました。また、CloudWatchログの分析方法や、効率的なデバッグ手法も学ぶことができました。

デプロイメントと運用

デプロイ手順

1. デプロイパッケージ作成

# deploy/create_deployment_package.py
import os
import shutil
import subprocess
import tempfile
import zipfile
from pathlib import Path

def create_deployment_package():
    """
    Lambda デプロイパッケージを作成
    """
    print("Lambda デプロイパッケージの作成を開始します...")

    # 一時ディレクトリの作成
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"一時ディレクトリ: {temp_dir}")

        # 依存関係のインストール
        print("依存関係をインストール中...")
        subprocess.run([
            "pip", "install", 
            "-r", "deploy/requirements-lambda.txt",
            "-t", temp_dir
        ], check=True)

        # ソースコードのコピー
        print("ソースコードをコピー中...")
        src_dir = Path("src")
        for py_file in src_dir.glob("*.py"):
            shutil.copy2(py_file, temp_dir)
            print(f"  コピー: {py_file.name}")

        # ZIPパッケージの作成
        print("ZIPパッケージを作成中...")
        zip_path = "bedrock-kb-sync-lambda.zip"

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arc_name = os.path.relpath(file_path, temp_dir)
                    zipf.write(file_path, arc_name)
                    print(f"  追加: {arc_name}")

        # パッケージサイズの確認
        size_mb = os.path.getsize(zip_path) / (1024 * 1024)
        print(f"デプロイパッケージの作成が完了しました: {zip_path}")
        print(f"パッケージサイズ: {size_mb:.2f} MB")

        if size_mb > 50:
            print("⚠️  警告: パッケージサイズが50MBを超えています")

if __name__ == "__main__":
    create_deployment_package()
# 実行
python deploy/create_deployment_package.py

2. IAMロール作成

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "BedrockFullAccess",
      "Effect": "Allow",
      "Action": ["bedrock:*"],
      "Resource": "*"
    },
    {
      "Sid": "CloudWatchLogsAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream", 
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

3. Lambda関数作成

# AWS CLI を使用した作成例
aws lambda create-function \
  --function-name bedrock-kb-sync-function \
  --runtime python3.11 \
  --role arn:aws:iam::ACCOUNT-ID:role/bedrock-kb-sync-lambda-role \
  --handler lambda_function.lambda_handler \
  --zip-file fileb://bedrock-kb-sync-lambda.zip \
  --timeout 30 \
  --memory-size 256 \
  --environment Variables='{
    "LOG_LEVEL": "INFO",
    "BEDROCK_REGION": "ap-northeast-1"
  }'

4. API Gateway設定

# HTTP API作成
aws apigatewayv2 create-api \
  --name bedrock-kb-sync-api \
  --protocol-type HTTP \
  --description "Bedrock Knowledge Base同期API"

# Lambda統合作成
aws apigatewayv2 create-integration \
  --api-id YOUR-API-ID \
  --integration-type AWS_PROXY \
  --integration-uri arn:aws:lambda:ap-northeast-1:ACCOUNT-ID:function:bedrock-kb-sync-function \
  --payload-format-version 2.0

# ルート作成
aws apigatewayv2 create-route \
  --api-id YOUR-API-ID \
  --route-key "POST /sync" \
  --target integrations/YOUR-INTEGRATION-ID

動作確認

# 正常ケース
curl -X POST https://your-api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync \
  -H "Content-Type: application/json" \
  -d '{"knowledgeBaseId": "XXXXXXXXXX","dataSourceId": "YYYYYYYYYY"}' \
  -w "\nHTTP Status: %{http_code}\nResponse Time: %{time_total}s\n"

# 期待されるレスポンス
{
  "status": "success",
  "jobId": "HDVT5KDVYH",
  "knowledgeBaseId": "XXXXXXXXXX",
  "timestamp": "2025-09-15T01:39:33.027825Z",
  "message": "同期ジョブが正常に開始されました"
}
# エラーケースのテスト
curl -X POST https://your-api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync \
  -H "Content-Type: application/json" \
  -d '{}' \
  -w "\nHTTP Status: %{http_code}\n"

# 期待されるエラーレスポンス(400 Bad Request)
{
  "status": "error",
  "errorCode": "MISSING_PARAMETER",
  "message": "必須パラメータ 'knowledgeBaseId' が不足しています",
  "timestamp": "2025-09-15T01:39:33.027825Z"
}

包括的なテストスクリプト

#!/bin/bash
# test-api.sh - API動作確認スクリプト

API_URL="https://your-api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync"
KB_ID="XXXXXXXXXX"
DS_ID="YYYYYYYYYY"

echo "=== Bedrock Knowledge Base Sync API テスト ==="
echo ""

# 1. 正常ケース
echo "1. 正常ケースのテスト:"
response=$(curl -s -w "%{http_code}" -X POST "$API_URL" \
  -H "Content-Type: application/json" \
  -d "{\"knowledgeBaseId\":\"$KB_ID\",\"dataSourceId\":\"$DS_ID\"}")

http_code="${response: -3}"
response_body="${response%???}"

if [ "$http_code" -eq 200 ]; then
  job_id=$(echo "$response_body" | jq -r '.jobId')
  echo "✅ 成功 - ジョブID: $job_id"
else
  echo "❌ 失敗 - HTTP $http_code"
  echo "$response_body"
fi

echo ""

# 2. バリデーションエラー
echo "2. バリデーションエラーのテスト:"
response=$(curl -s -w "%{http_code}" -X POST "$API_URL" \
  -H "Content-Type: application/json" \
  -d '{}')

http_code="${response: -3}"
if [ "$http_code" -eq 400 ]; then
  echo "✅ 期待通りのバリデーションエラー: HTTP $http_code"
else
  echo "❌ 予期しないレスポンス: HTTP $http_code"
fi

echo ""

# 3. 不正JSON
echo "3. 不正JSONのテスト:"
response=$(curl -s -w "%{http_code}" -X POST "$API_URL" \
  -H "Content-Type: application/json" \
  -d 'invalid json')

http_code="${response: -3}"
if [ "$http_code" -eq 400 ]; then
  echo "✅ 期待通りのJSONエラー: HTTP $http_code"
else
  echo "❌ 予期しないレスポンス: HTTP $http_code"
fi

echo ""
echo "テスト完了"

運用での効果

Before(手動運用)

  • 同期頻度: 1日1回(手動)
  • 作業時間: 5-10分/回
  • エラー対応: 手動確認とリトライ
  • スケーラビリティ: 限定的

After(API自動化)

  • 同期頻度: 必要に応じて随時
  • 作業時間: 0分(完全自動化)
  • エラー対応: 構造化ログで詳細把握
  • スケーラビリティ: 複数KB対応可能

具体的な改善例

  1. 夜間バッチとの連携
# S3アップロード後に自動同期
import requests
import logging

def upload_and_sync():
    """
    S3にファイルをアップロードした後、自動的にKnowledge Baseを同期
    """
    try:
        # S3にファイルアップロード
        upload_to_s3(files)
        logging.info("S3へのファイルアップロードが完了しました")

        # 同期API呼び出し
        sync_url = 'https://api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync'
        payload = {
            'knowledgeBaseId': 'XXXXXXXXXX',  # 実際のKnowledge Base ID
            'dataSourceId': 'YYYYYYYYYY'     # 実際のData Source ID
        }

        response = requests.post(
            sync_url,
            json=payload,
            headers={'Content-Type': 'application/json'},
            timeout=30
        )

        if response.status_code == 200:
            result = response.json()
            job_id = result['jobId']
            logging.info(f"同期ジョブが正常に開始されました: {job_id}")
            return job_id
        else:
            logging.error(f"同期API呼び出しエラー: {response.status_code} - {response.text}")
            raise Exception(f"同期に失敗しました: {response.status_code}")

    except requests.exceptions.RequestException as e:
        logging.error(f"API呼び出し中にネットワークエラーが発生: {str(e)}")
        raise
    except Exception as e:
        logging.error(f"予期しないエラーが発生: {str(e)}")
        raise

# 使用例
if __name__ == "__main__":
    try:
        job_id = upload_and_sync()
        print(f"処理完了: ジョブID {job_id}")
    except Exception as e:
        print(f"処理失敗: {str(e)}")
        exit(1)
  1. CI/CDパイプライン統合
# GitHub Actions例
name: Deploy and Sync Knowledge Base

on:
  push:
    branches: [ main ]
    paths: [ 'documents/**' ]

jobs:
  deploy-and-sync:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3

    - name: Upload documents to S3
      run: |
        aws s3 sync documents/ s3://${{ vars.S3_BUCKET }}/documents/ \
          --delete \
          --exclude "*.tmp"

    - name: Sync Knowledge Base
      run: |
        response=$(curl -s -w "%{http_code}" -X POST ${{ secrets.SYNC_API_URL }} \
          -H "Content-Type: application/json" \
          -H "X-API-Key: ${{ secrets.API_KEY }}" \
          -d '{"knowledgeBaseId": "${{ vars.KB_ID }}", "dataSourceId": "${{ vars.DS_ID }}"}')

        http_code="${response: -3}"
        response_body="${response%???}"

        if [ "$http_code" -eq 200 ]; then
          job_id=$(echo "$response_body" | jq -r '.jobId')
          echo "✅ 同期ジョブが開始されました: $job_id"
        else
          echo "❌ 同期に失敗しました: HTTP $http_code"
          echo "$response_body"
          exit 1
        fi

    - name: Notify Slack
      if: always()
      uses: 8398a7/action-slack@v3
      with:
        status: ${{ job.status }}
        text: "Knowledge Base同期: ${{ job.status }}"
      env:
        SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
  1. Lambda関数からの呼び出し
# 他のLambda関数からAPI呼び出し
import json
import urllib3
import os

def lambda_handler(event, context):
    """
    S3イベントトリガーでKnowledge Base同期を実行
    """
    http = urllib3.PoolManager()

    # S3イベントから情報を取得
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # ドキュメントフォルダの変更のみ処理
    if not key.startswith('documents/'):
        return {'statusCode': 200, 'body': 'スキップ: ドキュメント以外の変更'}

    try:
        # 同期API呼び出し
        sync_payload = {
            'knowledgeBaseId': os.environ['KNOWLEDGE_BASE_ID'],
            'dataSourceId': os.environ['DATA_SOURCE_ID']
        }

        response = http.request(
            'POST',
            os.environ['SYNC_API_URL'],
            body=json.dumps(sync_payload),
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.environ['API_KEY']
            }
        )

        if response.status == 200:
            result = json.loads(response.data.decode('utf-8'))
            print(f"同期ジョブ開始: {result['jobId']}")

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': '同期が正常に開始されました',
                    'jobId': result['jobId'],
                    'trigger': f's3://{bucket}/{key}'
                })
            }
        else:
            print(f"同期API呼び出しエラー: {response.status}")
            return {
                'statusCode': 500,
                'body': json.dumps({'error': f'同期に失敗しました: {response.status}'})
            }

    except Exception as e:
        print(f"エラー: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

監視とトラブルシューティング

CloudWatch監視

設定したアラーム:

  • Lambda関数エラー率 > 5%
  • Lambda関数実行時間 > 25秒
  • API Gateway 5xxエラー > 10件/5分

よくある問題と対処法

問題症状対処法
権限不足403 ForbiddenIAMポリシーの確認・更新
ID不正400 Bad RequestKnowledge Base/Data Source IDの確認
タイムアウト504 Gateway TimeoutLambda実行時間の延長

まとめ – Kiroとの協働開発で得られた成果

Amazon Bedrock Knowledge Baseの同期を自動化するAPIを、Kiroと一緒に開発することで、以下の成果を得られました:

技術的成果

  • 完全自動化: 手動作業の排除
  • 高い可用性: サーバーレスアーキテクチャ
  • 詳細監視: 構造化ログとメトリクス
  • セキュア: 適切なIAM権限設定

運用面の改善

  • 効率化: 作業時間の大幅削減
  • 柔軟性: 様々なタイミングでの同期実行
  • スケーラビリティ: 複数Knowledge Baseへの対応
  • 信頼性: エラーハンドリングと監視

Kiroとの協働で学んだ教訓

  1. AI支援開発の効果: 複雑な問題も段階的に解決可能
  2. ペアプログラミングの価値: 人間の創造性とAIの知識が相乗効果
  3. 継続的な学習: Kiroとの対話を通じてベストプラクティスを習得
  4. 品質向上: AIによるコードレビューで見落としを防止
  5. ドキュメント化の自動化: 包括的な運用ドキュメントを効率的に作成

Kiroとの開発体験

  • 迅速な問題解決: エラーメッセージから即座に原因を特定
  • コード品質向上: ベストプラクティスに基づいた実装提案
  • 学習効果: 開発過程で新しい技術や手法を習得
  • 効率的な開発: 繰り返し作業の自動化と最適化

今後の展開

現在検討している機能拡張:

  1. バッチ同期: 複数Knowledge Baseの一括同期
  2. スケジュール同期: CloudWatch Eventsとの連携
  3. 同期状況確認: ジョブステータス取得API
  4. Webhook通知: 同期完了時の通知機能

Kiroについて

今回の開発で使用した「Kiro」は、開発者向けのAIアシスタントです。コード生成、デバッグ、アーキテクチャ設計、ドキュメント作成など、開発の全工程をサポートしてくれます。

特に印象的だったのは:

  • 文脈理解: プロジェクト全体を把握した上での適切な提案
  • 実践的な知識: 実際の運用を考慮したベストプラクティスの提示
  • 学習支援: 単なるコード生成ではなく、理解を深める説明

Amazon Bedrockを使ったRAGシステムの運用で同様の課題を抱えている方、そしてAI支援開発に興味がある方の参考になれば幸いです。


参考リンク

コメント

タイトルとURLをコピーしました