LangChain SQL を操作しデータを取得するAgent の作成
モデルがデータベースを操作してデータを取得できるのか確認しています
記事作成日:2025-11-29, 著者: Hi6
Overview
音楽関連のデータが入っている、SQLiteのサンプルデータベース Chinook.db に、データベースを操作するモデルにOllamaのgpt-oss:120b-cloudを指定し、ユーザーが入力する質問プロンプトに対しクエリを作成>実行>評価(ReAct Agent)をLang Chainで実装していきます。
サンプルデータベースの準備
https://storage.googleapis.com/benchmarks-artifacts/chinook/Chinook.db
上記アドレスからあらかじめローカルのプロジェクトディレクトリにデータベースをダウンロードしておきます。
from langchain_community.utilities import SQLDatabase
db = SQLDatabase.from_uri("sqlite:///Chinook.db") # ローカルのデータベースを読み込む
# MYSQL の場合
db = SQLDatabase.from_uri("mysql+pymysql://user:password@localhost/dbname")
# PostgreSQLの場合
db_postgres = SQLDatabase.from_uri("postgresql://user:password@localhost/dbname")
# DB 接続テスト
print(f"Dialect: {db.dialect}")
# SQLは標準規格ですが、各データベースが独自の拡張機能や構文を持っています。それらの違いを「ダイアレクト」と呼びます。
print(f"Available tables: {db.get_usable_table_names()}")
print(f'Sample output: {db.run("SELECT * FROM Artist LIMIT 5;")}')
tool を操作する model の準備
from langchain_ollama import ChatOllama
model = ChatOllama(
model="gpt-oss:120b-cloud",
temperature=0.8
)
データベースを操作する tool の準備
from langchain_community.agent_toolkits import SQLDatabaseToolkit
toolkit = SQLDatabaseToolkit(db=db, llm=model)
tools = toolkit.get_tools()
# 以下でどのようなツールが作成されたか確認できる(英文なので、下記に訳を掲載)
for tool in tools:
print(f"{tool.name}: {tool.description}\n")
- sql_db_query:このツールへの入力は、詳細で正確なSQLクエリです。出力はデータベースからの結果です。クエリが正しくない場合は、エラーメッセージが返されます。エラーが返されたら、クエリを書き直し、チェックして再度試してください。「Unknown column ‘xxxx’ in ‘field list’」という問題に遭遇した場合は、sql_db_schema を使用して正しいテーブルフィールドを照会します。
- sql_db_schema:このツールへの入力はカンマ区切りのテーブル名リストであり、出力はそれらのテーブルのスキーマとサンプル行です。実際にテーブルが存在することを確認するために、まず sql_db_list_tables を呼び出してください!例:Input: table1, table2, table3
- sql_db_list_tables:入力は空文字列で、出力はデータベース内のテーブル名をカンマ区切りで返します。
- sql_db_query_checker:このツールを使用してクエリが正しいかどうかを二重に確認してください。sql_db_query でクエリを実行する前に必ずこのツールを使ってください。
ReActエージェントを構築
エージェントはユーザーからの質問リクエストを解釈し、ツールが実行するSQLコマンドを生成します。コマンドにエラーがある場合は、エラーメッセージがモデルに返されます。 モデルは元のリクエストと新しいエラーメッセージを調べ、新しいコマンドを生成します。この処理は、LLMがコマンドを正常に生成するか、終了カウントに達するまで継続されます。
from langchain.agents.middleware import ModelCallLimitMiddleware
system_prompt = """
あなたはSQLデータベースと対話するように設計されたエージェントです。
入力された質問を受け取り、正しい構文の{dialect}クエリを作成して実行し、その結果から答えを返します。ユーザーが取得したい例の数を明示的に指定しない限り、常に最大で{top_k}件までに制限してください。
結果は関連する列だけを選択し、テーブル全体のすべての列を問い合わせることは避けます。
重要な例を返すために、適切な列で結果を並び替えることができます。
クエリを実行する前に必ず構文を再確認してください。実行時にエラーが発生した場合はクエリを書き直して再試行します。
INSERT, UPDATE, DELETE, DROP などの DML 文は絶対に実行しないでください。
作業開始時には、必ずデータベース内のテーブルを確認し、何が問い合わせ可能かを把握してください。このステップをスキップしてはいけません。
次に、最も関連性の高いテーブルのスキーマを照会します。
""".format(
dialect=db.dialect,
top_k=5,
)
agent = create_agent(
model,
tools,
system_prompt=system_prompt,
# Agent の実行回数に制限を設ける場合はmiddlewareを利用
middleware=[
ModelCallLimitMiddleware(
thread_limit=10,
run_limit=5,
exit_behavior="error", # or end
)
],
)
- thread_limit : スレッド内のすべての実行におけるモデル呼び出しの最大数。デフォルトでは制限なしです。
- run_limit : 1回の呼び出しあたりの最大モデル呼び出し回数。デフォルトでは制限なしです。
- exit_behavior : 制限に達したときの動作。 ‘end’ (正常終了) または ‘error’ (例外発生)
ReAct Agentの実行
question = "平均してトラックが最も長いジャンルを配信しているアーティストは?"
for step in agent.stream(
{"messages": [{"role": "user", "content": question}]},
stream_mode="values",
):
step["messages"][-1].pretty_print()
Human-in-the-loop
クエリ実行の前に人間による承認を挟む場合の実装例
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
# Agentにミドルウェアを追加します
agent = create_agent(
model,
tools,
system_prompt=system_prompt,
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={"sql_db_query": True},
description_prefix="ツールの実行には承認が必要です。",
),
],
checkpointer=InMemorySaver(),
)
# クエリ実行前にエージェントを一時停止
question = "最も売り上げに貢献している従業員は?"
config = {"configurable": {"thread_id": "1"}}
for step in agent.stream(
{"messages": [{"role": "user", "content": question}]},
config,
stream_mode="values",
):
if "messages" in step:
step["messages"][-1].pretty_print()
elif "__interrupt__" in step:
print("INTERRUPTED:")
interrupt = step["__interrupt__"][0]
for request in interrupt.value["action_requests"]:
print(request["description"])
else:
pass
# ユーザーの許可を取得
def query_authorization(prompt="クエリを実行してもよいですか? yes(y) or no(n)"):
while True:
answer = input(prompt).strip().lower()
if answer in ("yes", "y"):
return {
"type": "approve",
}
elif answer in ("no", "n"):
return {
"type": "reject",
"message": "この操作はユーザーに拒否され中断しました。",
}
else:
print("Please enter 'yes' or 'no'.")
# 中断エージェントの再実行
for step in agent.stream(
Command(resume={"decisions": [query_authorization()]}),
config,
stream_mode="values",
):
if "messages" in step:
step["messages"][-1].pretty_print()
elif "__interrupt__" in step:
print("INTERRUPTED:")
interrupt = step["__interrupt__"][0]
for request in interrupt.value["action_requests"]:
print(request["description"])
else:
pass
[!NOTE] 参照サイト