はじめに
Amazon Bedrockを使ったRAG(Retrieval-Augmented Generation)システムを運用していると、Knowledge Baseのデータ同期が課題になることがあります。今回、手動で行っていた同期作業を自動化するため、AI開発アシスタント「Kiro」と一緒に専用のAPIを開発しました。
この記事では、Kiroとのペアプログラミング体験を含めて、開発の背景から実装、そして運用まで、実際に遭遇した問題とその解決方法を詳しく解説します。
開発の背景と課題
既存の運用フロー
私たちのシステムでは、以下のような運用を行っていました:
- 夜間バッチでS3にファイルアップロード
- ローカルマシンでbatファイルやPythonスクリプトを実行
- 大量のドキュメントを定期的にS3バケットに配置
- 手動でKnowledge Base同期
- AWSコンソールにログイン
- Bedrock Knowledge Baseの画面で手動同期を実行
- 同期完了まで待機
課題
- 手動作業の負荷: 毎回コンソールにアクセスして同期を実行
- タイミングの制約: 夜間アップロード後、翌朝まで同期されない
- スケーラビリティの問題: 複数のKnowledge Baseや頻繁な更新に対応困難
- 自動化の欠如: CI/CDパイプラインに組み込めない
解決策:同期API の開発 – Kiroとの協働開発
Kiroとの開発プロセス
今回の開発では、AI開発アシスタント「Kiro」と一緒にペアプログラミングを行いました。Kiroは以下のような支援を提供してくれました:
- 要件定義の整理: 曖昧な要求を具体的な仕様に落とし込み
- アーキテクチャ設計: ベストプラクティスに基づいた設計提案
- コード実装: 実際のPythonコードの生成と最適化
- トラブルシューティング: エラー解析と解決策の提案
- ドキュメント作成: 包括的な運用ドキュメントの生成
設計方針
Kiroと議論しながら、以下の方針でAPIを設計しました:
- RESTful API: 標準的なHTTP APIとして実装
- サーバーレス: AWS Lambda + API Gatewayで運用コストを最小化
- セキュア: 適切なIAM権限とAPI Key認証
- 監視可能: 構造化ログとCloudWatch統合
- エラーハンドリング: 詳細なエラー情報と適切なHTTPステータス
アーキテクチャ
[クライアント] → [API Gateway] → [Lambda] → [Bedrock API]
↓
[CloudWatch Logs]
Kiroは、このシンプルなアーキテクチャが運用コストと複雑さのバランスを取る最適解であることを提案してくれました。
実装の詳細
API仕様
エンドポイント: POST /sync
リクエスト形式:
{
"knowledgeBaseId": "XXXXXXXXXX",
"dataSourceId": "YYYYYYYYYY"
}
成功レスポンス:
{
"status": "success",
"jobId": "HDVT5KDVYH",
"knowledgeBaseId": "XXXXXXXXXX",
"timestamp": "2025-09-15T01:39:33.027825Z",
"message": "同期ジョブが正常に開始されました"
}
主要コンポーネント
1. Lambda関数のメインハンドラー
"""
Amazon Bedrock Knowledge Base 同期API
Lambda関数のメインハンドラー
"""
import json
import logging
import os
import sys
from typing import Dict, Any
# Lambda環境でのモジュール検索パスを設定
if '/var/task' not in sys.path:
sys.path.insert(0, '/var/task')
try:
from validation import validate_sync_request
from bedrock_service import BedrockService
from response_handler import ResponseHandler
from request_tracker import RequestTrackerManager
from exceptions import (
ValidationError,
BedrockSyncError,
BedrockServiceUnavailableError,
TimeoutError
)
except ImportError as e:
# インポートエラーの詳細をログに出力
print(f"Import error: {e}")
print(f"Python path: {sys.path}")
print(f"Current working directory: {os.getcwd()}")
raise
# ログ設定
logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
Lambda関数のメインハンドラー
Args:
event: API Gatewayからのイベントデータ
context: Lambda実行コンテキスト
Returns:
API Gatewayレスポンス形式の辞書
"""
# リクエストトラッカーの初期化
request_id = context.aws_request_id
tracker = RequestTrackerManager.create_tracker(request_id)
try:
# 1. リクエスト検証
with tracker.track_operation("request_validation"):
sync_request = validate_sync_request(event)
tracker.log_request_received(sync_request, client_info)
tracker.log_validation_success()
# 2. Bedrock API呼び出し
with tracker.track_operation("bedrock_api_call"):
bedrock_service = BedrockService()
job_id = bedrock_service.start_ingestion_job(
knowledge_base_id=sync_request.knowledge_base_id,
data_source_id=sync_request.data_source_id,
request_id=request_id
)
tracker.log_bedrock_call_success(job_id, {'job_id': job_id})
# 3. 成功レスポンスの生成
response = ResponseHandler.create_success_response(
job_id=job_id,
knowledge_base_id=sync_request.knowledge_base_id,
message="同期ジョブが正常に開始されました"
)
return response
except ValidationError as e:
# バリデーションエラーの処理
tracker.log_error(e, "validation", "リクエストバリデーションエラー")
response = ResponseHandler.create_validation_error_response(
error_code=e.error_code,
message=e.message
)
return response
except BedrockSyncError as e:
# Bedrock関連エラーの処理
tracker.log_error(e, "bedrock_api", "Bedrock APIエラー")
response = ResponseHandler.create_error_response(e)
return response
2. バリデーション機能
class RequestValidator:
"""リクエスト検証クラス"""
# JSONスキーマ定義
SYNC_REQUEST_SCHEMA = {
"type": "object",
"properties": {
"knowledgeBaseId": {
"type": "string",
"pattern": r"^[A-Z0-9]{10}$", # Bedrock Knowledge Base ID形式
"minLength": 10,
"maxLength": 10
},
"dataSourceId": {
"type": "string",
"pattern": r"^[A-Z0-9]{10}$", # Bedrock Data Source ID形式
"minLength": 10,
"maxLength": 10
}
},
"required": ["knowledgeBaseId", "dataSourceId"],
"additionalProperties": False
}
@staticmethod
def parse_json(json_string: str) -> Dict[str, Any]:
"""
JSON文字列を解析
Args:
json_string: JSON文字列
Returns:
解析されたデータ
Raises:
ValidationError: JSON解析に失敗した場合
"""
try:
if not json_string or json_string.strip() == "":
raise ValidationError(
"リクエストボディが空です",
"EMPTY_REQUEST_BODY"
)
data = json.loads(json_string)
if not isinstance(data, dict):
raise ValidationError(
"リクエストボディはJSONオブジェクトである必要があります",
"INVALID_REQUEST_FORMAT"
)
return data
except json.JSONDecodeError as e:
logger.error(f"JSON解析エラー: {str(e)}")
raise ValidationError(
"無効なJSON形式です",
"INVALID_JSON_FORMAT"
)
3. Bedrockサービス統合
class BedrockService:
"""
Amazon Bedrock APIとの統合を管理するサービスクラス
"""
def __init__(self, region_name: Optional[str] = None, max_retries: int = 3):
"""
BedrockServiceを初期化
Args:
region_name: AWSリージョン名(省略時は環境変数から取得)
max_retries: API呼び出しの最大リトライ回数
"""
self.region_name = region_name
self._client = None
self._error_handler = BedrockErrorHandler(max_retries=max_retries)
self._initialize_client()
def _initialize_client(self) -> None:
"""
Bedrockクライアントを初期化
Raises:
BedrockServiceUnavailableError: クライアント初期化に失敗した場合
"""
try:
client_config = {}
if self.region_name:
client_config['region_name'] = self.region_name
self._client = boto3.client('bedrock-agent', **client_config)
# クライアントの動作確認(軽量なAPI呼び出し)
self._validate_client_connection()
logger.info(f"Bedrockクライアントを初期化しました (リージョン: {self._client.meta.region_name})")
except Exception as e:
logger.error(f"Bedrockクライアントの初期化に失敗: {str(e)}")
raise BedrockServiceUnavailableError(
details={'error': str(e), 'region': self.region_name}
)
def start_ingestion_job(self, knowledge_base_id: str, data_source_id: str, request_id: str) -> str:
"""
Knowledge Base同期ジョブを開始
Args:
knowledge_base_id: Knowledge Base ID
data_source_id: Data Source ID
request_id: リクエストID(ログ用)
Returns:
開始されたジョブのID
Raises:
BedrockSyncError: 同期ジョブの開始に失敗した場合
"""
try:
logger.info(f"同期ジョブ開始: KB={knowledge_base_id}, DS={data_source_id}")
response = self._client.start_ingestion_job(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id
)
job_id = response['ingestionJob']['ingestionJobId']
logger.info(f"同期ジョブが正常に開始されました: job_id={job_id}")
return job_id
except ClientError as e:
logger.error(f"Bedrock API呼び出しエラー: {str(e)}")
raise map_boto3_error_to_custom_error(e)
except Exception as e:
logger.error(f"予期しないエラー: {str(e)}")
raise BedrockSyncError(f"同期ジョブの開始に失敗しました: {str(e)}")
4. レスポンスハンドラー
class ResponseHandler:
"""レスポンス生成ハンドラークラス"""
@staticmethod
def create_success_response(
job_id: str,
knowledge_base_id: str,
message: str = "同期ジョブが正常に開始されました"
) -> Dict[str, Any]:
"""
成功レスポンスを作成
Args:
job_id: BedrockジョブID
knowledge_base_id: Knowledge Base ID
message: 成功メッセージ
Returns:
API Gateway形式の成功レスポンス
"""
try:
response_data = SyncResponse.create_success(job_id, knowledge_base_id, message)
logger.info(f"成功レスポンス作成: job_id={job_id}, kb_id={knowledge_base_id}")
return ResponseHandler._create_api_response(
status_code=HTTPStatusCodes.OK,
body=response_data.to_dict()
)
except Exception as e:
logger.error(f"成功レスポンス作成中にエラー: {str(e)}")
# フォールバック用の基本的な成功レスポンス
return ResponseHandler._create_api_response(
status_code=HTTPStatusCodes.OK,
body={
"status": "success",
"jobId": job_id,
"knowledgeBaseId": knowledge_base_id,
"message": message,
"timestamp": ResponseHandler._get_current_timestamp()
}
)
@staticmethod
def _create_api_response(
status_code: int,
body: Dict[str, Any],
additional_headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""
API Gateway形式のレスポンスを作成
Args:
status_code: HTTPステータスコード
body: レスポンスボディ
additional_headers: 追加ヘッダー
Returns:
API Gateway形式のレスポンス
"""
headers = ResponseHandler._get_default_headers()
if additional_headers:
headers.update(additional_headers)
return {
'statusCode': status_code,
'headers': headers,
'body': json.dumps(body, ensure_ascii=False)
}
@staticmethod
def _get_default_headers() -> Dict[str, str]:
"""
デフォルトヘッダーを取得
Returns:
デフォルトヘッダー辞書
"""
return {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
}
5. 構造化ログの例
{
"timestamp": "2025-09-15T01:39:33.027825Z",
"level": "INFO",
"request_id": "12345678-1234-1234-1234-123456789012",
"action": "bedrock_call_success",
"knowledge_base_id": "XXXXXXXXXX",
"data_source_id": "YYYYYYYYYY",
"details": {
"job_id": "HDVT5KDVYH",
"operation_time_seconds": 1.089
}
}
開発中に遭遇した問題と解決 – Kiroとのトラブルシューティング
開発過程では様々な問題に遭遇しましたが、Kiroの支援により効率的に解決できました。
1. Runtime.ImportModuleError – Kiroによる迅速な問題特定
問題: Lambda関数で相対インポートエラーが発生
Runtime.ImportModuleError: Unable to import module 'lambda_function':
attempted relative import with no known parent package
原因:
- Python 3.13を使用
- 相対インポート(
from .module import
)を使用
解決策:
- ランタイムをPython 3.11に変更
- 相対インポートを絶対インポートに修正
# 修正前(エラーになる)
from .bedrock_service import BedrockService
from .validation import validate_sync_request
from .response_handler import ResponseHandler
# 修正後(正常動作)
from bedrock_service import BedrockService
from validation import validate_sync_request
from response_handler import ResponseHandler
# さらに、Lambda環境でのモジュール検索パスを設定
import sys
if '/var/task' not in sys.path:
sys.path.insert(0, '/var/task')
try:
from validation import validate_sync_request
from bedrock_service import BedrockService
from response_handler import ResponseHandler
# ... 他のインポート
except ImportError as e:
# インポートエラーの詳細をログに出力
print(f"Import error: {e}")
print(f"Python path: {sys.path}")
print(f"Current working directory: {os.getcwd()}")
raise
2. UUID形式の誤解
問題: Bedrock Knowledge Base IDをUUID形式として検証していた
# 間違った検証パターン(UUIDを想定)
SYNC_REQUEST_SCHEMA = {
"properties": {
"knowledgeBaseId": {
"type": "string",
"pattern": r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
}
}
}
実際のID形式: XXXXXXXXXX
(10文字の英数字)
解決策:
# 正しい検証パターン(Bedrock ID形式)
SYNC_REQUEST_SCHEMA = {
"type": "object",
"properties": {
"knowledgeBaseId": {
"type": "string",
"pattern": r"^[A-Z0-9]{10}$", # Bedrock Knowledge Base ID形式
"minLength": 10,
"maxLength": 10
},
"dataSourceId": {
"type": "string",
"pattern": r"^[A-Z0-9]{10}$", # Bedrock Data Source ID形式
"minLength": 10,
"maxLength": 10
}
},
"required": ["knowledgeBaseId", "dataSourceId"],
"additionalProperties": False
}
# エラーメッセージも更新
if not re.match(pattern, value, re.IGNORECASE):
if field_name in ["knowledgeBaseId", "dataSourceId"]:
raise ValidationError(
f"パラメータ '{field_name}' は有効なBedrock ID形式である必要があります(10文字の英数字)",
"INVALID_BEDROCK_ID_FORMAT"
)
3. IAM権限の最適化
問題: 最小権限では動作しない
最初に試した権限:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"bedrock:StartIngestionJob",
"bedrock:GetKnowledgeBase",
"bedrock:GetDataSource"
],
"Resource": "*"
}
]
}
エラー: BedrockAccessDeniedError - Bedrockサービスへのアクセスが拒否されました
解決策: Bedrockの内部処理で追加のAPIが必要
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BedrockFullAccess",
"Effect": "Allow",
"Action": ["bedrock:*"],
"Resource": "*"
},
{
"Sid": "CloudWatchLogsAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
学んだこと: Knowledge Base操作には、ドキュメントに記載されていない内部的なAPI呼び出しが含まれるため、bedrock:*
の広い権限が必要でした。
Kiroの貢献: 各問題に対して、Kiroは段階的なアプローチを提案し、最小権限から始めて徐々に権限を拡張する方法を教えてくれました。また、CloudWatchログの分析方法や、効率的なデバッグ手法も学ぶことができました。
デプロイメントと運用
デプロイ手順
1. デプロイパッケージ作成
# deploy/create_deployment_package.py
import os
import shutil
import subprocess
import tempfile
import zipfile
from pathlib import Path
def create_deployment_package():
"""
Lambda デプロイパッケージを作成
"""
print("Lambda デプロイパッケージの作成を開始します...")
# 一時ディレクトリの作成
with tempfile.TemporaryDirectory() as temp_dir:
print(f"一時ディレクトリ: {temp_dir}")
# 依存関係のインストール
print("依存関係をインストール中...")
subprocess.run([
"pip", "install",
"-r", "deploy/requirements-lambda.txt",
"-t", temp_dir
], check=True)
# ソースコードのコピー
print("ソースコードをコピー中...")
src_dir = Path("src")
for py_file in src_dir.glob("*.py"):
shutil.copy2(py_file, temp_dir)
print(f" コピー: {py_file.name}")
# ZIPパッケージの作成
print("ZIPパッケージを作成中...")
zip_path = "bedrock-kb-sync-lambda.zip"
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
arc_name = os.path.relpath(file_path, temp_dir)
zipf.write(file_path, arc_name)
print(f" 追加: {arc_name}")
# パッケージサイズの確認
size_mb = os.path.getsize(zip_path) / (1024 * 1024)
print(f"デプロイパッケージの作成が完了しました: {zip_path}")
print(f"パッケージサイズ: {size_mb:.2f} MB")
if size_mb > 50:
print("⚠️ 警告: パッケージサイズが50MBを超えています")
if __name__ == "__main__":
create_deployment_package()
# 実行
python deploy/create_deployment_package.py
2. IAMロール作成
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BedrockFullAccess",
"Effect": "Allow",
"Action": ["bedrock:*"],
"Resource": "*"
},
{
"Sid": "CloudWatchLogsAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
3. Lambda関数作成
# AWS CLI を使用した作成例
aws lambda create-function \
--function-name bedrock-kb-sync-function \
--runtime python3.11 \
--role arn:aws:iam::ACCOUNT-ID:role/bedrock-kb-sync-lambda-role \
--handler lambda_function.lambda_handler \
--zip-file fileb://bedrock-kb-sync-lambda.zip \
--timeout 30 \
--memory-size 256 \
--environment Variables='{
"LOG_LEVEL": "INFO",
"BEDROCK_REGION": "ap-northeast-1"
}'
4. API Gateway設定
# HTTP API作成
aws apigatewayv2 create-api \
--name bedrock-kb-sync-api \
--protocol-type HTTP \
--description "Bedrock Knowledge Base同期API"
# Lambda統合作成
aws apigatewayv2 create-integration \
--api-id YOUR-API-ID \
--integration-type AWS_PROXY \
--integration-uri arn:aws:lambda:ap-northeast-1:ACCOUNT-ID:function:bedrock-kb-sync-function \
--payload-format-version 2.0
# ルート作成
aws apigatewayv2 create-route \
--api-id YOUR-API-ID \
--route-key "POST /sync" \
--target integrations/YOUR-INTEGRATION-ID
動作確認
# 正常ケース
curl -X POST https://your-api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync \
-H "Content-Type: application/json" \
-d '{"knowledgeBaseId": "XXXXXXXXXX","dataSourceId": "YYYYYYYYYY"}' \
-w "\nHTTP Status: %{http_code}\nResponse Time: %{time_total}s\n"
# 期待されるレスポンス
{
"status": "success",
"jobId": "HDVT5KDVYH",
"knowledgeBaseId": "XXXXXXXXXX",
"timestamp": "2025-09-15T01:39:33.027825Z",
"message": "同期ジョブが正常に開始されました"
}
# エラーケースのテスト
curl -X POST https://your-api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync \
-H "Content-Type: application/json" \
-d '{}' \
-w "\nHTTP Status: %{http_code}\n"
# 期待されるエラーレスポンス(400 Bad Request)
{
"status": "error",
"errorCode": "MISSING_PARAMETER",
"message": "必須パラメータ 'knowledgeBaseId' が不足しています",
"timestamp": "2025-09-15T01:39:33.027825Z"
}
包括的なテストスクリプト
#!/bin/bash
# test-api.sh - API動作確認スクリプト
API_URL="https://your-api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync"
KB_ID="XXXXXXXXXX"
DS_ID="YYYYYYYYYY"
echo "=== Bedrock Knowledge Base Sync API テスト ==="
echo ""
# 1. 正常ケース
echo "1. 正常ケースのテスト:"
response=$(curl -s -w "%{http_code}" -X POST "$API_URL" \
-H "Content-Type: application/json" \
-d "{\"knowledgeBaseId\":\"$KB_ID\",\"dataSourceId\":\"$DS_ID\"}")
http_code="${response: -3}"
response_body="${response%???}"
if [ "$http_code" -eq 200 ]; then
job_id=$(echo "$response_body" | jq -r '.jobId')
echo "✅ 成功 - ジョブID: $job_id"
else
echo "❌ 失敗 - HTTP $http_code"
echo "$response_body"
fi
echo ""
# 2. バリデーションエラー
echo "2. バリデーションエラーのテスト:"
response=$(curl -s -w "%{http_code}" -X POST "$API_URL" \
-H "Content-Type: application/json" \
-d '{}')
http_code="${response: -3}"
if [ "$http_code" -eq 400 ]; then
echo "✅ 期待通りのバリデーションエラー: HTTP $http_code"
else
echo "❌ 予期しないレスポンス: HTTP $http_code"
fi
echo ""
# 3. 不正JSON
echo "3. 不正JSONのテスト:"
response=$(curl -s -w "%{http_code}" -X POST "$API_URL" \
-H "Content-Type: application/json" \
-d 'invalid json')
http_code="${response: -3}"
if [ "$http_code" -eq 400 ]; then
echo "✅ 期待通りのJSONエラー: HTTP $http_code"
else
echo "❌ 予期しないレスポンス: HTTP $http_code"
fi
echo ""
echo "テスト完了"
運用での効果
Before(手動運用)
- 同期頻度: 1日1回(手動)
- 作業時間: 5-10分/回
- エラー対応: 手動確認とリトライ
- スケーラビリティ: 限定的
After(API自動化)
- 同期頻度: 必要に応じて随時
- 作業時間: 0分(完全自動化)
- エラー対応: 構造化ログで詳細把握
- スケーラビリティ: 複数KB対応可能
具体的な改善例
- 夜間バッチとの連携
# S3アップロード後に自動同期
import requests
import logging
def upload_and_sync():
"""
S3にファイルをアップロードした後、自動的にKnowledge Baseを同期
"""
try:
# S3にファイルアップロード
upload_to_s3(files)
logging.info("S3へのファイルアップロードが完了しました")
# 同期API呼び出し
sync_url = 'https://api-id.execute-api.ap-northeast-1.amazonaws.com/prod/sync'
payload = {
'knowledgeBaseId': 'XXXXXXXXXX', # 実際のKnowledge Base ID
'dataSourceId': 'YYYYYYYYYY' # 実際のData Source ID
}
response = requests.post(
sync_url,
json=payload,
headers={'Content-Type': 'application/json'},
timeout=30
)
if response.status_code == 200:
result = response.json()
job_id = result['jobId']
logging.info(f"同期ジョブが正常に開始されました: {job_id}")
return job_id
else:
logging.error(f"同期API呼び出しエラー: {response.status_code} - {response.text}")
raise Exception(f"同期に失敗しました: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"API呼び出し中にネットワークエラーが発生: {str(e)}")
raise
except Exception as e:
logging.error(f"予期しないエラーが発生: {str(e)}")
raise
# 使用例
if __name__ == "__main__":
try:
job_id = upload_and_sync()
print(f"処理完了: ジョブID {job_id}")
except Exception as e:
print(f"処理失敗: {str(e)}")
exit(1)
- CI/CDパイプライン統合
# GitHub Actions例
name: Deploy and Sync Knowledge Base
on:
push:
branches: [ main ]
paths: [ 'documents/**' ]
jobs:
deploy-and-sync:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Upload documents to S3
run: |
aws s3 sync documents/ s3://${{ vars.S3_BUCKET }}/documents/ \
--delete \
--exclude "*.tmp"
- name: Sync Knowledge Base
run: |
response=$(curl -s -w "%{http_code}" -X POST ${{ secrets.SYNC_API_URL }} \
-H "Content-Type: application/json" \
-H "X-API-Key: ${{ secrets.API_KEY }}" \
-d '{"knowledgeBaseId": "${{ vars.KB_ID }}", "dataSourceId": "${{ vars.DS_ID }}"}')
http_code="${response: -3}"
response_body="${response%???}"
if [ "$http_code" -eq 200 ]; then
job_id=$(echo "$response_body" | jq -r '.jobId')
echo "✅ 同期ジョブが開始されました: $job_id"
else
echo "❌ 同期に失敗しました: HTTP $http_code"
echo "$response_body"
exit 1
fi
- name: Notify Slack
if: always()
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
text: "Knowledge Base同期: ${{ job.status }}"
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
- Lambda関数からの呼び出し
# 他のLambda関数からAPI呼び出し
import json
import urllib3
import os
def lambda_handler(event, context):
"""
S3イベントトリガーでKnowledge Base同期を実行
"""
http = urllib3.PoolManager()
# S3イベントから情報を取得
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
# ドキュメントフォルダの変更のみ処理
if not key.startswith('documents/'):
return {'statusCode': 200, 'body': 'スキップ: ドキュメント以外の変更'}
try:
# 同期API呼び出し
sync_payload = {
'knowledgeBaseId': os.environ['KNOWLEDGE_BASE_ID'],
'dataSourceId': os.environ['DATA_SOURCE_ID']
}
response = http.request(
'POST',
os.environ['SYNC_API_URL'],
body=json.dumps(sync_payload),
headers={
'Content-Type': 'application/json',
'X-API-Key': os.environ['API_KEY']
}
)
if response.status == 200:
result = json.loads(response.data.decode('utf-8'))
print(f"同期ジョブ開始: {result['jobId']}")
return {
'statusCode': 200,
'body': json.dumps({
'message': '同期が正常に開始されました',
'jobId': result['jobId'],
'trigger': f's3://{bucket}/{key}'
})
}
else:
print(f"同期API呼び出しエラー: {response.status}")
return {
'statusCode': 500,
'body': json.dumps({'error': f'同期に失敗しました: {response.status}'})
}
except Exception as e:
print(f"エラー: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
監視とトラブルシューティング
CloudWatch監視
設定したアラーム:
- Lambda関数エラー率 > 5%
- Lambda関数実行時間 > 25秒
- API Gateway 5xxエラー > 10件/5分
よくある問題と対処法
問題 | 症状 | 対処法 |
---|---|---|
権限不足 | 403 Forbidden | IAMポリシーの確認・更新 |
ID不正 | 400 Bad Request | Knowledge Base/Data Source IDの確認 |
タイムアウト | 504 Gateway Timeout | Lambda実行時間の延長 |
まとめ – Kiroとの協働開発で得られた成果
Amazon Bedrock Knowledge Baseの同期を自動化するAPIを、Kiroと一緒に開発することで、以下の成果を得られました:
技術的成果
- 完全自動化: 手動作業の排除
- 高い可用性: サーバーレスアーキテクチャ
- 詳細監視: 構造化ログとメトリクス
- セキュア: 適切なIAM権限設定
運用面の改善
- 効率化: 作業時間の大幅削減
- 柔軟性: 様々なタイミングでの同期実行
- スケーラビリティ: 複数Knowledge Baseへの対応
- 信頼性: エラーハンドリングと監視
Kiroとの協働で学んだ教訓
- AI支援開発の効果: 複雑な問題も段階的に解決可能
- ペアプログラミングの価値: 人間の創造性とAIの知識が相乗効果
- 継続的な学習: Kiroとの対話を通じてベストプラクティスを習得
- 品質向上: AIによるコードレビューで見落としを防止
- ドキュメント化の自動化: 包括的な運用ドキュメントを効率的に作成
Kiroとの開発体験
- 迅速な問題解決: エラーメッセージから即座に原因を特定
- コード品質向上: ベストプラクティスに基づいた実装提案
- 学習効果: 開発過程で新しい技術や手法を習得
- 効率的な開発: 繰り返し作業の自動化と最適化
今後の展開
現在検討している機能拡張:
- バッチ同期: 複数Knowledge Baseの一括同期
- スケジュール同期: CloudWatch Eventsとの連携
- 同期状況確認: ジョブステータス取得API
- Webhook通知: 同期完了時の通知機能
Kiroについて
今回の開発で使用した「Kiro」は、開発者向けのAIアシスタントです。コード生成、デバッグ、アーキテクチャ設計、ドキュメント作成など、開発の全工程をサポートしてくれます。
特に印象的だったのは:
- 文脈理解: プロジェクト全体を把握した上での適切な提案
- 実践的な知識: 実際の運用を考慮したベストプラクティスの提示
- 学習支援: 単なるコード生成ではなく、理解を深める説明
Amazon Bedrockを使ったRAGシステムの運用で同様の課題を抱えている方、そしてAI支援開発に興味がある方の参考になれば幸いです。
参考リンク
コメント