← 返回
未分类 Key 中文

Rag Retrieve

Performs domain- and version-aware hybrid vector+BM25+metadata search on TiDB, with multi-query expansion and optional chain-of-retrieval for multi-hop queries.
在 TiDB 上执行域和版本感知的混合向量+BM25+元数据检索,支持多查询扩展及可选的多跳检索链。
stephenlthorn stephenlthorn 来源
未分类 clawhub v2.0.0 1 版本 99722.2 Key: 需要
★ 0
Stars
📥 359
下载
💾 0
安装
1
版本
#latest

概述

RAG Retrieve

Architecture

Query
  ↓
Multi-query expansion (3-5 variants via Gemma 4)
  ↓
Parallel vector + BM25 + metadata search in TiDB
  ↓
Cross-encoder rerank (bge-reranker-v2-m3, local)
  ↓
Top-K chunks
  ↓ (if use_corag)
CoRAG controller: reason → retrieve again → accumulate
  ↓
Pack into context budget

TiDB schema

CREATE TABLE knowledge_chunks (
    id BIGINT AUTO_RANDOM PRIMARY KEY,
    chunk_id VARCHAR(64) NOT NULL UNIQUE,
    content TEXT NOT NULL,
    source TEXT NOT NULL,  -- 'apple_docs' | 'wwdc' | 'user_codebase' | 'blog' | etc.
    domain VARCHAR(32) NOT NULL,  -- 'ios' | 'web' | 'python' | 'trading' | 'vc'

    -- iOS-specific metadata
    ios_version_min VARCHAR(16),  -- '18.0', '26.0', NULL if not iOS
    ios_version_max VARCHAR(16),  -- null if current
    swift_version VARCHAR(16),
    framework_tags JSON,  -- ["SwiftData", "CloudKit"]
    deprecated BOOLEAN DEFAULT FALSE,
    deprecated_in VARCHAR(16),

    -- Chunking metadata
    chunk_type VARCHAR(32),  -- 'api_doc' | 'code_pattern' | 'user_code' | 'concept'
    parent_doc_id VARCHAR(64),
    chunk_index INT,

    -- Retrieval signals
    embedding VECTOR(1536),  -- voyage-code-3 or nomic-embed-text
    last_verified DATE,  -- for freshness filtering

    -- Full-text search
    FULLTEXT KEY content_fts (content) WITH PARSER NGRAM,
    VECTOR INDEX idx_embedding (embedding) USING HNSW,

    KEY idx_domain (domain),
    KEY idx_ios_version (ios_version_min, ios_version_max),
    KEY idx_framework (cast(framework_tags as char(128) array))
);

Hybrid search query

HYBRID_QUERY = """
SELECT
    chunk_id,
    content,
    source,
    domain,
    ios_version_min,
    ios_version_max,
    framework_tags,
    chunk_type,
    (
        0.55 * (1 - VEC_COSINE_DISTANCE(embedding, %(query_vec)s))
        + 0.30 * MATCH(content) AGAINST(%(query_text)s IN NATURAL LANGUAGE MODE) / 10
        + 0.15 * %(metadata_boost)s
    ) AS relevance
FROM knowledge_chunks
WHERE
    domain IN %(allowed_domains)s
    AND deprecated = FALSE
    AND (
        ios_version_min IS NULL
        OR ios_version_min <= %(target_ios)s
    )
    AND (
        ios_version_max IS NULL
        OR ios_version_max >= %(target_ios)s
    )
    AND (
        %(frameworks)s IS NULL
        OR JSON_OVERLAPS(framework_tags, %(frameworks)s)
    )
ORDER BY relevance DESC
LIMIT %(limit)s
"""

Execution

import asyncio
import json
from typing import List, Dict, Optional
import numpy as np
from pathlib import Path


async def rag_retrieve(
    query: str,
    domain: str = "general",
    ios_version: Optional[str] = None,
    frameworks: Optional[List[str]] = None,
    top_k: int = 10,
    use_corag: bool = False,
    user_codebase: bool = True,
):
    # 1. Multi-query expansion
    expansions = await _expand_query(query, domain, frameworks)

    # 2. Embed all variants
    query_vectors = await _embed_batch(expansions)

    # 3. Parallel hybrid search for each expansion
    allowed_domains = [domain, "general"]
    if user_codebase:
        allowed_domains.append("user_codebase")

    all_results = []
    for variant_text, variant_vec in zip(expansions, query_vectors):
        results = await _hybrid_search(
            query_text=variant_text,
            query_vec=variant_vec,
            allowed_domains=allowed_domains,
            target_ios=ios_version or "99.0",
            frameworks=frameworks,
            limit=top_k * 2,  # overfetch, dedup later
        )
        all_results.extend(results)

    # 4. Deduplicate by chunk_id
    unique = {}
    for r in all_results:
        cid = r["chunk_id"]
        if cid not in unique or r["relevance"] > unique[cid]["relevance"]:
            unique[cid] = r
    all_unique = list(unique.values())

    # 5. Cross-encoder rerank
    reranked = await _rerank(query, all_unique, top_k=top_k * 2)

    # 6. Optional CoRAG
    corag_trace = None
    if use_corag:
        final, corag_trace = await _corag_controller(
            query, reranked[:top_k], top_k, domain, ios_version, frameworks
        )
    else:
        final = reranked[:top_k]

    # 7. Estimate tokens
    total_tokens = sum(len(c["content"].split()) * 1.3 for c in final)

    return {
        "chunks": final,
        "total_tokens": int(total_tokens),
        "query_expansions": expansions,
        "corag_trace": corag_trace,
    }


# ── Multi-query expansion ────────────────────────────────────────────────────

async def _expand_query(query, domain, frameworks):
    """Generate 3-5 semantic variants of the query."""
    framework_hint = ", ".join(frameworks) if frameworks else "none specified"

    prompt = f"""Given this {domain} query, produce 4 semantic variants that
would retrieve complementary information.

Query: {query}
Frameworks: {framework_hint}

Variants should:
- Ask the same underlying question from different angles
- Use different vocabulary (if query says "how to do X", add "implementation
  of X", "example of X", "best practices for X")
- Include specific technical terms the original may have omitted

Output as JSON array of 4 strings. Include the original as the first entry.
Only the JSON, no explanation."""

    response = await llm.generate(
        prompt=prompt,
        model="gemma-4-26b-moe",  # fast router model handles this well
        temperature=0.3,
        max_tokens=400
    )

    try:
        variants = json.loads(response.strip().strip("`").strip("json").strip())
        return variants[:5]
    except Exception:
        # Fallback: just use the original query
        return [query]


# ── Embedding ────────────────────────────────────────────────────────────────

async def _embed_batch(texts):
    """Embed via local MLX nomic-embed-text or bge-m3."""
    import mlx.core as mx
    from mlx_lm import load

    # Assumes an embedding model is loaded globally
    # In practice, run a persistent MLX server and HTTP call it
    response = await shell.run(
        f"curl -X POST http://localhost:8888/embed "
        f"-H 'Content-Type: application/json' "
        f"-d '{json.dumps({\"texts\": texts})}'"
    )
    return json.loads(response.stdout)["embeddings"]


# ── TiDB hybrid search ───────────────────────────────────────────────────────

async def _hybrid_search(query_text, query_vec, allowed_domains,
                         target_ios, frameworks, limit):
    """Execute the hybrid query against TiDB."""
    from pymysql import connect

    conn = connect(
        host="127.0.0.1",
        port=4000,
        user="root",
        database="openclaw_rag",
    )

    cursor = conn.cursor(dictionary=True)

    # Convert embedding vector to TiDB VECTOR literal
    query_vec_str = "[" + ",".join(f"{v:.6f}" for v in query_vec) + "]"

    # Metadata boost calculation (domain match = 0.2, chunk_type match = 0.1)
    metadata_boost = 0.2 if len(allowed_domains) == 1 else 0.1

    params = {
        "query_vec": query_vec_str,
        "query_text": query_text,
        "metadata_boost": metadata_boost,
        "allowed_domains": tuple(allowed_domains),
        "target_ios": target_ios or "99.0",
        "frameworks": json.dumps(frameworks) if frameworks else None,
        "limit": limit,
    }

    cursor.execute(HYBRID_QUERY, params)
    results = cursor.fetchall()
    conn.close()

    return results


# ── Cross-encoder rerank ─────────────────────────────────────────────────────

async def _rerank(query, candidates, top_k):
    """Rerank candidates using bge-reranker-v2-m3 (local)."""
    if not candidates:
        return []

    pairs = [(query, c["content"][:512]) for c in candidates]

    # Call local reranker HTTP endpoint
    response = await shell.run(
        f"curl -X POST http://localhost:8889/rerank "
        f"-H 'Content-Type: application/json' "
        f"-d '{json.dumps({\"pairs\": pairs})}'"
    )
    scores = json.loads(response.stdout)["scores"]

    # Attach scores and sort
    scored = [
        {**c, "rerank_score": scores[i]}
        for i, c in enumerate(candidates)
    ]
    scored.sort(key=lambda x: x["rerank_score"], reverse=True)

    return scored[:top_k]


# ── CoRAG controller ─────────────────────────────────────────────────────────

async def _corag_controller(query, initial_chunks, top_k, domain,
                           ios_version, frameworks, max_hops=3):
    """
    Chain-of-retrieval for multi-hop questions.

    After first retrieval, ask the model: "do we have enough context to
    answer? if not, what follow-up would you retrieve?" Then retrieve that,
    accumulate, repeat.
    """
    trace = [{"hop": 0, "query": query, "retrieved_ids": [c["chunk_id"] for c in initial_chunks]}]
    accumulated = list(initial_chunks)

    for hop in range(1, max_hops + 1):
        # Ask the model if we need more
        ctx_summary = "\n\n".join(
            f"[{i+1}] {c['content'][:300]}"
            for i, c in enumerate(accumulated[:15])
        )

        decision_prompt = f"""Original question: {query}

Retrieved context so far:
{ctx_summary}

Question: Is the above context sufficient to answer the original question?

Respond with JSON:
- If sufficient: {{"sufficient": true}}
- If not: {{"sufficient": false, "follow_up_query": "specific follow-up question"}}

Only the JSON."""

        response = await llm.generate(
            prompt=decision_prompt,
            model="m27-jangtq-crack",
            temperature=0.1,
            max_tokens=200
        )

        try:
            decision = json.loads(response.strip().strip("`").strip("json").strip())
        except Exception:
            break  # Give up on CoRAG, use what we have

        if decision.get("sufficient"):
            break

        follow_up = decision.get("follow_up_query")
        if not follow_up:
            break

        # Retrieve for follow-up
        follow_up_vec = (await _embed_batch([follow_up]))[0]
        new_results = await _hybrid_search(
            query_text=follow_up,
            query_vec=follow_up_vec,
            allowed_domains=[domain, "general"],
            target_ios=ios_version or "99.0",
            frameworks=frameworks,
            limit=5,
        )

        # Dedup against what we already have
        existing_ids = {c["chunk_id"] for c in accumulated}
        new_unique = [r for r in new_results if r["chunk_id"] not in existing_ids]

        accumulated.extend(new_unique)
        trace.append({
            "hop": hop,
            "query": follow_up,
            "retrieved_ids": [c["chunk_id"] for c in new_unique]
        })

        if not new_unique:
            break  # No new info, stop

    # Final rerank against original query
    final = await _rerank(query, accumulated, top_k=top_k)
    return final, trace

Ingestion (one-time setup)

You need to ingest your corpus into TiDB. Key sources:

# From the training data collection script (collect_training_data.py):
# Use the same scraped docs but now embed and ingest

async def ingest_corpus(corpus_dir: Path):
    for jsonl_file in corpus_dir.rglob("*.jsonl"):
        with open(jsonl_file) as f:
            for line in f:
                doc = json.loads(line)
                # Chunk the document
                chunks = _chunk_document(doc)
                # Embed each chunk
                for chunk in chunks:
                    chunk["embedding"] = await _embed_batch([chunk["content"]])[0]
                    # Extract iOS version, frameworks from doc metadata
                    chunk["ios_version_min"] = doc.get("ios_version")
                    chunk["framework_tags"] = doc.get("frameworks", [])
                # Insert batch into TiDB
                await _insert_chunks(chunks)


def _chunk_document(doc):
    """Code-aware chunking via tree-sitter for Swift/Python, semantic for prose."""
    if doc.get("source") == "user_codebase" and doc.get("language") == "swift":
        return _chunk_swift_tree_sitter(doc["text"])
    if doc.get("source") == "user_codebase" and doc.get("language") == "python":
        return _chunk_python_ast(doc["text"])
    # Default: semantic chunking at heading/paragraph boundaries
    return _chunk_semantic(doc["text"], max_tokens=500)


def _chunk_swift_tree_sitter(source):
    """Use tree-sitter-swift to preserve function/struct/extension boundaries."""
    import tree_sitter_swift as tsswift
    import tree_sitter as ts

    parser = ts.Parser(ts.Language(tsswift.language()))
    tree = parser.parse(source.encode())

    chunks = []
    # Walk the tree, extract top-level declarations with their docs
    for node in tree.root_node.children:
        if node.type in ("function_declaration", "class_declaration",
                         "struct_declaration", "extension_declaration",
                         "enum_declaration", "actor_declaration"):
            # Include preceding comments
            start = node.start_byte
            prev = node.prev_sibling
            if prev and prev.type == "comment":
                start = prev.start_byte
            chunk_text = source[start:node.end_byte]
            chunks.append({"content": chunk_text, "chunk_type": "code_declaration"})

    return chunks

Performance expectations

On M4 Max 128GB with TiDB running locally:

  • Multi-query expansion: ~200ms (Gemma 4 26B on MBP M1)
  • Embedding 4 query variants: ~100ms (nomic-embed-text local)
  • TiDB hybrid search: ~50-200ms depending on corpus size
  • Cross-encoder rerank of 40 candidates: ~400ms (bge-reranker-v2-m3 local)
  • CoRAG 1 hop: +500ms (LLM decision + retrieval)

Total: ~1-2 seconds for non-CoRAG, 2-4 seconds with CoRAG.

Context budgeting

Returns chunks with total_tokens so caller can make intelligent truncation

decisions. With M2.7 JANGTQ-CRACK at 180K context, you can easily fit 30K+

tokens of RAG context. But more isn't always better — the cross-encoder

rerank already picks the best chunks. Default top_k=10 is a reasonable

sweet spot.

For deep reasoning tasks where retrieval quality matters most: top_k=5,

use_corag=true.

For broad research where recall matters: top_k=20, use_corag=false.

Integration

Called by:

  • coding-orchestrator step 3
  • decompose-plan to enrich planning context
  • Standalone via "find docs about X"

版本历史

共 1 个版本

  • v2.0.0 当前
    2026-05-07 14:40 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

Reflect Critique Revise

stephenlthorn
执行多轮高级工程师的代码审查与修改,通过捕获错误、API误用和风格问题,提高代码质量,涵盖iOS等领域。
★ 0 📥 437

Decompose Plan

stephenlthorn
Forces M2.7 to produce an explicit structured plan before writing code. This makes Tree-of-Thought reasoning explicit in
★ 0 📥 430

Route Specialist

stephenlthorn
Classifies tasks by domain using deterministic and LLM methods, then routes to specialized prompts with tuned models and
★ 0 📥 411