データベーススキーマ
概要
mixseek-coreは、Leader Agentの実行履歴、Member Agent応答の集約結果、チーム評価結果をDuckDBデータベースに永続化します。
データベースの特徴
並列書き込み対応: DuckDBのMVCC(Multi-Version Concurrency Control)により、複数チームが同時実行してもロック競合なしでデータ保存が可能
JSON型サポート: Pydantic AIのMessage構造やリソース使用量をネイティブJSON型で保存
トランザクション管理: BEGIN/COMMIT/ROLLBACKによる安全なデータ永続化
データベースファイル配置
データベースファイルは環境変数MIXSEEK_WORKSPACEで指定されたディレクトリ配下に配置されます:
$MIXSEEK_WORKSPACE/
├── mixseek.db # DuckDBデータベースファイル
└── archive/ # Parquetエクスポート用ディレクトリ(将来実装)
環境変数の設定例:
export MIXSEEK_WORKSPACE=/path/to/workspace
Note
環境変数が未設定の場合、即座にエラー終了します(憲章Article 9: Data Accuracy Mandate準拠)。
テーブル定義
round_history テーブル
チームの各ラウンドにおけるMessage HistoryとMember Agent応答の集約結果を保存します。
仕様参照: specs/008-leader/spec.md FR-006, FR-007
スキーマ定義
カラム名 |
型 |
制約 |
|---|---|---|
|
INTEGER |
PRIMARY KEY, GENERATED BY DEFAULT AS IDENTITY |
|
TEXT |
NOT NULL |
|
TEXT |
NOT NULL |
|
TEXT |
NOT NULL |
|
INTEGER |
NOT NULL |
|
JSON |
- |
|
JSON |
- |
|
TIMESTAMP |
DEFAULT CURRENT_TIMESTAMP |
制約
-- 一意性制約(FR-008、execution_id統合後)
UNIQUE(execution_id, team_id, round_number)
同一実行・同一チーム・同一ラウンドの組み合わせは一意です。重複保存を試みた場合、UPSERT処理により最新データで上書きされます。
Note
execution_id導入の目的: 複数のオーケストレーション実行を識別し、同一実行に属するチーム結果をグループ化できます。これにより、異なる実行で同一チームが参加した場合も正しく区別できます。
カラム説明
id
説明: レコード一意識別子
発行元: DuckDBシーケンス
round_history_id_seq(自動採番)保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py)
execution_id
説明: オーケストレーション実行識別子(UUID v4形式)
発行元:
OrchestratorTask.__init__()(models.py) -uuid4()で生成保存処理:
RoundController.run_round()→AggregationStore.save_aggregation()→AggregationStore._save_sync()(aggregation_store.py)
team_id
説明: チームの一意識別子
発行元: チーム設定TOML (
team_config.team_id)保存処理:
RoundController.run_round()(controller.py) →MemberSubmissionsRecord(models.py) →AggregationStore._save_sync()(aggregation_store.py)
team_name
説明: チーム名
発行元: チーム設定TOML (
team_config.team_name)保存処理:
RoundController.run_round()(controller.py) →MemberSubmissionsRecord(models.py) →AggregationStore._save_sync()(aggregation_store.py)
round_number
説明: ラウンド番号(1から開始)
発行元:
RoundController.__init__()(controller.py) - インスタンス化時に指定保存処理:
RoundController.run_round()(controller.py) →MemberSubmissionsRecord(models.py) →AggregationStore._save_sync()(aggregation_store.py)
message_history (JSON型)
説明: Pydantic AI Message History(Leader Agent実行時のメッセージ履歴)
発行元: Leader Agent実行結果 (
result.all_messages())保存処理:
RoundController.run_round()(controller.py) →AggregationStore.save_aggregation()→AggregationStore._save_sync()(aggregation_store.py)JSON変換:
to_jsonable_python(message_history)→json.dumps()
JSON構造:
[
{
"kind": "request",
"parts": [
{
"part_kind": "user-prompt",
"content": "データ分析を実行してください",
"timestamp": "2025-11-05T10:00:00Z"
}
]
},
{
"kind": "response",
"parts": [
{
"part_kind": "text",
"content": "分析結果:...",
"timestamp": "2025-11-05T10:00:15Z"
}
]
},
{
"kind": "request",
"parts": [
{
"part_kind": "tool-call",
"tool_name": "web-search",
"args": {"query": "AI trends 2025"}
}
]
},
{
"kind": "response",
"parts": [
{
"part_kind": "tool-return",
"tool_name": "web-search",
"content": "検索結果..."
}
]
}
]
Pydantic AIのModelMessage構造はkind(request/response)とparts配列を持ち、各partにはpart_kind(user-prompt/text/tool-call/tool-returnなど)が含まれます。
member_submissions_record (JSON型)
説明: Member Agent応答集約結果(全Member Agentの実行結果とメタデータ)
発行元:
RoundController.run_round()(controller.py) -MemberSubmissionsRecordインスタンス作成保存処理:
AggregationStore.save_aggregation()→AggregationStore._save_sync()(aggregation_store.py)JSON変換:
aggregated.model_dump(mode="json")→json.dumps()
JSON構造:
{
"execution_id": "550e8400-e29b-41d4-a716-446655440000",
"team_id": "team-001",
"team_name": "Alpha Team",
"round_number": 1,
"submissions": [
{
"agent_name": "web-search",
"agent_type": "system",
"content": "検索結果...",
"status": "SUCCESS",
"error_message": null,
"usage": {
"input_tokens": 50,
"cache_write_tokens": 0,
"cache_read_tokens": 0,
"output_tokens": 100,
"input_audio_tokens": 0,
"cache_audio_read_tokens": 0,
"output_audio_tokens": 0,
"details": {},
"requests": 1,
"tool_calls": 0
},
"timestamp": "2025-11-05T10:00:15Z",
"execution_time_ms": 2500.0,
"all_messages": null
}
],
"successful_submissions": [...],
"failed_submissions": [],
"total_count": 1,
"success_count": 1,
"failure_count": 0,
"total_usage": {
"input_tokens": 50,
"cache_write_tokens": 0,
"cache_read_tokens": 0,
"output_tokens": 100,
"input_audio_tokens": 0,
"cache_audio_read_tokens": 0,
"output_audio_tokens": 0,
"details": {},
"requests": 1,
"tool_calls": 0
}
}
Computed Fields: successful_submissions, failed_submissions, total_count, success_count, failure_count, total_usageはPydanticの@computed_fieldデコレータで動的計算され、JSON保存時にも含まれます(models.py:39-76)。
created_at
説明: レコード作成日時
発行元: DuckDB (
DEFAULT CURRENT_TIMESTAMP)保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py) - DuckDBが自動設定
インデックス
-- execution_id + team_id + round_numberによる検索用
CREATE INDEX idx_round_history_execution
ON round_history(execution_id, team_id, round_number);
-- execution_idによるグループ化検索用
CREATE INDEX idx_round_history_execution_id
ON round_history(execution_id);
execution_idとチームIDによる高速検索を実現します。
leader_board テーブル
チームのSubmission評価結果とランキング情報を保存します。
仕様参照: specs/008-leader/spec.md FR-010, FR-011
スキーマ定義
カラム名 |
型 |
制約 |
|---|---|---|
|
INTEGER |
PRIMARY KEY, GENERATED BY DEFAULT AS IDENTITY |
|
TEXT |
NOT NULL |
|
TEXT |
NOT NULL |
|
TEXT |
NOT NULL |
|
INTEGER |
NOT NULL |
|
DOUBLE |
NOT NULL, CHECK (evaluation_score >= 0.0 AND evaluation_score <= 1.0) |
|
TEXT |
- |
|
TEXT |
NOT NULL |
|
TEXT |
DEFAULT ‘structured_json’ |
|
JSON |
- |
|
TIMESTAMP |
DEFAULT CURRENT_TIMESTAMP |
制約
-- 評価スコア範囲チェック(DB level validation)
CHECK (evaluation_score >= 0.0 AND evaluation_score <= 1.0)
評価スコアは0.0~1.0の範囲内である必要があります。範囲外の値を保存しようとした場合、データベースレベルでエラーが発生します(憲章Article 9準拠)。
Orchestrator実行との関係
Orchestrator実行時、各チームのRoundResultがこのテーブルに記録されます:
複数チーム記録: 並列実行された全チームの結果が個別に記録されます
スコアベース選択:
evaluation_scoreを基準に最高スコアチームが特定されますスコア表示: 内部は0.0-1.0スケールで記録、CLI表示時は0-100に変換されます
失敗チーム: 失敗したチームは記録されません(Orchestrator層で隔離)
詳細は Orchestratorガイド を参照してください。
カラム説明
id
説明: レコード一意識別子
発行元: DuckDBシーケンス
leader_board_id_seq(自動採番)保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py)
execution_id
説明: オーケストレーション実行識別子(UUID v4形式)
発行元:
OrchestratorTask.__init__()(models.py) -uuid4()で生成保存処理:
RoundController.run_round()(controller.py) →AggregationStore.save_to_leader_board()→AggregationStore._save_to_leader_board_sync()(aggregation_store.py)
team_id
説明: チームの一意識別子
発行元: チーム設定TOML (
team_config.team_id)保存処理:
RoundController.run_round()(controller.py) →AggregationStore._save_to_leader_board_sync()(aggregation_store.py)
team_name
説明: チーム名
発行元: チーム設定TOML (
team_config.team_name)保存処理:
RoundController.run_round()(controller.py) →AggregationStore._save_to_leader_board_sync()(aggregation_store.py)
round_number
説明: ラウンド番号(現在は常に1)
発行元:
RoundController.__init__()(controller.py) - インスタンス化時に指定保存処理:
RoundController.run_round()(controller.py) →AggregationStore._save_to_leader_board_sync()(aggregation_store.py)
evaluation_score
説明: 評価スコア(0.0~1.0スケール、内部保存形式)
発行元:
Evaluator.evaluate()→EvaluationResult.overall_score(0-100スケール)スコア変換:
RoundController.run_round()(controller.py) -evaluation_result.overall_score / 100.0で0.0-1.0に変換保存処理:
AggregationStore.save_to_leader_board()→AggregationStore._save_to_leader_board_sync()(aggregation_store.py)バリデーション:
AggregationStore._save_to_leader_board_sync()が0.0-1.0範囲チェックを実施
Note
スコア変換の理由: Evaluatorは0-100スケールで生成しますが、データベースには0.0-1.0スケールで保存します。CLI表示時は再度100倍して0-100スケールに戻します。
evaluation_feedback
説明: 評価フィードバックコメント(複数メトリクスの統合テキスト)
発行元:
RoundController.run_round()(controller.py) -EvaluationResult.metricsから統合統合ロジック: 各メトリクスの
metric_name,score,evaluator_commentを改行結合保存処理:
AggregationStore._save_to_leader_board_sync()(aggregation_store.py)
フィードバック形式例:
Relevance (0.90): 高品質な情報が提供されています。
Coverage (0.85): 包括的な分析が含まれています。
Clarity (0.88): 明確で理解しやすい表現です。
submission_content
説明: Submissionの内容(Leader Agentの最終出力)
発行元: Leader Agent実行結果 (
result.output)保存処理:
RoundController.run_round()(controller.py) →AggregationStore._save_to_leader_board_sync()(aggregation_store.py)
submission_format
説明: Submissionの形式
発行元: DuckDB (
DEFAULT 'structured_json')保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py) - DuckDBが自動設定注意: 現在の実装では常に ‘structured_json’ が設定されます
usage_info (JSON型)
説明: リソース使用量(簡略版、3フィールドのみ)
発行元: Leader Agent実行結果 (
result.usage()) - Pydantic AIRunUsage変換処理:
RoundController.run_round()(controller.py) - 辞書形式に変換(簡略版)保存処理:
AggregationStore._save_to_leader_board_sync()(aggregation_store.py)JSON変換:
json.dumps(usage_info, ensure_ascii=False)
JSON構造(簡略版):
{
"input_tokens": 450,
"output_tokens": 900,
"requests": 3
}
Note
簡略版の理由: Pydantic AIのRunUsageは10フィールドを持ちますが、現在の実装ではinput_tokens, output_tokens, requestsの3フィールドのみを保存しています(controller.py:132-136)。完全なRunUsage構造が必要な場合は、round_history.member_submissions_recordを参照してください。
created_at
説明: レコード作成日時
発行元: DuckDB (
DEFAULT CURRENT_TIMESTAMP)保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py) - DuckDBが自動設定
インデックス
-- スコア順ランキング用
CREATE INDEX idx_leader_board_score
ON leader_board(evaluation_score DESC, created_at ASC);
-- execution_id別ランキング用
CREATE INDEX idx_leader_board_execution
ON leader_board(execution_id, evaluation_score DESC);
評価スコア降順、同スコアの場合は作成日時早い順でソートされます(FR-011準拠)。execution_idによるフィルタリングも高速化されます。
execution_summary テーブル
オーケストレーション実行の集約結果を保存します(Orchestrator統合、007-orchestration)。
仕様参照: specs/007-orchestration/spec.md
スキーマ定義
カラム名 |
型 |
制約 |
|---|---|---|
|
TEXT |
PRIMARY KEY |
|
TEXT |
NOT NULL |
|
TEXT |
NOT NULL, CHECK |
|
JSON |
NOT NULL |
|
INTEGER |
NOT NULL |
|
TEXT |
- |
|
DOUBLE |
- |
|
DOUBLE |
NOT NULL |
|
TIMESTAMP |
DEFAULT CURRENT_TIMESTAMP |
|
TIMESTAMP |
DEFAULT CURRENT_TIMESTAMP |
制約
-- ステータス値チェック
CHECK (status IN ('completed', 'partial_failure', 'failed'))
ステータス値の意味:
completed: 全チーム正常完了partial_failure: 一部チーム失敗、一部成功failed: 全チーム失敗
用途
オーケストレーション実行の履歴管理
複数実行の比較分析
実行統計の集計
execution_idによる全テーブルの関連付け
カラム説明
execution_id
説明: オーケストレーション実行識別子(UUID v4形式、プライマリキー)
発行元:
OrchestratorTask.__init__()(models.py) -uuid4()で生成保存処理:
Orchestrator.execute()(orchestrator.py) →AggregationStore.save_execution_summary()→AggregationStore._save_execution_summary_sync()(aggregation_store.py)
user_prompt
説明: ユーザプロンプト(オーケストレーション実行時のユーザ入力)
発行元:
Orchestrator.execute()(orchestrator.py) - メソッド引数として受け取り保存処理:
Orchestrator.execute()(orchestrator.py) →AggregationStore._save_execution_summary_sync()(aggregation_store.py)
status
説明: 実行ステータス(completed/partial_failure/failed)
発行元:
Orchestrator.execute()(orchestrator.py) - チーム成功/失敗状況から判定判定ロジック:
len(failed_teams_info) == 0→ “completed”(全チーム成功)len(team_results) == 0→ “failed”(全チーム失敗)その他 → “partial_failure”(一部チーム失敗)
保存処理:
AggregationStore._save_execution_summary_sync()(aggregation_store.py)バリデーション:
AggregationStore._save_execution_summary_sync()がCHECK制約に従う3値をバリデーション
team_results (JSON型)
説明: チーム結果リスト(全チームの
RoundResult配列)発行元:
Orchestrator.execute()(orchestrator.py) - 各RoundControllerからRoundResultを収集変換処理:
Orchestrator.execute()(orchestrator.py) -[result.model_dump(mode="json") for result in summary.team_results]保存処理:
AggregationStore._save_execution_summary_sync()(aggregation_store.py)JSON変換:
json.dumps(team_results, ensure_ascii=False)
JSON構造:
[
{
"execution_id": "550e8400-e29b-41d4-a716-446655440000",
"team_id": "team-001",
"team_name": "Alpha Team",
"round_number": 1,
"submission_content": "分析結果...",
"evaluation_score": 0.85,
"evaluation_feedback": "Relevance (0.90): 高品質...\nCoverage (0.85): 包括的...",
"usage": {
"input_tokens": 450,
"cache_write_tokens": 0,
"cache_read_tokens": 0,
"output_tokens": 900,
"input_audio_tokens": 0,
"cache_audio_read_tokens": 0,
"output_audio_tokens": 0,
"details": {},
"requests": 3,
"tool_calls": 0
},
"execution_time_seconds": 5.2,
"completed_at": "2025-11-05T10:05:00Z"
},
{
"execution_id": "550e8400-e29b-41d4-a716-446655440000",
"team_id": "team-002",
"team_name": "Beta Team",
"round_number": 1,
"submission_content": "調査結果...",
"evaluation_score": 0.78,
"evaluation_feedback": "Relevance (0.85): 良好...\nCoverage (0.75): 適切...",
"usage": {
"input_tokens": 320,
"cache_write_tokens": 0,
"cache_read_tokens": 0,
"output_tokens": 640,
"input_audio_tokens": 0,
"cache_audio_read_tokens": 0,
"output_audio_tokens": 0,
"details": {},
"requests": 2,
"tool_calls": 0
},
"execution_time_seconds": 4.8,
"completed_at": "2025-11-05T10:04:50Z"
}
]
各要素はRoundResultモデルの完全なJSON表現です(models.py:51-67)。
total_teams
説明: 総チーム数(設定ファイルで定義されたチーム数)
発行元:
Orchestrator.execute()(orchestrator.py) -len(self.config.teams)で算出保存処理:
AggregationStore._save_execution_summary_sync()(aggregation_store.py)
best_team_id
説明: 最高スコアチームID(
evaluation_score最大のチーム)発行元:
Orchestrator.execute()(orchestrator.py) -max(team_results, key=lambda r: r.evaluation_score)で特定保存処理:
AggregationStore._save_execution_summary_sync()(aggregation_store.py)NULL条件: チーム結果が空の場合(全チーム失敗)はNULL
best_score
説明: 最高評価スコア(0.0-1.0スケール)
発行元:
Orchestrator.execute()(orchestrator.py) - 最大evaluation_scoreを特定保存処理:
AggregationStore._save_execution_summary_sync()(aggregation_store.py)NULL条件: チーム結果が空の場合(全チーム失敗)はNULL
total_execution_time_seconds
説明: 総実行時間(秒、全チームの並列実行時間)
発行元:
Orchestrator.execute()(orchestrator.py) -time.time() - start_timeで計測計測範囲: 全チームの並列実行開始から完了まで(チーム個別の実行時間ではない)
保存処理:
AggregationStore._save_execution_summary_sync()(aggregation_store.py)
completed_at
説明: 実行完了日時
発行元: DuckDB (
DEFAULT CURRENT_TIMESTAMP)保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py) - DuckDBが自動設定
created_at
説明: レコード作成日時
発行元: DuckDB (
DEFAULT CURRENT_TIMESTAMP)保存処理:
AggregationStore._init_tables_sync()(aggregation_store.py) - DuckDBが自動設定
クエリ例
Leader Boardランキング取得
評価スコア降順、同スコアは作成日時早い順でソート(FR-011準拠):
SELECT
team_name,
round_number,
evaluation_score,
evaluation_feedback,
created_at
FROM leader_board
ORDER BY evaluation_score DESC, created_at ASC
LIMIT 10;
Python実装: AggregationStore.get_leader_board(limit=10)
チーム統計集計
JSON内部クエリで総ラウンド数、平均スコア、総トークン使用量を集計(FR-013準拠):
SELECT
COUNT(*) as total_rounds,
AVG(evaluation_score) as avg_score,
MAX(evaluation_score) as best_score,
SUM(CAST(json_extract(usage_info, '$.input_tokens') AS INTEGER)) as total_input_tokens,
SUM(CAST(json_extract(usage_info, '$.output_tokens') AS INTEGER)) as total_output_tokens
FROM leader_board
WHERE team_id = 'team-001';
Python実装: AggregationStore.get_team_statistics(team_id="team-001")
結果例:
{
"total_rounds": 5,
"avg_score": 0.82,
"best_score": 0.95,
"total_input_tokens": 2250,
"total_output_tokens": 4500
}
ラウンド履歴読み込み
特定実行・特定チーム・特定ラウンドの履歴を取得:
SELECT member_submissions_record, message_history
FROM round_history
WHERE execution_id = ? AND team_id = ? AND round_number = ?;
Python実装:
AggregationStore.load_round_history(
execution_id="550e8400-e29b-41d4-a716-446655440000",
team_id="team-001",
round_number=1
)
戻り値:
(
MemberSubmissionsRecord(...), # 集約結果
[ModelMessage(...), ...] # Message History
)
レコードが存在しない場合は(None, [])を返します。
実行サマリー取得
特定のオーケストレーション実行の集約結果を取得:
SELECT *
FROM execution_summary
WHERE execution_id = '550e8400-e29b-41d4-a716-446655440000';
Python実装:
# ExecutionSummaryから直接取得可能
summary = orchestrator.execute(...)
print(f"Execution ID: {summary.execution_id}")
print(f"Best Team: {summary.best_team_id}")
UPSERT処理(重複保存時の上書き)
同一実行・同一チーム・同一ラウンド番号で複数回保存を試みた場合、最新データで上書き:
INSERT INTO round_history
(execution_id, team_id, team_name, round_number, message_history, member_submissions_record)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (execution_id, team_id, round_number) DO UPDATE SET
message_history = EXCLUDED.message_history,
member_submissions_record = EXCLUDED.member_submissions_record;
Note
この動作はEdge Caseとして仕様で定義されています(spec.md L119)。execution_idの導入により、同一チームが異なる実行に参加した場合も正しく区別できます。
並列書き込み対応
MVCC並列制御
DuckDBのMVCC(Multi-Version Concurrency Control)により、複数チームが同時にデータベースに書き込んでもロック競合が発生しません(FR-009準拠)。
実装戦略:
スレッドローカルコネクション: 各スレッドが独立したDuckDBコネクションを保持
asyncio.to_thread: 同期APIを非同期実行に変換
トランザクション管理: BEGIN/COMMIT/ROLLBACKによる一貫性保証
パフォーマンス目標: 10チーム×5ラウンド=50件の並列書き込みが全て成功(SC-001)
エクスポネンシャルバックオフリトライ
データベース書き込み失敗時、1秒、2秒、4秒の間隔で最大3回リトライします(FR-019準拠)。
リトライ間隔:
1回目失敗 → 1秒待機 → 2回目試行
2回目失敗 → 2秒待機 → 3回目試行
3回目失敗 → 4秒待機 → 4回目試行
4回目失敗 →
DatabaseWriteErrorを送出
3回リトライ失敗後は即座にエラー終了し、詳細ログを出力します(FR-020準拠)。
トランザクション管理
すべての書き込み操作はトランザクション内で実行されます(FR-015準拠)。
実装:
with self._transaction(conn):
conn.execute("INSERT INTO ...")
# 成功 → COMMIT
# 例外発生 → ROLLBACK
保証:
データ一貫性: 部分的な書き込みは発生しない
エラー安全性: 例外発生時は自動的にROLLBACK
関連ドキュメント
仕様書: specs/008-leader/spec.md
契約定義: specs/008-leader/contracts/aggregation_store.md
注意事項
環境変数必須
MIXSEEK_WORKSPACE環境変数が未設定の場合、データベース初期化時に即座にエラー終了します:
raise EnvironmentError(
"MIXSEEK_WORKSPACE environment variable is not set.\n"
"Please set it: export MIXSEEK_WORKSPACE=/path/to/workspace"
)
これは憲章Article 9(Data Accuracy Mandate)に準拠した設計です。暗黙的なデフォルト値やフォールバックは一切提供されません。
評価スコア範囲
evaluation_scoreは0.0~1.0の範囲内である必要があります。範囲外の値を保存しようとした場合:
raise ValueError(
f"evaluation_score must be between 0.0 and 1.0, got {evaluation_score}. "
"This violates the contract specification."
)
データベースレベルのCHECK制約も設定されており、二重の検証により契約違反を防止します。