“100 亿个数中找出最大的 1000 个”、“两个 10GB 的文件找出共同的 URL”——这些经典面试题的本质都是内存放不下。单机方案围绕分治展开,分布式方案则把分治思想映射到集群节点上。本文按问题类型组织,每类问题给出从单机到 Spark/Flink 的渐进式解法,并附上概率数据结构(布隆过滤器、HyperLogLog、Count-Min Sketch)在近似场景中的应用。


引言:大数据问题的共同特征

为什么"内存放不下"

面试中给出的数据规模往往是精心设计的——刚好跨过单机内存的边界:

数据规模 内存需求 典型服务器内存 能否放入内存
1 亿个 int 400 MB 16 GB
10 亿个 int 4 GB 16 GB ✅(但留给程序的余量不多)
100 亿个 int 40 GB 16 GB
10 亿个 URL(平均 100 字节) 100 GB 16 GB

上表只计算了裸数据大小。实际使用 HashMap、HashSet 等容器时,对象头、指针、负载因子会使内存占用膨胀 3-5 倍。

通用解题框架

1
2
3
4
5
6
7
8
9
10
11
12
大数据问题


能否放入内存?
├── 能 → 直接用内存数据结构解决


不能 → 分而治之

├── 1. 分区(Partition):按哈希/范围将数据分成小块
├── 2. 局部处理:每个小块在内存中处理
└── 3. 合并(Merge):合并各小块的结果

算法的衍生关系

graph TD
    DivideConquer[分治法] --> ExternalSort[外部排序]
    DivideConquer --> HashPartition[哈希分区]
    DivideConquer --> MapReduce[MapReduce]

    ExternalSort --> MergeSort[归并排序]
    ExternalSort --> TopK_Sort[TopK 排序]

    HashPartition --> Dedup[去重]
    HashPartition --> WordCount[词频统计]
    HashPartition --> CommonURL[共同 URL]

    MapReduce --> Spark[Spark RDD]
    MapReduce --> Flink[Flink DataStream]

    Heap[堆] --> TopK_Heap[TopK 堆解法]
    BloomFilter[布隆过滤器] --> Dedup_Bloom[近似去重]
    CountMinSketch[Count-Min Sketch] --> FreqEstimate[频率估计]

Part 1: TopK 问题

问题描述

100 亿个整数,找出最大的 1000 个。

解法一:最小堆(单机,内存足够时)

维护一个大小为 K 的最小堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static int[] topK(int[] data, int k) {
PriorityQueue<Integer> minHeap = new PriorityQueue<>(k);

for (int num : data) {
if (minHeap.size() < k) {
minHeap.offer(num);
} else if (num > minHeap.peek()) {
minHeap.poll();
minHeap.offer(num);
}
}

return minHeap.stream().mapToInt(Integer::intValue).toArray();
}
  • 时间复杂度:O(N × log K)
  • 空间复杂度:O(K)
  • 优势:只需要 O(K) 的内存,K=1000 时只需要几 KB

解法二:分区 + 堆(单机,内存不够时)

当数据在磁盘上时:

  1. 将 100 亿个数分成 1000 个文件(每个文件 1000 万个数)
  2. 对每个文件用最小堆找出 Top 1000
  3. 合并 1000 个文件的 Top 1000(共 100 万个数),再用最小堆找出最终 Top 1000

解法三:快速选择算法(QuickSelect)

基于快速排序的分区思想,平均 O(N) 时间将数组划分为"前 K 大"和"其余"两部分。一次 partition 完成后,pivot 左侧的所有元素都 ≥ pivot,右侧都 < pivot;递归缩小范围直到 pivot 恰好落在第 K 个位置,此时数组前 K 个元素即为 Top K(无序):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static int quickSelect(int[] arr, int left, int right, int k) {
if (left == right) return arr[left];

int pivotIndex = partition(arr, left, right);
int rank = pivotIndex - left + 1;

if (rank == k) {
return arr[pivotIndex];
} else if (rank > k) {
return quickSelect(arr, left, pivotIndex - 1, k);
} else {
return quickSelect(arr, pivotIndex + 1, right, k - rank);
}
}

private static int partition(int[] arr, int left, int right) {
// 随机选择 pivot,避免最坏情况
int randomIndex = left + ThreadLocalRandom.current().nextInt(right - left + 1);
swap(arr, randomIndex, right);

int pivot = arr[right];
int storeIndex = left;
for (int i = left; i < right; i++) {
if (arr[i] >= pivot) { // 降序排列,找最大的 K 个
swap(arr, i, storeIndex);
storeIndex++;
}
}
swap(arr, storeIndex, right);
return storeIndex;
}

private static void swap(int[] arr, int i, int j) {
int temp = arr[i];
arr[i] = arr[j];
arr[j] = temp;
}
  • 平均时间复杂度:O(N)
  • 最坏时间复杂度:O(N²)(通过随机化 pivot 避免)
  • 空间复杂度:O(1)(原地操作)
  • 局限:要求所有数据可随机访问(在内存中),不适用于磁盘上的流式数据

解法四:Spark 分布式 TopK

1
2
3
val topK = sc.textFile("hdfs:///data/numbers.txt")
.map(_.toLong)
.top(1000) // 内部使用 BoundedPriorityQueue

Spark 的 top(K) 内部使用 BoundedPriorityQueue,执行流程:

  1. 每个 Partition 独立维护一个大小为 K 的最小堆(Map 端局部 TopK)
  2. Reduce 阶段合并各 Partition 的堆——两个堆合并后再截断到 K
  3. 最终 Driver 端得到全局 Top K

这个过程本质上就是"解法二"的分布式版本——Spark 把手动分文件 + 手动合并的工作自动化了。

flowchart LR
    subgraph Map["Map 端(每个 Partition 独立处理)"]
        direction TB
        P1[Partition 1] --> H1["局部最小堆<br/>size = K"]
        P2[Partition 2] --> H2["局部最小堆<br/>size = K"]
        Pn[Partition N] --> Hn["局部最小堆<br/>size = K"]
    end

    H1 --> M["Reduce 端<br/>两两合并堆<br/>合并后截断到 K"]
    H2 --> M
    Hn --> M

    M --> D["Driver<br/>全局 Top K"]

节点间只传输大小为 K 的堆,网络代价为 O(N × K)(N 个 Partition × K 个元素),而非整个数据集;这是 Spark 把"内存敏感"操作做成可扩展操作的标准模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
DataStream<Long> numbers = env.readTextFile("hdfs:///data/numbers.txt")
.map(Long::parseLong);

numbers
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new TopKProcessFunction(1000))
.print();

public class TopKProcessFunction extends ProcessAllWindowFunction<Long, String, TimeWindow> {
private final int k;

public TopKProcessFunction(int k) {
this.k = k;
}

@Override
public void process(Context context, Iterable<Long> elements, Collector<String> out) {
PriorityQueue<Long> minHeap = new PriorityQueue<>(k);
for (Long element : elements) {
if (minHeap.size() < k) {
minHeap.offer(element);
} else if (element > minHeap.peek()) {
minHeap.poll();
minHeap.offer(element);
}
}
out.collect("TopK: " + minHeap);
}
}

Part 2: 外部排序

问题描述

对一个 100GB 的文件进行排序,可用内存只有 1GB。

外部归并排序算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Phase 1: 分割与排序(Sort Phase)
┌──────────────────────────────────────────────┐
│ 100GB 文件 │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ 1GB │ │ 1GB │ │ 1GB │ ... │ 1GB │ 共100块 │
│ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 内存排序 内存排序 内存排序 内存排序 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 有序块1 有序块2 有序块3 ... 有序块100 │
└──────────────────────────────────────────────┘

Phase 2: 多路归并(Merge Phase)
┌──────────────────────────────────────────────┐
│ 100 个有序块 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ 100 路归并(最小堆) │ │
│ │ 每个块读入一个缓冲区(~10MB) │ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ 排序后的 100GB 文件 │
└──────────────────────────────────────────────┘

多路归并的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void externalMergeSort(List<File> sortedChunks, File output) throws IOException {
// 最小堆,按每个块的当前最小值排序
PriorityQueue<ChunkReader> minHeap = new PriorityQueue<>(
Comparator.comparingLong(ChunkReader::peek)
);

// 初始化:每个块读入第一个元素
for (File chunk : sortedChunks) {
ChunkReader reader = new ChunkReader(chunk);
if (reader.hasNext()) {
minHeap.offer(reader);
}
}

try (BufferedWriter writer = new BufferedWriter(new FileWriter(output))) {
while (!minHeap.isEmpty()) {
ChunkReader smallest = minHeap.poll();
writer.write(String.valueOf(smallest.pop()));
writer.newLine();

if (smallest.hasNext()) {
minHeap.offer(smallest); // 重新入堆
} else {
smallest.close();
}
}
}
}

多路归并时间复杂度分析

对于 k 路归并排序,设总数据量为 N,内存容量为 M,则:

  1. 分割阶段

    • 分块数量:k = N / M
    • 每块排序时间:O(M log M)
    • 总排序时间:O(N log M)
  2. 归并阶段

    • 每次从最小堆中取出最小元素:O(log k)
    • 总共需要取出 N 个元素
    • 归并时间:O(N log k)
    • 由于 k = N / M,故归并时间:O(N log(N/M))
  3. 总时间复杂度

    • O(N log M) + O(N log(N/M))
    • 数学最优在 M = √N 处取得:此时归并路数 k = √N,总复杂度可化简为 O(N log N)(log √N = ½·log N,差常数因子)
    • 工程上 M 由可用内存上限决定,通常 M ≫ √N;此时归并的 O(N log(N/M)) 远小于分割阶段,分割排序的 O(N log M) 是固定开销
    • 实际瓶颈是磁盘 I/O 而非 CPU 计算,选 M 时主要看内存余量、缓冲区大小、文件句柄上限,而不是去逼近数学最优点
  4. I/O 复杂度(决定实际耗时的关键因素):

    • 每个数据块被读写各 2 次(排序阶段读入 + 写出有序块,归并阶段读入 + 写出最终文件)
    • 总 I/O 数据量:4N(对于单轮归并)
    • 多轮归并时 I/O 翻倍:每增加一轮归并,中间数据被多读写一次
    • 磁盘顺序读写带宽约 200MB/s(HDD)或 2GB/s(SSD),100GB 数据的 I/O 时间即为瓶颈

优化技巧

优化 说明 收益
替换选择排序 用败者树在输出一个有序块的同时接收新输入,平均生成 2M 大小的有序块 块数减半 → 归并路数减半
多阶段归并 当块数太多(如 1000 路),分多轮归并(如每轮 10 路,共 3 轮) 降低堆操作开销、减少同时打开的文件句柄
双缓冲 / 异步 I/O 一个缓冲区供 CPU 处理,另一个缓冲区异步预读下一批数据 CPU 与磁盘并行,吞吐提升 30-50%
压缩 对中间有序块使用 LZ4/Snappy 等快速压缩 减少 I/O 量,对重复率高的数据效果显著

Spark 的排序

1
2
3
4
val sorted = sc.textFile("hdfs:///data/large_file.txt")
.map(_.toLong)
.sortBy(identity)
.saveAsTextFile("hdfs:///data/sorted_output")

Spark 的排序使用 Range Partitioner

  1. 采样:从数据中采样,确定分区边界
  2. 分区:按范围将数据分配到不同 Partition
  3. 局部排序:每个 Partition 内部排序
  4. 输出:按 Partition 顺序输出,得到全局有序结果

Part 3: 去重问题

问题描述

10 亿个 URL,去除重复的 URL。

解法一:哈希分区 + HashSet(精确去重)

  1. 对每个 URL 计算哈希值:hash(url) % 1000
  2. 将 URL 写入对应的文件(共 1000 个文件)
  3. 对每个文件,加载到内存用 HashSet 去重
  4. 合并所有文件的结果
flowchart LR
    Input[10 亿个 URL] --> Hash["hash(url) % 1000"]
    Hash --> F0[文件 0<br/>~100 MB]
    Hash --> F1[文件 1<br/>~100 MB]
    Hash --> Fdots[...]
    Hash --> F999[文件 999<br/>~100 MB]
    F0 --> S0[HashSet 去重]
    F1 --> S1[HashSet 去重]
    Fdots --> Sdots[...]
    F999 --> S999[HashSet 去重]
    S0 --> Out[合并去重结果]
    S1 --> Out
    Sdots --> Out
    S999 --> Out

关键:相同的 URL 一定会被分到同一个文件中,所以每个文件可以独立去重——不会出现同一个 URL 分散在两个文件里而漏去重的情况。

分区数的选择:10 亿个 URL × 100 字节 = 100 GB。分成 1000 个文件,每个文件约 100 MB。考虑到 HashSet 的内存膨胀(Java 中约 5-8 倍于裸数据),每个文件处理时需要 500 MB - 800 MB 内存。如果内存不够,增大分区数即可。

解法二:布隆过滤器(近似去重)

布隆过滤器(Bloom Filter) 是一种空间高效的概率数据结构:

  • 插入:将元素通过 K 个哈希函数映射到位数组的 K 个位置,置为 1
  • 查询:检查 K 个位置是否都为 1
  • 特点:可能有假阳性(误判存在),但不会有假阴性(不会漏判)
flowchart TB
    subgraph Insert["插入元素 x"]
        direction LR
        X[x] --> H1["hash₁(x) = 3"]
        X --> H2["hash₂(x) = 7"]
        X --> H3["hash₃(x) = 11"]
    end

    Bit["位数组(m bits)<br/>插入后 pos 3、7、11 被置 1"]

    H1 --> Bit
    H2 --> Bit
    H3 --> Bit

    Bit --> Query["查询元素 y(实际未插入)<br/>hash₁(y) = 3 → 1 ✓<br/>hash₂(y) = 11 → 1 ✓<br/>hash₃(y) = 7 → 1 ✓<br/>三位全 1 → 误判为存在(假阳性)"]

假阴性不可能:若 y 真的被插入过,它的 K 个 hash 位置一定都被置过 1,查询必然命中。假阳性的根源是不同元素的 hash 位置可能巧合重叠。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class BloomFilter {
private final BitSet bitSet;
private final int size;
private final int hashCount;

public BloomFilter(int expectedElements, double falsePositiveRate) {
this.size = optimalSize(expectedElements, falsePositiveRate);
this.hashCount = optimalHashCount(size, expectedElements);
this.bitSet = new BitSet(size);
}

public void add(String element) {
for (int i = 0; i < hashCount; i++) {
int hash = hash(element, i);
bitSet.set(Math.abs(hash % size));
}
}

public boolean mightContain(String element) {
for (int i = 0; i < hashCount; i++) {
int hash = hash(element, i);
if (!bitSet.get(Math.abs(hash % size))) {
return false; // 一定不存在
}
}
return true; // 可能存在(有假阳性概率)
}

private int hash(String element, int seed) {
return MurmurHash3.hash32(element.getBytes(), seed);
}

private static int optimalSize(int n, double p) {
return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}

private static int optimalHashCount(int m, int n) {
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
}

布隆过滤器的参数推导

设 n 为期望插入的元素数量,m 为位数组长度(比特数),k 为哈希函数数量。

  1. 误判率计算

    • 插入 n 个元素后,任意一位仍为 0 的概率:(1 - 1/m)^(kn) ≈ e^(-kn/m)
    • 任意一位为 1 的概率:1 - e^(-kn/m)
    • 查询一个不存在的元素时,k 个哈希位置都为 1 的概率(误判率):
      p = (1 - e^(-kn/m))^k
  2. 最优哈希函数数量
    对 p 关于 k 求导并令导数为 0,得到:
    k = (m/n) × ln 2

  3. 最优位数组长度
    将 k 代入误判率公式,给定目标误判率 p,解得:
    m = -n × ln p / (ln 2)^2

  4. 参数选择示例

元素数量 误判率 位数组大小 哈希函数数
10 亿 1% 1.2 GB 7
10 亿 0.1% 1.8 GB 10
10 亿 0.01% 2.4 GB 13

解法三:HyperLogLog 近似去重

对于超大规模的去重场景(如亿级 UV 统计),可以使用 HyperLogLog

HyperLogLog 原理

HyperLogLog 基于 LogLog 算法,通过观察哈希值的二进制表示中前导零的个数来估计基数。

  1. 基本思想

    • 将每个元素通过哈希函数映射为均匀分布的 32 位或 64 位整数
    • 计算哈希值的二进制表示中前导零的个数 ρ(h(x))
    • 前导零越多,说明哈希值越小,对应的基数越大
    • 使用 2^max(ρ) 作为基数的估计
  2. 调和平均数修正

    • LogLog 使用算术平均数,对极端值敏感
    • HyperLogLog 使用调和平均数,具有更好的鲁棒性
    • 设 m 为桶数量,每个桶记录该桶内元素的最大前导零个数 M[j]
    • 基数估计:
      1
      E = α_m × m² / (Σ[1/(2^M[j])])
      其中 α_m 为修正系数:
      • α_m ≈ 0.7213/(1 + 1.079/m)(当 m ≥ 128 时)
  3. 为什么使用调和平均数

    • 调和平均数对小值更敏感,能更好地处理哈希冲突
    • 当某个桶的 M[j] 异常大时,调和平均数不会被过度影响
    • 数学上,调和平均数 ≤ 几何平均数 ≤ 算术平均数
    • 对于基数估计,调和平均数提供了更保守且准确的估计
  4. 空间复杂度

    • 只需 m 个寄存器,每个寄存器 5-6 位(记录最大前导零个数)
    • m = 2^14(16384 个桶)时,仅需 12KB 内存,标准误差约 0.81%
    • m = 2^16(65536 个桶)时,需 48KB 内存,标准误差约 0.41%
    • 理论上可估计的基数上限取决于哈希位数(64 位哈希 → 可估计 2^64 级基数),与桶数无关;桶数只影响精度
    • 标准误差公式:σ ≈ 1.04 / √m
  5. 修正规则

    • 小范围修正:当 E < 5m/2 时
      • 使用线性计数修正:E’ = m × ln(m/V0)
      • 其中 V0 为 M[j] = 0 的桶数量
    • 大范围修正:当 E > 2^32/30 时
      • 修正上限:E’ = -2^32 × ln(1 - E/2^32)

解法四:Spark 去重

1
2
3
val uniqueUrls = sc.textFile("hdfs:///data/urls.txt")
.distinct() // 内部使用 reduceByKey
.saveAsTextFile("hdfs:///data/unique_urls")

distinct() 的内部实现:

  1. map(x => (x, null)):将每个元素转为 KV 对
  2. reduceByKey((a, b) => a):按 Key 去重
  3. map(_._1):提取 Key

Part 4: 词频统计

问题描述

统计 10GB 文本文件中每个单词出现的次数,找出出现次数最多的 100 个单词。

解法一:哈希分区 + HashMap

  1. 逐行读取文件,对每个单词计算 hash(word) % 1000
  2. 将单词写入对应的文件
  3. 对每个文件,用 HashMap 统计词频
  4. 对每个文件的结果,用最小堆找出 Top 100
  5. 合并 1000 个文件的 Top 100,找出全局 Top 100

解法二:MapReduce 经典实现

flowchart LR
    subgraph Input["输入分片"]
        I1["Hello World"]
        I2["Hello Spark"]
        I3["World Flink"]
    end

    subgraph MapPhase["Map 阶段(本地无状态)"]
        M1["(Hello, 1)<br/>(World, 1)"]
        M2["(Hello, 1)<br/>(Spark, 1)"]
        M3["(World, 1)<br/>(Flink, 1)"]
    end

    I1 --> M1
    I2 --> M2
    I3 --> M3

    subgraph ShufflePhase["Shuffle(按 Key 哈希分组到 Reducer)"]
        S1["Hello: [1, 1]"]
        S2["World: [1, 1]"]
        S3["Spark: [1]"]
        S4["Flink: [1]"]
    end

    M1 --> S1
    M1 --> S2
    M2 --> S1
    M2 --> S3
    M3 --> S2
    M3 --> S4

    subgraph ReducePhase["Reduce 阶段"]
        R1["(Hello, 2)"]
        R2["(World, 2)"]
        R3["(Spark, 1)"]
        R4["(Flink, 1)"]
    end

    S1 --> R1
    S2 --> R2
    S3 --> R3
    S4 --> R4

Shuffle 是 MapReduce 的昂贵环节——所有 Map 输出按 Key 重分布到 Reducer,涉及全集群网络传输。Combiner(Map 端预聚合)和加盐打散都是为了把 Shuffle 阶段的数据量压下来。

解法三:Spark WordCount

1
2
3
4
5
6
7
8
val wordCounts = sc.textFile("hdfs:///data/text.txt")
.flatMap(_.split("\\s+"))
.map(word => (word.toLowerCase, 1))
.reduceByKey(_ + _)

// 找出 Top 100
val top100 = wordCounts
.top(100)(Ordering.by(_._2))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DataStream<String> text = env.readTextFile("hdfs:///data/text.txt");

DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);

// 窗口 TopK
wordCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.process(new TopKWordFunction(100))
.print();

public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.toLowerCase().split("\\s+")) {
if (!word.isEmpty()) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}

Count-Min Sketch:近似词频统计

当精确统计内存不够时,可以使用 Count-Min Sketch

1
2
3
4
5
6
7
8
9
10
11
12
13
Count-Min Sketch 结构:
┌─────────────────────────────────────┐
│ d 个哈希函数 × w 个计数器 │
│ │
h1: [3] [0] [5] [1] [0] [2] [0]
h2: [1] [4] [0] [0] [3] [0] [1]
h3: [0] [2] [1] [3] [0] [4] [0]
│ │
│ 查询 "hello" 的频率: │
min(h1[hash1("hello")], │
h2[hash2("hello")], │
h3[hash3("hello")]) │
└─────────────────────────────────────┘

Count-Min Sketch 误差界分析

设 d 为哈希函数数量,w 为每行的计数器数量,ε 为误差率,δ 为失败概率。

  1. 误差上界推导

    • 设元素 x 的真实频率为 f,估计频率为 f̂
    • 对于任意哈希函数 hi,计数器 C[hi(x)] 的期望为 f
    • 由于哈希冲突,C[hi(x)] = f + noise,其中 noise ≥ 0
    • 取最小值:f̂ = min(C[h1(x)], …, C[hd(x)])
  2. 参数选择

    • 宽度 w:控制单次哈希冲突的概率
      • w = ⌈e/ε⌉,其中 e ≈ 2.718
    • 深度 d:控制多次哈希同时冲突的概率
      • d = ⌈ln(1/δ)⌉
  3. 误差保证

    • 以至少 1-δ 的概率,对于所有元素 x:
      f̂ ≤ f + εN
      其中 N 为所有元素的频率总和
  4. 空间复杂度

    • 总空间:O(d × w) = O((1/ε) × ln(1/δ))
    • 对于 ε = 0.01, δ = 0.01:约 4600 个计数器
  5. 特性

    • 单调性:频率估计值 ≥ 真实值(只会高估,不会低估)
    • 加性:支持增量更新,适合流式数据
    • 并行性:多个 sketch 可以合并(对应位置相加)

Part 5: 共同元素问题

问题描述

两个各有 10 亿个 URL 的文件,找出共同的 URL。

解法一:哈希分区

  1. 对文件 A 的每个 URL,计算 hash(url) % 1000,写入 A_0, A_1, …, A_999
  2. 对文件 B 的每个 URL,计算 hash(url) % 1000,写入 B_0, B_1, …, B_999
  3. 对每对 (A_i, B_i),将 A_i 加载到 HashSet,遍历 B_i 查找交集
  4. 合并所有交集

关键:使用相同的哈希函数,相同的 URL 一定在相同编号的文件中。

解法二:排序 + 归并

  1. 对文件 A 和文件 B 分别进行外部排序
  2. 使用双指针归并,找出相同的 URL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static List<String> findCommon(BufferedReader readerA, BufferedReader readerB)
throws IOException {
List<String> common = new ArrayList<>();
String lineA = readerA.readLine();
String lineB = readerB.readLine();

while (lineA != null && lineB != null) {
int cmp = lineA.compareTo(lineB);
if (cmp == 0) {
common.add(lineA);
lineA = readerA.readLine();
lineB = readerB.readLine();
} else if (cmp < 0) {
lineA = readerA.readLine();
} else {
lineB = readerB.readLine();
}
}

return common;
}

解法三:Spark 交集

1
2
3
4
5
val urlsA = sc.textFile("hdfs:///data/urls_a.txt")
val urlsB = sc.textFile("hdfs:///data/urls_b.txt")

val commonUrls = urlsA.intersection(urlsB)
commonUrls.saveAsTextFile("hdfs:///data/common_urls")

intersection() 的内部实现:

  1. 将两个 RDD 都转为 (url, null) 的 KV 对
  2. 使用 cogroup 将相同 Key 的数据聚合(触发 Shuffle)
  3. 过滤出在两个 RDD 中都存在的 Key

解法四:布隆过滤器预筛选

当两个文件的交集远小于全集(即大部分 URL 不重复)时,可以用布隆过滤器做第一遍筛选,大幅减少第二遍需要精确比对的数据量:

  1. 对文件 A 的所有 URL 构建布隆过滤器 BF_A
  2. 扫描文件 B,用 BF_A 过滤:通过布隆过滤器的 URL 写入候选集 C(含少量假阳性)
  3. 对候选集 C 与文件 A 做精确比对(此时 C 已远小于 B)

这种两阶段策略在实际生产中很常见:第一遍用概率结构快速淘汰大量不可能的候选,第二遍对小规模候选做精确验证。


Part 6: 中位数问题

问题描述

100 亿个整数,找出中位数。

解法一:二分搜索(值域二分)

关键洞察:不对数据排序,而是对值域做二分。数据范围已知时(如 32 位无符号整数,范围 0 ~ 2^32-1),每次二分值域,统计落在左半区间的元素个数来决定中位数在哪一侧:

  1. 设搜索范围为 [lo, hi],取 mid = (lo + hi) / 2
  2. 扫描全部数据,统计 ≤ mid 的个数 count
  3. 如果 count < N/2,说明中位数 > mid,令 lo = mid + 1
  4. 如果 count ≥ N/2,说明中位数 ≤ mid,令 hi = mid
  5. 重复直到 lo == hi,此时 lo 即为中位数

时间复杂度:O(N × log(MAX_VALUE))。对于 32 位整数,需要扫描数据 32 次,即 O(32N)。
空间复杂度:O(1)——只需一个计数器。
适用性:数据可以在磁盘上,每次只需顺序扫描一遍;不要求随机访问。

解法二:分桶计数(两遍扫描)

将值域等分为若干桶,用两遍扫描精确定位中位数。以 32 位无符号整数(值域 0 ~ 2^32-1)为例:

  1. 第一遍扫描:将值域分成 2^16 = 65536 个桶,每个桶覆盖 65536 个连续整数值。扫描全部数据,统计每个桶的元素个数
  2. 定位目标桶:从第一个桶开始累加计数,当累加值首次 ≥ N/2 时,中位数在当前桶内
  3. 第二遍扫描:只关注落在目标桶内的元素(仅 65536 个可能的值),用一个 65536 大小的数组精确计数
  4. 在目标桶的数组中再次累加,找到精确的中位数

空间复杂度:O(桶数量) = 65536 × 4 字节 = 256 KB(第一遍)+ 256 KB(第二遍)。
时间复杂度:O(2N)——两遍顺序扫描。
对比解法一:分桶只扫描 2 遍,而二分搜索需扫描 32 遍;但分桶需要 512 KB 内存,二分搜索只需 O(1)。

解法三:Spark 中位数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val numbers = sc.textFile("hdfs:///data/numbers.txt").map(_.toLong)

// 方式一:精确中位数(需要全局排序,代价高——触发全量 Shuffle)
val count = numbers.count()
val median = numbers.sortBy(identity)
.zipWithIndex()
.filter { case (_, index) => index == count / 2 }
.map(_._1)
.first()

// 方式二:近似中位数——Spark SQL 内置的 approxQuantile(基于 Greenwald-Khanna 算法)
import org.apache.spark.sql.functions._
val df = numbers.toDF("value")
// 第三个参数 relativeError 控制精度:0 为精确,0.01 为 1% 误差
val approxMedian = df.stat.approxQuantile("value", Array(0.5), 0.01)(0)

// 方式三:手动采样(最粗略,适合快速估算数量级)
val sampled = numbers.sample(false, 0.01).collect().sorted
val sampledMedian = sampled(sampled.length / 2)

三种方式的权衡:

方式 精度 代价 适用场景
sortBy + zipWithIndex 精确 全量 Shuffle + 排序 数据量可控、必须精确
approxQuantile 可控误差(relativeError 参数) 单遍扫描,无 Shuffle 生产环境首选
sample + 本地排序 取决于采样率 采样率 × 数据量 快速验证数量级

架构对比

维度 Spark Flink
计算模型 微批处理(Micro-Batch);Structured Streaming 的 Continuous Mode 可做到毫秒级但功能受限 逐事件处理(Event-by-Event),流是一等公民
延迟 秒级(由微批间隔决定) 毫秒级(事件到达即处理)
状态管理 Structured Streaming 支持有限状态;复杂状态需借助外部存储 内置 Keyed State / Operator State,支持 RocksDB 后端,可管理 TB 级状态
容错 RDD Lineage(丢失分区重算);Structured Streaming 用 WAL + 偏移量 基于 Chandy-Lamport 的异步屏障快照(Checkpoint),支持 Exactly-Once
窗口 基于处理时间或微批边界 支持事件时间、处理时间、会话窗口,可处理乱序和迟到数据
API RDD / DataFrame / Dataset / Structured Streaming DataStream / Table API / SQL(批流统一)
生态 MLlib、GraphX、PySpark 生态成熟,社区规模大 CEP 库、Stateful Functions(Statefun),阿里/字节等大厂深度投入

选型决策树

1
2
3
4
5
6
7
8
9
10
11
12
13
需求是什么?
├── 离线批处理 / ETL / 数据仓库
│ └── Spark(DataFrame + SQL 优化器成熟,与 Hive/Iceberg 集成好)
├── 机器学习训练
│ └── Spark(MLlib + 与 Python 生态的互操作)
├── 亚秒级实时流处理
│ └── Flink(逐事件模型,延迟确定性高)
├── 有状态流处理(如实时风控、会话分析)
│ └── Flink(内置状态后端 + Savepoint 支持版本迭代)
├── 复杂事件处理(CEP)/ 模式匹配
│ └── Flink(原生 CEP 库)
└── 团队只有 Spark 经验、无严格延迟要求
└── Spark Structured Streaming(降低学习成本)

容错机制对比

Spark RDD Lineage

  • 记录 RDD 的转换链(DAG),元数据本身不占大量存储
  • 某个 Partition 丢失时,根据 Lineage 从上游重新计算
  • 窄依赖(map/filter):只需重算丢失的 Partition,恢复快
  • 宽依赖(Shuffle):需要重算 Shuffle 上游的所有父 Partition;生产环境中通常对 Shuffle 中间结果做持久化(spark.shuffle.service.enabled)来缓解
  • Lineage 过长时可手动插入 persist() / checkpoint() 截断链路

Flink Checkpoint

  • 基于 Chandy-Lamport 算法:向数据流中注入 Barrier(屏障标记),Barrier 流过算子时触发状态快照
  • 快照异步写入持久化存储(HDFS/S3),不阻塞正常数据处理
  • 故障恢复时,所有算子回退到最近一次完成的 Checkpoint,从 Source 的对应偏移量重放
  • Exactly-Once 保证依赖于 Barrier 对齐(Aligned Checkpoint);Unaligned Checkpoint 牺牲部分语义换取反压场景下的快照速度
  • Savepoint 是手动触发的 Checkpoint,用于版本升级、拓扑变更等运维场景
sequenceDiagram
    participant JM as JobManager
    participant S as Source
    participant Op1 as Operator A
    participant Op2 as Operator B
    participant Store as State Backend<br/>(HDFS/S3)

    JM->>S: 触发 Checkpoint n
    S->>S: 记录 Source 偏移量
    S->>Store: 异步持久化 Source 状态
    S->>Op1: 数据流中注入 Barrier n

    Note over Op1: 收到 Barrier n<br/>(若多输入:等待所有<br/>输入的 Barrier 对齐)
    Op1->>Store: 异步持久化 Op A 状态
    Op1->>Op2: 继续转发 Barrier n

    Note over Op2: 收到 Barrier n
    Op2->>Store: 异步持久化 Op B 状态
    Op2->>JM: 报告 Checkpoint n 完成

    Note over JM: 所有算子确认 →<br/>Checkpoint n 标记为完成<br/>故障时从此点恢复

Barrier 对齐期间,先到的输入通道的数据会被缓冲(不处理),直到所有通道的 Barrier 都到齐,以此保证快照切面的一致性。反压严重时对齐等待时间变长,此时可切换为 Unaligned Checkpoint——不等 Barrier 对齐,而是将未处理的缓冲数据也纳入快照,代价是快照体积增大。


Part 8: 实际生产中的注意事项

数据倾斜处理

问题:某些 Key 的数据量远大于其他 Key,导致个别 Task 处理时间过长。

解决方案

方案 适用场景 实现方式
加盐打散 聚合操作 Key 加随机前缀,两阶段聚合
广播小表 Join 操作(一大一小) 将小表广播到所有节点
采样倾斜 Key Join 操作(都大) 对倾斜 Key 单独处理
自定义分区器 通用 根据数据分布自定义分区逻辑

Spark 加盐打散示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 原始:直接 reduceByKey 会导致数据倾斜
val result = data.reduceByKey(_ + _)

// 优化:两阶段聚合
val salted = data.map { case (key, value) =>
val salt = ThreadLocalRandom.current().nextInt(10)
(s"${salt}_${key}", value)
}

val partialResult = salted.reduceByKey(_ + _)

val finalResult = partialResult.map { case (saltedKey, value) =>
val originalKey = saltedKey.split("_", 2)(1)
(originalKey, value)
}.reduceByKey(_ + _)
flowchart TB
    subgraph Bad["直接 reduceByKey(倾斜)"]
        direction TB
        BA["原始数据<br/>(hot, 1) × 1 亿条<br/>(cold, 1) × 100 条"]
        BA --> BR["所有 hot 落到同一 Reducer<br/>该任务处理 1 亿条 ⚠️<br/>其他 Reducer 几乎空闲"]
    end

    subgraph Good["加盐两阶段聚合"]
        direction TB
        GA["原始数据(同上)"]
        GA --> GS1["阶段 1:Key 加随机前缀<br/>0_hot, 1_hot, ..., 9_hot"]
        GS1 --> GS2["散到 10 个 Reducer<br/>每个处理约 1000 万条<br/>局部聚合得 (k_hot, count_k)"]
        GS2 --> GS3["阶段 2:去掉前缀<br/>(hot, count_0), ..., (hot, count_9)"]
        GS3 --> GS4["再次 reduceByKey<br/>(hot, count_0 + ... + count_9)"]
    end

加盐打散的代价是多了一轮 Shuffle,但每轮 Shuffle 的数据量都受控;不加盐时是"一个 Reducer 卡 99% 的时间"。盐的桶数应略大于倾斜倍数:倾斜 100 倍取 10 个盐桶通常足够,过多会拖累阶段 2 的合并代价。

资源调优

参数 Spark Flink 调优方向
并行度 spark.default.parallelism parallelism.default 通常设为集群总核数的 2-3 倍
内存 spark.executor.memory taskmanager.memory.process.size 需为堆外内存(网络缓冲、RocksDB)预留空间
Shuffle spark.shuffle.file.buffer(默认 32KB,建议 64-128KB) taskmanager.network.memory.fraction(默认 0.1) Shuffle 密集型任务适当增大
GC spark.executor.extraJavaOptions 配置 G1GC TaskManager JVM 参数 大堆场景用 G1 或 ZGC 减少 Full GC 停顿

面试回答策略

面试中遇到大数据问题时,按以下层次递进作答:

  1. 明确约束:数据量多大?内存多大?是否要求精确结果?是批处理还是流式?
  2. 单机方案:先给出最直觉的分治方案(哈希分区 / 外部排序),说明时间和空间复杂度
  3. 分布式方案:说明如何用 Spark/Flink 实现,指出框架替你做了哪些事(分区、Shuffle、容错)
  4. 近似方案:如果面试官追问"能否用更少的内存",给出概率数据结构方案(布隆过滤器 / HyperLogLog / Count-Min Sketch),说明误差界
  5. 生产考量:主动提及数据倾斜、容错、资源调优,展示工程视野

总结

问题类型 单机解法 分布式解法 近似解法 核心思路
TopK 最小堆 O(N log K) Spark top(K) 只保留"当前最优的 K 个",丢弃其余
排序 外部归并排序 Spark sortBy(Range Partitioner) 分块排序 → 多路归并
去重 哈希分区 + HashSet Spark distinct 布隆过滤器(允许假阳性) 相同元素必须落入同一分区
词频统计 哈希分区 + HashMap Spark reduceByKey Count-Min Sketch(只会高估) 相同 Key 聚合到同一节点计数
共同元素 哈希分区 + HashSet Spark intersection 布隆过滤器 相同哈希 → 同一分区 → 局部求交
中位数 值域二分 / 分桶计数 approxQuantile(Greenwald-Khanna) 采样 不排序,而是对值域做搜索

所有问题的底层模式归结为两条路径:

  1. 分区 → 局部处理 → 合并:将数据按哈希或范围拆成内存可容纳的块,独立处理后合并结果。MapReduce / Spark / Flink 的执行模型正是这条路径的分布式实现。
  2. 用亚线性空间换近似结果:当连分区后的单块都嫌大、或只需要统计量而非精确集合时,概率数据结构(布隆过滤器、HyperLogLog、Count-Min Sketch)以极小内存给出有界误差的估计。

参考资料