ベクターデータベースを使用した本番環境対応のRAGアプリケーション構築

ベクターデータベースを使用した本番環境対応のRAGアプリケーション構築

正直なところ、RAGを初めて実装したとき、「ベクター検索さえ動けばほぼ完成だ」と甘く見ていた。実際には、チャンク戦略のミスで検索精度が崩壊し、リトライロジックの欠如で本番障害を起こした。この記事はその反省も込めて書いている。

LLMには「知識のカットオフ」と「ハルシネーション」という根本的な課題がある。これを解決する現実的なアプローチが検索拡張生成(RAG:Retrieval-Augmented Generation)だ。ベクターデータベースを核としたRAGシステムを、実際の本番環境で運用できるレベルまで持っていく方法を、実装コードを交えて説明する。


RAGとは何か

検索拡張生成(RAG)は、LLMが回答を生成する前に、外部のナレッジベースから関連情報を検索・取得し、その情報をプロンプトに組み込む技術だ。

通常のLLMは学習データだけで回答を生成するため、こういった問題が起きる。

  • 情報の陳腐化:トレーニングデータのカットオフ以降の情報を知らない
  • ハルシネーション:存在しない情報を自信満々に生成してしまう
  • 社内情報への非対応:プロプライエタリなデータやドキュメントを扱えない

RAGはこれらを根本から解決する。ユーザーの質問に対して、まず関連ドキュメントをリアルタイムで検索し、その文書をコンテキストとしてLLMに渡すことで、正確で最新の回答を生成できる。

RAGパイプラインの基本構造

RAGシステムは2つのフェーズで構成される。

インデックス作成フェーズ(オフライン処理)
1. ドキュメントの収集と前処理
2. テキストのチャンク分割
3. 埋め込みモデルによるベクトル化
4. ベクターデータベースへの格納

クエリ処理フェーズ(オンライン処理)
1. ユーザーの質問をベクトル化
2. ベクターデータベースで類似検索を実行
3. 関連チャンクをプロンプトに組み込む
4. LLMが文脈を踏まえた回答を生成


ベクターデータベースはどれを選ぶか:PineconeとWeaviateの比較

RAGシステムの性能はベクターデータベースの選択に大きく依存する。私はこれまでいくつかのプロジェクトで両方を使ってきたが、「どちらが優れているか」より「どちらが自分たちのコンテキストに合っているか」で判断すべきだと思っている。

Pinecone

Pineconeはフルマネージドのベクターデータベースサービスだ。インフラ管理が不要で、APIを叩けばすぐ使える。

主な特徴:
クラウドホスティング” rel=”nofollow sponsored” target=”_blank”>クラウドサーバー” rel=”nofollow sponsored” target=”_blank”>サーバーレスアーキテクチャ(スタータープランは無料枠あり)
– 自動スケーリングとレプリケーション
– ネームスペースによるデータ分離
– ハイブリッド検索(密ベクトル+スパースベクトル)のサポート
– REST APIおよびPython/Node.js SDKの完備

向いているケース: 開発スピードを優先したい場合、インフラ管理の負荷を最小化したい組織

Weaviate

Weaviateはオープンソースのベクターデータベースで、セルフホストとクラウド版の両方を選べる。

主な特徴:
– GraphQLベースのクエリインターフェース
– モジュール型アーキテクチャ(埋め込みモデルを内蔵可能)
– マルチテナンシーのネイティブサポート
– BM25とベクトル検索を組み合わせたハイブリッド検索
– データの完全な制御とカスタマイズ性

向いているケース: データ主権が重要な場合、カスタマイズ性を重視する開発チーム

比較サマリー

項目PineconeWeaviate
ホスティングフルマネージドセルフ/クラウド
セットアップ難易度低い中程度
スケーラビリティ自動手動設定可
コスト従量課金インフラコスト
カスタマイズ性限定的高い

実装:RAGシステムをゼロから作る

PineconeとOpenAIを使った基本的な実装から始め、本番環境を意識した設計へと発展させる。

環境のセットアップ

まず必要なライブラリをインストールする。

pip install pinecone-client openai langchain tiktoken python-dotenv redis

環境変数の設定:

# .env ファイル
OPENAI_API_KEY=sk-your-openai-api-key
PINECONE_API_KEY=your-pinecone-api-key
PINECONE_ENVIRONMENT=us-east-1
PINECONE_INDEX_NAME=rag-production

ドキュメントの埋め込みとインデックス登録

import os
import hashlib
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter

load_dotenv()

# クライアントの初期化
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))

# インデックスの作成(存在しない場合)
index_name = os.getenv("PINECONE_INDEX_NAME")
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,  # text-embedding-3-small の次元数
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region=os.getenv("PINECONE_ENVIRONMENT")
        )
    )

index = pc.Index(index_name)

def get_embedding(text: str) -> list[float]:
    """テキストをベクトルに変換する"""
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

def chunk_and_index_documents(documents: list[dict]) -> None:
    """ドキュメントをチャンク分割してベクターDBに登録する"""
    # 日本語テキストに適したセパレータを指定
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=64,
        separators=["\n\n", "\n", "。", "、", " ", ""]
    )

    vectors = []
    for doc in documents:
        chunks = splitter.split_text(doc["content"])

        for i, chunk in enumerate(chunks):
            chunk_id = hashlib.md5(f"{doc['id']}_{i}".encode()).hexdigest()
            embedding = get_embedding(chunk)

            vectors.append({
                "id": chunk_id,
                "values": embedding,
                "metadata": {
                    "text": chunk,
                    "source": doc["source"],
                    "doc_id": doc["id"],
                    "chunk_index": i
                }
            })

        # バッチサイズ100でアップサート(APIコスト削減)
        if len(vectors) >= 100:
            index.upsert(vectors=vectors)
            vectors = []

    if vectors:
        index.upsert(vectors=vectors)

    stats = index.describe_index_stats()
    print(f"インデックス登録完了: 総ベクトル数 = {stats['total_vector_count']}")

検索と生成の統合

def retrieve_relevant_chunks(
    query: str,
    top_k: int = 5,
    score_threshold: float = 0.70
) -> list[dict]:
    """クエリに関連するチャンクをベクターDBから検索する"""
    query_embedding = get_embedding(query)

    results = index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )

    # コサイン類似度でフィルタリング
    return [
        {
            "text": match["metadata"]["text"],
            "source": match["metadata"]["source"],
            "score": match["score"]
        }
        for match in results["matches"]
        if match["score"] >= score_threshold
    ]

def generate_rag_response(query: str) -> dict:
    """検索拡張生成を使って質問に回答する"""
    chunks = retrieve_relevant_chunks(query)

    if not chunks:
        return {
            "answer": "関連する情報が見つかりませんでした。",
            "sources": []
        }

    # 取得したチャンクからコンテキストを構築
    context = "\n\n---\n\n".join([
        f"【出典: {c['source']}】\n{c['text']}"
        for c in chunks
    ])

    system_prompt = """あなたは専門的なアシスタントです。
提供されたコンテキストのみを使用して質問に回答してください。
コンテキストに含まれていない情報については、「提供された情報には含まれていません」と明示してください。
回答は簡潔かつ正確にまとめてください。"""

    user_prompt = f"""コンテキスト:
{context}

質問: {query}

上記のコンテキストに基づいて回答してください。"""

    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.1,  # 一貫性のある回答のため低めに設定
        max_tokens=1024
    )

    return {
        "answer": response.choices[0].message.content,
        "sources": list({c["source"] for c in chunks}),
        "chunks_used": len(chunks)
    }

Weaviateを使った代替実装

専門用語が多い業務ドキュメント―法律文書や医療記録など―ではハイブリッド検索が効く。キーワードの完全一致(BM25)とベクトル類似度を組み合わせられるのがWeaviateの強みだ。私の経験では、純粋なベクトル検索だけだと固有名詞や型番の検索精度が落ちることがあった。

import weaviate
from weaviate.classes.config import Configure, Property, DataType

# Weaviate Cloudクライアントの初期化
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.getenv("WEAVIATE_URL"),
    auth_credentials=weaviate.auth.AuthApiKey(os.getenv("WEAVIATE_API_KEY"))
)

# コレクション(インデックス)の作成
client.collections.create(
    name="Document",
    vectorizer_config=Configure.Vectorizer.text2vec_openai(
        model="text-embedding-3-small"
    ),
    generative_config=Configure.Generative.openai(
        model="gpt-4o-mini"
    ),
    properties=[
        Property(name="content", data_type=DataType.TEXT),
        Property(name="source", data_type=DataType.TEXT),
        Property(name="doc_id", data_type=DataType.TEXT),
    ]
)

def hybrid_search(query: str, alpha: float = 0.5, limit: int = 5) -> list:
    """
    BM25とベクトル検索を組み合わせたハイブリッド検索
    alpha=0.0: 純粋なキーワード検索(BM25)
    alpha=1.0: 純粋なベクトル検索
    alpha=0.5: ハイブリッド(一般的な用途に推奨)
    """
    collection = client.collections.get("Document")
    results = collection.query.hybrid(
        query=query,
        alpha=alpha,
        limit=limit,
        return_properties=["content", "source"]
    )
    return [
        {"text": obj.properties["content"], "source": obj.properties["source"]}
        for obj in results.objects
    ]

本番環境で詰まるポイントと対策

開発環境では気づかないが、本番に出た途端に問題になることが3つある。レイテンシ、外部API障害への耐性、そして可観測性だ。

レイテンシの最適化:埋め込みキャッシュ

ユーザーの質問は意外と重複する。同じクエリで毎回埋め込みAPIを叩くのは無駄なので、Redisでキャッシュする。TTLは24時間にしているが、ドキュメント更新頻度に合わせて調整してほしい。

import redis
import json

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_embedding_cached(text: str, ttl: int = 86400) -> list[float]:
    """Redisキャッシュを活用した埋め込み取得(TTL: 24時間)"""
    cache_key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"

    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    embedding = get_embedding(text)
    redis_client.setex(cache_key, ttl, json.dumps(embedding))
    return embedding

エラーハンドリングとリトライロジック

外部APIへの依存が多いRAGシステムで、リトライなしで本番運用するのは危険だ。OpenAIもPineconeも一時的なエラーが起きる。指数バックオフ付きのデコレータを一度作っておけば使い回せる。

import time
from functools import wraps

def with_retry(max_retries: int = 3, backoff: float = 1.0):
    """指数バックオフ付きリトライデコレータ"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    wait_time = backoff * (2 ** attempt)
                    print(f"エラー発生 (試行 {attempt+1}/{max_retries}): {e}")
                    print(f"{wait_time:.1f}秒後にリトライします...")
                    time.sleep(wait_time)
        return wrapper
    return decorator

@with_retry(max_retries=3, backoff=0.5)
def get_embedding_safe(text: str) -> list[float]:
    return get_embedding_cached(text)

モニタリングと可観測性

「動いているっぽい」では本番運用は成り立たない。検索精度が静かに劣化していても、メトリクスがなければ気づけない。

import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class RAGMetrics:
    query: str
    retrieval_latency_ms: float = 0.0
    generation_latency_ms: float = 0.0
    chunks_retrieved: int = 0
    avg_similarity_score: float = 0.0
    cache_hit: bool = False
    error: Optional[str] = None

    @property
    def total_latency_ms(self) -> float:
        return self.retrieval_latency_ms + self.generation_latency_ms

def generate_rag_response_monitored(query: str) -> tuple[dict, RAGMetrics]:
    """メトリクス収集付きRAG実行"""
    metrics = RAGMetrics(query=query)

    # 検索フェーズの計測
    t0 = time.time()
    chunks = retrieve_relevant_chunks(query)
    metrics.retrieval_latency_ms = (time.time() - t0) * 1000
    metrics.chunks_retrieved = len(chunks)

    if chunks:
        metrics.avg_similarity_score = sum(
            c["score"] for c in chunks
        ) / len(chunks)

    # 生成フェーズの計測
    t1 = time.time()
    result = generate_rag_response(query)
    metrics.generation_latency_ms = (time.time() - t1) * 1000

    # ログ出力(本番ではDatadog等へ送信)
    print(f"[RAG Metrics] 検索: {metrics.retrieval_latency_ms:.0f}ms | "
          f"生成: {metrics.generation_latency_ms:.0f}ms | "
          f"チャンク数: {metrics.chunks_retrieved} | "
          f"平均スコア: {metrics.avg_similarity_score:.3f}")

    return result, metrics

追跡すべき主要メトリクス:

メトリクス目標値説明
検索レイテンシP95< 300msベクターDB検索時間
生成レイテンシP95< 3,000msLLM応答生成時間
平均類似度スコア> 0.75検索精度の指標
キャッシュヒット率> 30%埋め込みキャッシュ効率
エラーレート< 0.1%API呼び出し失敗率

チャンク戦略の最適化――ここで差がつく

チャンク分割は地味だが、RAGの精度を左右する最重要ポイントだ。固定長で機械的に切ると、意味的に関連する情報が途切れてしまい、検索でヒットしても文脈が中途半端になる。

セマンティックチャンキング

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# 意味的なまとまりを保ってチャンク分割
semantic_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=95  # 上位5%の意味的乖離点で分割
)

chunks = semantic_splitter.create_documents([long_document_text])

親ドキュメント取得(Parent Document Retriever)

小さいチャンクで精度の高い検索を行い、ヒットした場合は文脈が豊富な親チャンクを取得する手法だ。検索精度とコンテキストの豊富さを両立できる―個人的にはこれが一番コスパが良いと感じている。

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_community.vectorstores import Chroma

# 子チャンク(検索用):小さく、精度の高いマッチング
child_splitter = RecursiveCharacterTextSplitter(
    chunk_size=200,
    separators=["\n\n", "\n", "。", " "]
)
# 親チャンク(コンテキスト用):大きく、情報が豊富
parent_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    separators=["\n\n", "\n", "。", " "]
)

vectorstore = Chroma(embedding_function=embeddings)
store = InMemoryStore()

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

本番環境対応RAGシステムの構築チェックリスト

設計フェーズ
– [ ] ユースケースに合ったベクターデータベース(PineconeまたはWeaviate)の選定
– [ ] チャンクサイズと重複幅の最適化(目安:512〜1024トークン、重複10〜20%)
– [ ] 埋め込みモデルの選定(コストと精度のバランス)
– [ ] ハイブリッド検索の採用可否の検討

実装フェーズ
– [ ] バッチ処理によるインデックス登録の効率化
– [ ] 埋め込みキャッシュの実装(Redisなど)
– [ ] エラーハンドリングとリトライロジックの整備
– [ ] 類似度スコアによるフィルタリングの実装

本番環境デプロイフェーズ
– [ ] レイテンシ目標の設定と監視ダッシュボードの整備
– [ ] APIレート制限とコスト上限の設定
– [ ] 定期的なインデックス更新パイプラインの構築
– [ ] A/Bテストによるチャンク戦略の継続改善

検索拡張生成はLLMアプリケーションの信頼性を大きく引き上げる。ハルシネーションを抑制しつつ、企業固有のナレッジを活かした高精度なAIシステムが現実的に作れる。まず小さいプロトタイプで動かし、本番でのフィードバックを見ながら段階的に改善していくのが結局一番早い道だ。


関連キーワード: RAGチュートリアル, ベクターデータベース, 検索拡張生成, 本番環境AI, Pinecone, Weaviate, LangChain, OpenAI, 埋め込みモデル, チャンキング

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top