上一篇解决了文档怎样变成可搜索的——写入路径经过 buffer、translog、refresh 三步后文档才对搜索可见。这一篇进入一次搜索请求在集群内部怎样执行。

ES 是分布式搜索引擎,一个 index 的数据分布在多个 shard 上。一次搜索需要在所有相关 shard 上执行,然后合并结果。这个过程分成两个阶段:Query 阶段和 Fetch 阶段。

本文只抓一个问题:Query-Then-Fetch 的两阶段搜索是怎么工作的,以及这个设计为什么是必要的。

为什么需要两个阶段

如果 ES 只有一个 shard,搜索很简单:在 shard 上执行查询,取 top-N 结果,返回。但数据分布在多个 shard 上时,直接从每个 shard 取 top-N 的完整文档(包含 _source),再在 coordinating node 合并,会传输大量不需要的数据——大部分文档在全局排序后会被丢弃。

两阶段设计的核心思路:先用轻量数据(docId + score)做全局排序,再只取需要的文档内容。

sequenceDiagram
    participant C as Client
    participant CN as Coordinating Node
    participant S0 as Shard 0
    participant S1 as Shard 1
    participant S2 as Shard 2

    rect rgb(230, 245, 255)
    note over CN,S2: Query Phase — 只传 docId + score
    C->>CN: search request
    par scatter
        CN->>S0: query
        CN->>S1: query
        CN->>S2: query
    end
    S0-->>CN: top-N (docId, score)
    S1-->>CN: top-N (docId, score)
    S2-->>CN: top-N (docId, score)
    CN->>CN: 全局归并排序, 取最终 top-N
    end

    rect rgb(255, 245, 230)
    note over CN,S2: Fetch Phase — 只取命中文档的 _source
    par multi-get
        CN->>S0: fetch doc3, doc7
        CN->>S2: fetch doc15
    end
    S0-->>CN: _source of doc3, doc7
    S2-->>CN: _source of doc15
    end

    CN-->>C: 组装最终 response

两阶段之间的关键转折:Query Phase 结束时 coordinating node 已经知道最终要哪几条文档,Fetch Phase 只取这几条的全文,避免了传输大量最终会被丢弃的 _source

Query 阶段

1
2
3
4
5
6
7
8
Client
→ Coordinating Node
→ 广播 query 到所有相关 shard (scatter)
Shard 0: 本地 Lucene 搜索 → 返回 top-N (docId, score)
Shard 1: 本地 Lucene 搜索 → 返回 top-N (docId, score)
Shard 2: 本地 Lucene 搜索 → 返回 top-N (docId, score)
← 收集所有 shard 的结果 (gather)
→ 全局排序,取最终 top-N 的 docId 列表

每个 shard 在本地执行完整的 Lucene 搜索,包括查询解析、倒排索引查找、BM25 打分、排序。返回给 coordinating node 的只是 docId 和 score 的列表,不包含文档内容。

coordinating node 把所有 shard 返回的结果做全局排序(归并排序),取出最终的 top-N。

如果请求的是 from: 20, size: 10(第三页),每个 shard 需要返回 top-30(from + size),coordinating node 从所有 shard 的 top-30 中全局排序后取第 21-30 条。这就是深翻页开销随页数线性增长的原因。

Fetch 阶段

1
2
3
4
5
6
7
Coordinating Node
→ 根据全局 top-N 的 docId,确定文档在哪些 shard 上
→ 只向相关 shard 发送 fetch 请求(multi-get)
Shard 0: 返回 doc3, doc7 的 _source
Shard 2: 返回 doc15 的 _source
← 收集文档内容
→ 组装最终 response 返回给 client

Fetch 阶段只取最终需要的那几条文档的 _source。如果 size: 10,最多只从相关 shard 取 10 条文档的内容。这比在 Query 阶段就传输所有候选文档的 _source 高效得多。

DFS_QUERY_THEN_FETCH

默认的 QUERY_THEN_FETCH 有一个评分精度问题:每个 shard 用本地的词频统计(document frequency)来计算 BM25 分数。当数据在 shard 之间分布不均匀时,同一个词项在不同 shard 上的 IDF 值可能差异较大,导致分数不可比。

DFS_QUERY_THEN_FETCH 在 Query 阶段之前增加一个 DFS(Distributed Frequency Statistics)阶段:

flowchart LR
    subgraph QTF["QUERY_THEN_FETCH (默认)"]
        direction LR
        Q1[Query<br/>本地 IDF 打分] --> F1[Fetch]
    end
    subgraph DFS_QTF["DFS_QUERY_THEN_FETCH"]
        direction LR
        D[DFS<br/>收集全局词频] --> Q2[Query<br/>全局 IDF 打分] --> F2[Fetch]
    end
    QTF ~~~ DFS_QTF

DFS 多出的那一轮网络往返(收集全局词频)换来的是跨 shard 评分一致性。数据量越小、shard 越少,本地 IDF 与全局 IDF 的偏差越大,DFS 的收益越明显。

通过 search_type 参数控制:

1
2
GET /my-index/_search?search_type=dfs_query_then_fetch
{ "query": { "match": { "title": "elasticsearch" } } }

DFS 的代价是多一轮网络往返。大多数场景下数据分布足够均匀,默认的 QUERY_THEN_FETCH 就够用。只有在 shard 数量少、数据量小、评分精度要求高的场景才需要 DFS。

Preference 参数

默认情况下,搜索请求在 primary 和 replica 之间自适应选择(adaptive replica selection)。preference 参数可以控制搜索请求发往哪些 shard 副本:

行为
_local 优先使用本节点上的 shard
_prefer_nodes:node1,node2 优先使用指定节点
_shards:0,1 只搜索指定 shard
自定义字符串(如用户 ID) 同一个字符串总是路由到同一组 shard 副本

自定义字符串的常见用途是保证同一个用户的多次搜索请求命中同一组 shard 副本,避免因 replica 之间的 refresh 时差导致分页时结果跳动。

实验:用 Profile 观察两阶段耗时

profile: true 可以查看搜索的详细耗时分布:

1
2
3
4
5
6
7
GET /test-inverted-index/_search
{
"profile": true,
"query": {
"match": { "title": "search engine" }
}
}

返回结果中 profile 部分会显示:

  • 每个 shard 的 query 阶段耗时(包括 Lucene 级别的 scorer、collector 等)
  • fetch 阶段耗时
  • 各阶段的时间占比

这个工具可以定位搜索延迟的瓶颈在哪个阶段、哪个 shard。

模式提炼:Scatter-Gather

1
request → scatter to N workers → each worker computes locally → gather results → merge → response

Query-Then-Fetch 是 scatter-gather 模式的典型实现。这个模式在分布式系统中广泛使用:

系统 Scatter Worker 计算 Gather
ES 搜索 coordinating → all shards shard 本地 Lucene 搜索 全局排序 top-N
MapReduce split input → mappers map function reduce / merge
Solr 分布式搜索 coordinating → all shards shard 本地搜索 合并结果
分库分表查询 proxy → all databases SQL 在各库执行 合并排序
gRPC fan-out gateway → all backends 各 backend 处理 聚合响应

scatter-gather 的固有开销:总延迟取决于最慢的 worker(木桶效应)。如果一个 shard 所在的节点负载高或网络延迟大,整个搜索请求的延迟都会被拉长。

工程迁移表

概念 Elasticsearch Apache Solr MapReduce 分库分表
请求路由 Coordinating node Distributed handler JobTracker Proxy / middleware
计算分发 广播到所有 shard 广播到所有 shard 分发到 mapper 广播到所有库
本地计算 Lucene query + score Lucene query + score Map function SQL 执行
结果合并 全局排序 top-N 合并排序 Reduce function 合并排序
深翻页问题 from+size 线性开销 同样存在 N/A 同样存在

常见误解

误解一:coordinating node 做了搜索计算。 coordinating node 不做搜索计算。它只负责分发请求和合并结果。真正的搜索计算(倒排索引查找、BM25 打分)发生在每个 shard 的本地 Lucene 上。

误解二:shard 数越多搜索越快。 shard 数增加意味着 scatter 范围变大、gather 合并的数据量变大、网络开销变多。在数据量不大的情况下,过多的 shard 反而增加搜索延迟。

误解三:from: 10000, size: 10from: 0, size: 10 的开销差不多。 深翻页时每个 shard 要返回 from+size 条结果给 coordinating node,开销随 from 线性增长。这就是 ES 默认限制 max_result_window: 10000 的原因。深翻页应该用 search_afterscroll API。

练习

  1. 创建一个 3 shard 的 index,写入至少 30 条文档。用 _search?profile=true 观察每个 shard 的 query 阶段耗时和 fetch 阶段耗时。

  2. 对比 QUERY_THEN_FETCHDFS_QUERY_THEN_FETCH 的搜索结果分数差异。写入少量文档(每个 shard 只有几条),让数据分布不均匀,然后比较两种模式下的 _score

  3. _search?preference=_shards:0 只搜索 shard 0,对比搜索全部 shard 的结果和分数差异。

系列导航

上一篇 下一篇
写入路径:近实时、Translog 与 Refresh/Flush 相关性评分:BM25 与打分机制

参考资料