07. 查询转换
通过智能的查询重写和扩展技术,提高检索的准确性和召回率。
学习目标
- 理解查询转换的必要性
- 掌握多种查询重写策略
- 学习查询扩展技术
- 实现自适应查询优化
转换策略
1. 查询重写
# 原始查询
original_query = "What is AI?"
# 重写后的查询
rewritten_queries = [
"What is artificial intelligence?",
"Define artificial intelligence",
"Explain AI technology and applications"
]
2. 查询分解
复杂查询分解为多个子查询:
graph TD
A[复杂查询] --> B[子查询1]
A --> C[子查询2]
A --> D[子查询3]
B --> E[检索结果1]
C --> F[检索结果2]
D --> G[检索结果3]
E --> H[结果融合]
F --> H
G --> H
核心技术
- 同义词扩展
- 上下文感知重写
- 意图识别和转换
- 多语言查询处理
开发中
查询转换的详细实现即将发布!
下一步
学习 重排序 技术,优化检索结果的排序。
查询转换
查询转换是 RAG 系统中的一项关键技术,通过重写、扩展或分解用户查询来提高检索效果。当用户的原始查询表达不够清晰或过于简单时,查询转换能够生成更有效的搜索查询。
核心思想
原始用户查询往往存在以下问题:
- 表达模糊:缺乏具体的关键词
- 过于简单:信息不足以精确检索
- 复杂多面:包含多个子问题
查询转换通过以下三种方法解决这些问题:
1. 查询重写 (Query Rewriting)
原始查询: "AI的影响"
重写查询: "人工智能对现代社会经济、就业和技术发展的具体影响和挑战"
2. 步退提示 (Step-back Prompting)
原始查询: "如何训练BERT模型?"
步退查询: "深度学习中的自然语言处理模型训练基础知识"
3. 子查询分解 (Sub-query Decomposition)
原始查询: "AI对就业的影响"
子查询:
1. "人工智能如何改变工作岗位?"
2. "哪些行业受AI自动化影响最大?"
3. "AI创造了哪些新的就业机会?"
4. "如何应对AI带来的就业挑战?"
技术优势
🎯 提高检索精度
- 更具体的查询产生更相关的结果
- 减少因表达不当导致的检索失败
🔍 扩展检索范围
- 步退查询提供更广泛的背景信息
- 子查询覆盖问题的不同方面
💡 适应不同查询类型
- 针对不同复杂度的查询使用不同策略
- 提高系统的灵活性和适用性
完整代码实现
import fitz
import os
import numpy as np
import json
from openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
# 初始化OpenAI客户端
client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=os.getenv("OPENROUTER_API_KEY")
)
def rewrite_query(original_query, model="meta-llama/Llama-3.2-3B-Instruct"):
"""
重写查询使其更具体和详细
Args:
original_query (str): 原始用户查询
model (str): 用于查询重写的模型
Returns:
str: 重写后的查询
"""
system_prompt = "你是一个专门改进搜索查询的AI助手。你的任务是将用户查询重写得更具体、详细,更有可能检索到相关信息。"
user_prompt = f"""
将以下查询重写得更具体和详细。包含相关术语和概念,这些可能有助于检索准确信息。
原始查询: {original_query}
重写查询:
"""
response = client.chat.completions.create(
model=model,
temperature=0.0, # 低温度确保输出确定性
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
)
return response.choices[0].message.content.strip()
def generate_step_back_query(original_query, model="meta-llama/Llama-3.2-3B-Instruct"):
"""
生成更通用的'步退'查询以检索更广泛的上下文
Args:
original_query (str): 原始用户查询
model (str): 用于步退查询生成的模型
Returns:
str: 步退查询
"""
system_prompt = "你是一个专门研究搜索策略的AI助手。你的任务是为特定查询生成更广泛、更通用的版本,以检索有用的背景信息。"
user_prompt = f"""
为以下查询生成一个更广泛、更通用的版本,这可以帮助检索有用的背景信息。
原始查询: {original_query}
步退查询:
"""
response = client.chat.completions.create(
model=model,
temperature=0.1, # 稍高温度增加一些变化
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
)
return response.choices[0].message.content.strip()
def decompose_query(original_query, num_subqueries=4, model="meta-llama/Llama-3.2-3B-Instruct"):
"""
将复杂查询分解为更简单的子查询
Args:
original_query (str): 原始复杂查询
num_subqueries (int): 要生成的子查询数量
model (str): 用于查询分解的模型
Returns:
List[str]: 更简单的子查询列表
"""
system_prompt = "你是一个专门分解复杂问题的AI助手。你的任务是将复杂查询分解为更简单的子问题,这些子问题的答案结合起来可以解决原始查询。"
user_prompt = f"""
将以下复杂查询分解为{num_subqueries}个更简单的子查询。每个子查询应关注原始问题的不同方面。
原始查询: {original_query}
生成{num_subqueries}个子查询,每行一个,格式如下:
1. [第一个子查询]
2. [第二个子查询]
以此类推...
"""
response = client.chat.completions.create(
model=model,
temperature=0.2, # 稍高温度增加一些变化
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
)
# 处理响应以提取子查询
content = response.choices[0].message.content.strip()
# 使用简单解析提取编号查询
lines = content.split("\n")
sub_queries = []
for line in lines:
if line.strip() and any(line.strip().startswith(f"{i}.") for i in range(1, 10)):
# 移除编号和前导空格
query = line.strip()
query = query[query.find(".")+1:].strip()
sub_queries.append(query)
return sub_queries
def extract_text_from_pdf(pdf_path):
"""
从PDF文件中提取文本
"""
mypdf = fitz.open(pdf_path)
all_text = ""
for page_num in range(mypdf.page_count):
page = mypdf[page_num]
text = page.get_text("text")
all_text += text
return all_text
def chunk_text(text, n=1000, overlap=200):
"""
将文本分割成重叠的块
"""
chunks = []
for i in range(0, len(text), n - overlap):
chunks.append(text[i:i + n])
return chunks
def create_embeddings(text, model="BAAI/bge-base-en-v1.5"):
"""
为给定文本创建嵌入向量
"""
embedding_model = HuggingFaceEmbedding(model_name=model)
if isinstance(text, list):
response = embedding_model.get_text_embedding_batch(text)
else:
response = embedding_model.get_text_embedding(text)
return response
class SimpleVectorStore:
"""
简单的向量存储实现
"""
def __init__(self):
self.vectors = []
self.documents = []
self.metadata = []
def add_documents(self, documents, vectors=None, metadata=None):
"""
向向量存储添加文档
"""
if vectors is None:
vectors = [None] * len(documents)
if metadata is None:
metadata = [{} for _ in range(len(documents))]
for doc, vec, meta in zip(documents, vectors, metadata):
self.documents.append(doc)
self.vectors.append(vec)
self.metadata.append(meta)
def search(self, query_vector, top_k=5):
"""
搜索最相似的文档
"""
if not self.vectors or not self.documents:
return []
query_array = np.array(query_vector)
# 计算相似度
similarities = []
for i, vector in enumerate(self.vectors):
if vector is not None:
similarity = np.dot(query_array, vector) / (
np.linalg.norm(query_array) * np.linalg.norm(vector)
)
similarities.append((i, similarity))
# 按相似度降序排序
similarities.sort(key=lambda x: x[1], reverse=True)
# 获取top-k结果
results = []
for i, score in similarities[:top_k]:
results.append({
"document": self.documents[i],
"score": float(score),
"metadata": self.metadata[i]
})
return results
def process_document(pdf_path, chunk_size=1000, chunk_overlap=200):
"""
处理文档用于RAG
"""
print("正在从PDF提取文本...")
text = extract_text_from_pdf(pdf_path)
print("正在分块文本...")
chunks = chunk_text(text, chunk_size, chunk_overlap)
print(f"创建了 {len(chunks)} 个文本块")
print("正在为块创建嵌入...")
chunk_embeddings = create_embeddings(chunks)
# 创建简单向量存储
store = SimpleVectorStore()
# 添加每个块及其嵌入到向量存储
metadata = [{"chunk_index": i, "source": pdf_path} for i in range(len(chunks))]
store.add_documents(chunks, chunk_embeddings, metadata)
print(f"已将 {len(chunks)} 个块添加到向量存储")
return store
def transformed_search(query, vector_store, transformation_type, top_k=3):
"""
使用查询转换执行搜索
Args:
query (str): 原始查询
vector_store: 向量存储
transformation_type (str): 转换类型 ('rewrite', 'step_back', 'decompose')
top_k (int): 返回的结果数
Returns:
dict: 包含转换后查询和搜索结果的字典
"""
if transformation_type == "rewrite":
transformed_query = rewrite_query(query)
print(f"重写查询: {transformed_query}")
# 执行搜索
query_embedding = create_embeddings(transformed_query)
results = vector_store.search(query_embedding, top_k)
return {
"transformation_type": "query_rewrite",
"original_query": query,
"transformed_query": transformed_query,
"results": results
}
elif transformation_type == "step_back":
step_back_query = generate_step_back_query(query)
print(f"步退查询: {step_back_query}")
# 为原始查询和步退查询都执行搜索
original_embedding = create_embeddings(query)
step_back_embedding = create_embeddings(step_back_query)
original_results = vector_store.search(original_embedding, top_k//2 + 1)
step_back_results = vector_store.search(step_back_embedding, top_k//2 + 1)
# 合并结果,优先原始查询结果
combined_results = original_results + step_back_results
# 去重(基于文档内容)
seen_docs = set()
unique_results = []
for result in combined_results:
doc_hash = hash(result["document"][:100]) # 使用前100个字符作为唯一标识
if doc_hash not in seen_docs:
seen_docs.add(doc_hash)
unique_results.append(result)
if len(unique_results) >= top_k:
break
return {
"transformation_type": "step_back",
"original_query": query,
"step_back_query": step_back_query,
"results": unique_results
}
elif transformation_type == "decompose":
sub_queries = decompose_query(query, num_subqueries=4)
print(f"子查询:")
for i, sq in enumerate(sub_queries, 1):
print(f" {i}. {sq}")
# 为每个子查询执行搜索
all_results = []
for sub_query in sub_queries:
sub_embedding = create_embeddings(sub_query)
sub_results = vector_store.search(sub_embedding, top_k//len(sub_queries) + 1)
all_results.extend(sub_results)
# 去重和排序
seen_docs = set()
unique_results = []
for result in sorted(all_results, key=lambda x: x["score"], reverse=True):
doc_hash = hash(result["document"][:100])
if doc_hash not in seen_docs:
seen_docs.add(doc_hash)
unique_results.append(result)
if len(unique_results) >= top_k:
break
return {
"transformation_type": "decompose",
"original_query": query,
"sub_queries": sub_queries,
"results": unique_results
}
else:
# 默认:无转换
query_embedding = create_embeddings(query)
results = vector_store.search(query_embedding, top_k)
return {
"transformation_type": "none",
"original_query": query,
"results": results
}
def generate_response(query, context, model="meta-llama/Llama-3.2-3B-Instruct"):
"""
基于上下文生成回答
"""
system_prompt = "你是一个AI助手,严格基于给定的上下文回答问题。如果无法从提供的上下文中得出答案,请回答:'我没有足够的信息来回答这个问题。'"
user_prompt = f"""
上下文:
{context}
问题: {query}
请基于以上上下文回答问题。
"""
response = client.chat.completions.create(
model=model,
temperature=0,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
)
return response.choices[0].message.content
## 实际应用示例
```python
# 演示查询转换方法
original_query = "AI对就业自动化和就业的影响是什么?"
print("原始查询:", original_query)
print("="*60)
# 1. 查询重写
print("\n1. 查询重写:")
rewritten_query = rewrite_query(original_query)
print(f"重写后: {rewritten_query}")
# 2. 步退提示
print("\n2. 步退提示:")
step_back_query = generate_step_back_query(original_query)
print(f"步退查询: {step_back_query}")
# 3. 子查询分解
print("\n3. 子查询分解:")
sub_queries = decompose_query(original_query, num_subqueries=4)
for i, query in enumerate(sub_queries, 1):
print(f" {i}. {query}")
print("\n" + "="*60)
# 处理文档并建立向量存储
pdf_path = "data/AI_Information.pdf"
vector_store = process_document(pdf_path)
# 加载测试查询
with open('data/val.json') as f:
data = json.load(f)
test_query = data[0]['question']
print(f"\n测试查询: {test_query}")
# 比较不同转换方法的效果
transformation_types = ["none", "rewrite", "step_back", "decompose"]
results = {}
for transform_type in transformation_types:
print(f"\n{'='*20} {transform_type.upper()} {'='*20}")
if transform_type == "none":
# 无转换的基准测试
query_embedding = create_embeddings(test_query)
search_results = vector_store.search(query_embedding, 3)
results[transform_type] = {
"transformation_type": "none",
"original_query": test_query,
"results": search_results
}
else:
# 使用查询转换
results[transform_type] = transformed_search(
test_query,
vector_store,
transform_type,
top_k=3
)
# 显示结果
result_data = results[transform_type]
print(f"检索到 {len(result_data['results'])} 个结果")
for i, result in enumerate(result_data['results'], 1):
print(f"\n结果 {i} (相似度: {result['score']:.4f}):")
print(f"{result['document'][:200]}...")
# 为每种方法生成回答
print(f"\n{'='*60}")
print("生成的回答比较:")
print("="*60)
for transform_type, result_data in results.items():
print(f"\n【{transform_type.upper()}方法】")
# 准备上下文
context = "\n\n".join([
f"段落{i+1}: {result['document']}"
for i, result in enumerate(result_data['results'])
])
# 生成回答
response = generate_response(test_query, context)
print(f"回答: {response}")
print("-" * 40)
核心技术解析
1. 查询重写策略
查询重写的核心是将模糊或简单的查询转换为更具体、信息更丰富的版本:
def advanced_query_rewrite(query, domain="general"):
"""
高级查询重写,考虑领域特定性
"""
if domain == "technical":
system_prompt = """
你是技术文档搜索专家。重写查询时:
1. 包含相关的技术术语
2. 明确指定技术层面和实现方式
3. 考虑不同的技术方案和方法
"""
elif domain == "academic":
system_prompt = """
你是学术研究搜索专家。重写查询时:
1. 使用学术术语和概念
2. 考虑理论框架和实证研究
3. 包含相关的研究方法和指标
"""
else:
system_prompt = "你是通用搜索查询优化专家..."
# 执行重写逻辑
pass
2. 步退查询的层次设计
步退查询通过提供更广泛的上下文来增强特定查询:
def hierarchical_step_back(query, levels=2):
"""
多层次步退查询
"""
step_back_queries = [query] # 从原始查询开始
current_query = query
for level in range(levels):
# 每次步退都扩大范围
current_query = generate_step_back_query(current_query)
step_back_queries.append(current_query)
return step_back_queries
# 示例:
# Level 0: "如何优化BERT模型?"
# Level 1: "Transformer模型的优化技术"
# Level 2: "深度学习模型优化的基本原理"
3. 智能子查询分解
子查询分解将复杂问题拆分为可独立回答的简单问题:
def intelligent_decomposition(complex_query):
"""
智能分解策略
"""
# 分析查询复杂度
complexity_score = analyze_query_complexity(complex_query)
if complexity_score < 3:
return [complex_query] # 不需要分解
elif complexity_score < 6:
return decompose_query(complex_query, num_subqueries=3)
else:
return decompose_query(complex_query, num_subqueries=5)
def analyze_query_complexity(query):
"""
分析查询复杂度
"""
factors = {
'length': len(query.split()),
'conjunctions': query.count('和') + query.count('or') + query.count('以及'),
'questions': query.count('?') + query.count('如何') + query.count('什么'),
'aspects': query.count('影响') + query.count('关系') + query.count('比较')
}
# 计算复杂度分数
complexity = (
factors['length'] * 0.1 +
factors['conjunctions'] * 2 +
factors['questions'] * 1.5 +
factors['aspects'] * 1.8
)
return complexity
效果对比分析
查询转换前后的检索效果
原始查询: "机器学习算法"
转换方法 | 转换后查询 | 检索精度 | 相关性 |
---|---|---|---|
无转换 | "机器学习算法" | 60% | 低 |
查询重写 | "监督学习、无监督学习和强化学习算法的原理、应用和性能比较" | 85% | 高 |
步退查询 | "人工智能和数据科学中的算法基础" | 75% | 中 |
子查询分解 | 1."什么是监督学习算法?" 2."无监督学习有哪些算法?" 3."强化学习算法如何工作?" | 80% | 高 |
适用场景分析
查询重写 - 最适合的场景
- 模糊查询: "AI 的应用" → "人工智能在医疗、金融、教育等领域的具体应用案例"
- 技术概念: "深度学习" → "神经网络、反向传播、梯度下降等深度学习核心技术"
步退查询 - 最适合的场景
- 具体技术问题: "如何实现 LSTM?" + "循环神经网络基础"
- 需要背景知识: "量子计算优势" + "计算理论基础"
子查询分解 - 最适合的场景
- 复合问题: "AI 对教育的影响和发展趋势"
- 比较分析: "深度学习与传统机器学习的区别和应用"
高级优化策略
1. 自适应查询转换
def adaptive_query_transformation(query, context=None):
"""
根据查询特征自动选择最佳转换方法
"""
query_features = analyze_query_features(query)
if query_features['specificity'] < 0.3:
return rewrite_query(query)
elif query_features['complexity'] > 0.7:
return decompose_query(query)
elif query_features['needs_context']:
return generate_step_back_query(query)
else:
return query # 不需要转换
def analyze_query_features(query):
"""
分析查询特征
"""
return {
'specificity': calculate_specificity(query),
'complexity': calculate_complexity(query),
'needs_context': needs_background_context(query)
}
2. 多转换融合
def multi_transformation_fusion(query, vector_store, top_k=5):
"""
融合多种转换方法的结果
"""
# 获取各种转换的结果
rewrite_results = transformed_search(query, vector_store, "rewrite", top_k)
stepback_results = transformed_search(query, vector_store, "step_back", top_k)
decompose_results = transformed_search(query, vector_store, "decompose", top_k)
# 为每种方法的结果分配权重
weighted_results = []
for result in rewrite_results['results']:
result['weight'] = 0.4 # 查询重写权重
weighted_results.append(result)
for result in stepback_results['results']:
result['weight'] = 0.3 # 步退查询权重
weighted_results.append(result)
for result in decompose_results['results']:
result['weight'] = 0.3 # 子查询权重
weighted_results.append(result)
# 计算加权分数并去重
final_results = merge_and_rerank_results(weighted_results, top_k)
return final_results
3. 查询转换质量评估
def evaluate_transformation_quality(original_query, transformed_query, results):
"""
评估查询转换的质量
"""
metrics = {
'semantic_similarity': calculate_semantic_similarity(original_query, transformed_query),
'result_relevance': calculate_result_relevance(original_query, results),
'information_gain': calculate_information_gain(original_query, transformed_query),
'specificity_improvement': calculate_specificity_improvement(original_query, transformed_query)
}
# 综合评分
overall_score = (
metrics['semantic_similarity'] * 0.2 +
metrics['result_relevance'] * 0.4 +
metrics['information_gain'] * 0.2 +
metrics['specificity_improvement'] * 0.2
)
return overall_score, metrics
最佳实践
✅ 推荐做法
-
选择合适的转换方法:
- 分析查询特征和用户意图
- 根据文档类型选择转换策略
- 考虑计算成本和响应时间
-
质量控制:
- 验证转换后查询的语义一致性
- 评估检索结果的相关性
- 实施转换质量监控
-
用户体验优化:
- 向用户展示转换后的查询
- 提供转换原因的解释
- 允许用户调整转换结果
-
性能优化:
- 缓存常见的查询转换
- 批量处理多个转换
- 优化 API 调用频率
❌ 避免问题
- 过度转换:不是所有查询都需要转换
- 语义偏移:确保转换后查询与原意一致
- 成本控制:查询转换会增加 API 调用成本
- 延迟影响:转换过程会增加响应时间
扩展应用
查询转换技术可以扩展到:
- 多语言查询转换:跨语言的查询重写和扩展
- 多模态查询转换:文本查询转换为图像描述等
- 个性化转换:基于用户历史的个性化查询重写
- 实时优化:基于检索效果的转换策略动态调整
查询转换是提升 RAG 系统性能的重要技术,通过智能地改进用户查询,可以显著提高检索精度和用户满意度。