“100 亿个数中找出最大的 1000 个”、“两个 10GB 的文件找出共同的 URL”——这些经典面试题的本质都是内存放不下。本文将系统性地梳理大数据场景下的 TopK、排序、去重、词频统计等经典问题,给出从单机到分布式(Spark/Flink)的完整解法。


引言:大数据问题的共同特征

为什么"内存放不下"

数据规模 内存需求 典型服务器内存 能否放入内存
1 亿个 int 400 MB 16 GB
10 亿个 int 4 GB 16 GB
100 亿个 int 40 GB 16 GB
10 亿个 URL(平均 100 字节) 100 GB 16 GB

通用解题框架

1
2
3
4
5
6
7
8
9
10
11
12
大数据问题


能否放入内存?
├── 能 → 直接用内存数据结构解决


不能 → 分而治之

├── 1. 分区(Partition):按哈希/范围将数据分成小块
├── 2. 局部处理:每个小块在内存中处理
└── 3. 合并(Merge):合并各小块的结果

算法的衍生关系

graph TD
    DivideConquer[分治法] --> ExternalSort[外部排序]
    DivideConquer --> HashPartition[哈希分区]
    DivideConquer --> MapReduce[MapReduce]

    ExternalSort --> MergeSort[归并排序]
    ExternalSort --> TopK_Sort[TopK 排序]

    HashPartition --> Dedup[去重]
    HashPartition --> WordCount[词频统计]
    HashPartition --> CommonURL[共同 URL]

    MapReduce --> Spark[Spark RDD]
    MapReduce --> Flink[Flink DataStream]

    Heap[堆] --> TopK_Heap[TopK 堆解法]
    BloomFilter[布隆过滤器] --> Dedup_Bloom[近似去重]
    CountMinSketch[Count-Min Sketch] --> FreqEstimate[频率估计]

Part 1: TopK 问题

问题描述

100 亿个整数,找出最大的 1000 个。

解法一:最小堆(单机,内存足够时)

维护一个大小为 K 的最小堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static int[] topK(int[] data, int k) {
PriorityQueue<Integer> minHeap = new PriorityQueue<>(k);

for (int num : data) {
if (minHeap.size() < k) {
minHeap.offer(num);
} else if (num > minHeap.peek()) {
minHeap.poll();
minHeap.offer(num);
}
}

return minHeap.stream().mapToInt(Integer::intValue).toArray();
}
  • 时间复杂度:O(N × log K)
  • 空间复杂度:O(K)
  • 优势:只需要 O(K) 的内存,K=1000 时只需要几 KB

解法二:分区 + 堆(单机,内存不够时)

当数据在磁盘上时:

  1. 将 100 亿个数分成 1000 个文件(每个文件 1000 万个数)
  2. 对每个文件用最小堆找出 Top 1000
  3. 合并 1000 个文件的 Top 1000(共 100 万个数),再用最小堆找出最终 Top 1000

解法三:快速选择算法(QuickSelect)

基于快速排序的分区思想,平均 O(N) 时间找到第 K 大的元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static int quickSelect(int[] arr, int left, int right, int k) {
if (left == right) return arr[left];

int pivotIndex = partition(arr, left, right);
int rank = pivotIndex - left + 1;

if (rank == k) {
return arr[pivotIndex];
} else if (rank > k) {
return quickSelect(arr, left, pivotIndex - 1, k);
} else {
return quickSelect(arr, pivotIndex + 1, right, k - rank);
}
}

private static int partition(int[] arr, int left, int right) {
// 随机选择 pivot,避免最坏情况
int randomIndex = left + ThreadLocalRandom.current().nextInt(right - left + 1);
swap(arr, randomIndex, right);

int pivot = arr[right];
int storeIndex = left;
for (int i = left; i < right; i++) {
if (arr[i] >= pivot) { // 降序排列,找最大的 K 个
swap(arr, i, storeIndex);
storeIndex++;
}
}
swap(arr, storeIndex, right);
return storeIndex;
}

private static void swap(int[] arr, int i, int j) {
int temp = arr[i];
arr[i] = arr[j];
arr[j] = temp;
}
  • 平均时间复杂度:O(N)
  • 最坏时间复杂度:O(N²)(通过随机化 pivot 避免)
  • 空间复杂度:O(1)(原地操作)

解法四:Spark 分布式 TopK

1
2
3
val topK = sc.textFile("hdfs:///data/numbers.txt")
.map(_.toLong)
.top(1000) // 内部使用 BoundedPriorityQueue

Spark 的 top(K) 内部实现:

  1. 每个 Partition 维护一个大小为 K 的最小堆
  2. 在 Shuffle 阶段合并各 Partition 的堆
  3. 最终 Driver 端合并得到全局 Top K
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
DataStream<Long> numbers = env.readTextFile("hdfs:///data/numbers.txt")
.map(Long::parseLong);

numbers
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new TopKProcessFunction(1000))
.print();

public class TopKProcessFunction extends ProcessAllWindowFunction<Long, String, TimeWindow> {
private final int k;

public TopKProcessFunction(int k) {
this.k = k;
}

@Override
public void process(Context context, Iterable<Long> elements, Collector<String> out) {
PriorityQueue<Long> minHeap = new PriorityQueue<>(k);
for (Long element : elements) {
if (minHeap.size() < k) {
minHeap.offer(element);
} else if (element > minHeap.peek()) {
minHeap.poll();
minHeap.offer(element);
}
}
out.collect("TopK: " + minHeap);
}
}

Part 2: 外部排序

问题描述

对一个 100GB 的文件进行排序,可用内存只有 1GB。

外部归并排序算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Phase 1: 分割与排序(Sort Phase)
┌──────────────────────────────────────────────┐
│ 100GB 文件 │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ 1GB │ │ 1GB │ │ 1GB │ ... │ 1GB │ 共100块 │
│ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 内存排序 内存排序 内存排序 内存排序 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 有序块1 有序块2 有序块3 ... 有序块100 │
└──────────────────────────────────────────────┘

Phase 2: 多路归并(Merge Phase)
┌──────────────────────────────────────────────┐
│ 100 个有序块 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ 100 路归并(最小堆) │ │
│ │ 每个块读入一个缓冲区(~10MB) │ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ 排序后的 100GB 文件 │
└──────────────────────────────────────────────┘

多路归并的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void externalMergeSort(List<File> sortedChunks, File output) throws IOException {
// 最小堆,按每个块的当前最小值排序
PriorityQueue<ChunkReader> minHeap = new PriorityQueue<>(
Comparator.comparingLong(ChunkReader::peek)
);

// 初始化:每个块读入第一个元素
for (File chunk : sortedChunks) {
ChunkReader reader = new ChunkReader(chunk);
if (reader.hasNext()) {
minHeap.offer(reader);
}
}

try (BufferedWriter writer = new BufferedWriter(new FileWriter(output))) {
while (!minHeap.isEmpty()) {
ChunkReader smallest = minHeap.poll();
writer.write(String.valueOf(smallest.pop()));
writer.newLine();

if (smallest.hasNext()) {
minHeap.offer(smallest); // 重新入堆
} else {
smallest.close();
}
}
}
}

多路归并时间复杂度分析

对于 k 路归并排序,设总数据量为 N,内存容量为 M,则:

  1. 分割阶段

    • 分块数量:k = N / M
    • 每块排序时间:O(M log M)
    • 总排序时间:O(N log M)
  2. 归并阶段

    • 每次从最小堆中取出最小元素:O(log k)
    • 总共需要取出 N 个元素
    • 归并时间:O(N log k)
    • 由于 k = N / M,故归并时间:O(N log(N/M))
  3. 总时间复杂度

    • O(N log M) + O(N log(N/M))
    • 当 M = √N 时达到最优:O(N log √N) = O(N log N)
  4. I/O 复杂度

    • 每个数据块被读取 2 次(排序时 1 次,归并时 1 次)
    • 总 I/O 次数:2N

优化技巧

优化 说明
替换选择排序 初始块的大小可以达到内存的 2 倍
多阶段归并 当块数太多时,分多轮归并(如每轮 10 路)
异步 I/O 读取下一个缓冲区时,同时处理当前缓冲区
压缩 对中间文件进行压缩,减少 I/O

Spark 的排序

1
2
3
4
val sorted = sc.textFile("hdfs:///data/large_file.txt")
.map(_.toLong)
.sortBy(identity)
.saveAsTextFile("hdfs:///data/sorted_output")

Spark 的排序使用 Range Partitioner

  1. 采样:从数据中采样,确定分区边界
  2. 分区:按范围将数据分配到不同 Partition
  3. 局部排序:每个 Partition 内部排序
  4. 输出:按 Partition 顺序输出,得到全局有序结果

Part 3: 去重问题

问题描述

10 亿个 URL,去除重复的 URL。

解法一:哈希分区 + HashSet

  1. 对每个 URL 计算哈希值:hash(url) % 1000
  2. 将 URL 写入对应的文件(共 1000 个文件)
  3. 对每个文件,用 HashSet 去重
  4. 合并所有文件的结果

关键:相同的 URL 一定会被分到同一个文件中,所以每个文件可以独立去重。

解法二:布隆过滤器(近似去重)

布隆过滤器(Bloom Filter) 是一种空间高效的概率数据结构:

  • 插入:将元素通过 K 个哈希函数映射到位数组的 K 个位置,置为 1
  • 查询:检查 K 个位置是否都为 1
  • 特点:可能有假阳性(误判存在),但不会有假阴性(不会漏判)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class BloomFilter {
private final BitSet bitSet;
private final int size;
private final int hashCount;

public BloomFilter(int expectedElements, double falsePositiveRate) {
this.size = optimalSize(expectedElements, falsePositiveRate);
this.hashCount = optimalHashCount(size, expectedElements);
this.bitSet = new BitSet(size);
}

public void add(String element) {
for (int i = 0; i < hashCount; i++) {
int hash = hash(element, i);
bitSet.set(Math.abs(hash % size));
}
}

public boolean mightContain(String element) {
for (int i = 0; i < hashCount; i++) {
int hash = hash(element, i);
if (!bitSet.get(Math.abs(hash % size))) {
return false; // 一定不存在
}
}
return true; // 可能存在(有假阳性概率)
}

private int hash(String element, int seed) {
return MurmurHash3.hash32(element.getBytes(), seed);
}

private static int optimalSize(int n, double p) {
return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}

private static int optimalHashCount(int m, int n) {
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
}

布隆过滤器的参数推导

设 n 为期望插入的元素数量,m 为位数组长度(比特数),k 为哈希函数数量。

  1. 误判率计算

    • 插入 n 个元素后,任意一位仍为 0 的概率:(1 - 1/m)^(kn) ≈ e^(-kn/m)
    • 任意一位为 1 的概率:1 - e^(-kn/m)
    • 查询一个不存在的元素时,k 个哈希位置都为 1 的概率(误判率):
      p = (1 - e^(-kn/m))^k
  2. 最优哈希函数数量
    对 p 关于 k 求导并令导数为 0,得到:
    k = (m/n) × ln 2

  3. 最优位数组长度
    将 k 代入误判率公式,给定目标误判率 p,解得:
    m = -n × ln p / (ln 2)^2

  4. 参数选择示例

元素数量 误判率 位数组大小 哈希函数数
10 亿 1% 1.2 GB 7
10 亿 0.1% 1.8 GB 10
10 亿 0.01% 2.4 GB 13

解法三:HyperLogLog 近似去重

对于超大规模的去重场景(如亿级 UV 统计),可以使用 HyperLogLog

HyperLogLog 原理

HyperLogLog 基于 LogLog 算法,通过观察哈希值的二进制表示中前导零的个数来估计基数。

  1. 基本思想

    • 将每个元素通过哈希函数映射为均匀分布的 32 位或 64 位整数
    • 计算哈希值的二进制表示中前导零的个数 ρ(h(x))
    • 前导零越多,说明哈希值越小,对应的基数越大
    • 使用 2^max(ρ) 作为基数的估计
  2. 调和平均数修正

    • LogLog 使用算术平均数,对极端值敏感
    • HyperLogLog 使用调和平均数,具有更好的鲁棒性
    • 设 m 为桶数量,每个桶记录该桶内元素的最大前导零个数 M[j]
    • 基数估计:
      1
      E = α_m × m² / (Σ[1/(2^M[j])])
      其中 α_m 为修正系数:
      • α_m ≈ 0.7213/(1 + 1.079/m)(当 m ≥ 128 时)
  3. 为什么使用调和平均数

    • 调和平均数对小值更敏感,能更好地处理哈希冲突
    • 当某个桶的 M[j] 异常大时,调和平均数不会被过度影响
    • 数学上,调和平均数 ≤ 几何平均数 ≤ 算术平均数
    • 对于基数估计,调和平均数提供了更保守且准确的估计
  4. 空间复杂度

    • 只需 m 个寄存器,每个寄存器 5-6 位
    • m = 2^16 时,仅需 12KB 内存
    • 可以估计 2^27 个不同元素,误差率约 1.04%
  5. 修正规则

    • 小范围修正:当 E < 5m/2 时
      • 使用线性计数修正:E’ = m × ln(m/V0)
      • 其中 V0 为 M[j] = 0 的桶数量
    • 大范围修正:当 E > 2^32/30 时
      • 修正上限:E’ = -2^32 × ln(1 - E/2^32)

解法四:Spark 去重

1
2
3
val uniqueUrls = sc.textFile("hdfs:///data/urls.txt")
.distinct() // 内部使用 reduceByKey
.saveAsTextFile("hdfs:///data/unique_urls")

distinct() 的内部实现:

  1. map(x => (x, null)):将每个元素转为 KV 对
  2. reduceByKey((a, b) => a):按 Key 去重
  3. map(_._1):提取 Key

Part 4: 词频统计

问题描述

统计 10GB 文本文件中每个单词出现的次数,找出出现次数最多的 100 个单词。

解法一:哈希分区 + HashMap

  1. 逐行读取文件,对每个单词计算 hash(word) % 1000
  2. 将单词写入对应的文件
  3. 对每个文件,用 HashMap 统计词频
  4. 对每个文件的结果,用最小堆找出 Top 100
  5. 合并 1000 个文件的 Top 100,找出全局 Top 100

解法二:MapReduce 经典实现

1
2
3
4
5
6
7
8
9
Map 阶段:
输入:("Hello World Hello") → 输出:(Hello, 1), (World, 1), (Hello, 1)

Shuffle 阶段:
按 Key 分组:(Hello, [1, 1]), (World, [1])

Reduce 阶段:
(Hello, [1, 1]) → (Hello, 2)
(World, [1]) → (World, 1)

解法三:Spark WordCount

1
2
3
4
5
6
7
8
val wordCounts = sc.textFile("hdfs:///data/text.txt")
.flatMap(_.split("\\s+"))
.map(word => (word.toLowerCase, 1))
.reduceByKey(_ + _)

// 找出 Top 100
val top100 = wordCounts
.top(100)(Ordering.by(_._2))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DataStream<String> text = env.readTextFile("hdfs:///data/text.txt");

DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);

// 窗口 TopK
wordCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.process(new TopKWordFunction(100))
.print();

public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.toLowerCase().split("\\s+")) {
if (!word.isEmpty()) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}

Count-Min Sketch:近似词频统计

当精确统计内存不够时,可以使用 Count-Min Sketch

1
2
3
4
5
6
7
8
9
10
11
12
13
Count-Min Sketch 结构:
┌─────────────────────────────────────┐
│ d 个哈希函数 × w 个计数器 │
│ │
h1: [3] [0] [5] [1] [0] [2] [0]
h2: [1] [4] [0] [0] [3] [0] [1]
h3: [0] [2] [1] [3] [0] [4] [0]
│ │
│ 查询 "hello" 的频率: │
min(h1[hash1("hello")], │
h2[hash2("hello")], │
h3[hash3("hello")]) │
└─────────────────────────────────────┘

Count-Min Sketch 误差界分析

设 d 为哈希函数数量,w 为每行的计数器数量,ε 为误差率,δ 为失败概率。

  1. 误差上界推导

    • 设元素 x 的真实频率为 f,估计频率为 f̂
    • 对于任意哈希函数 hi,计数器 C[hi(x)] 的期望为 f
    • 由于哈希冲突,C[hi(x)] = f + noise,其中 noise ≥ 0
    • 取最小值:f̂ = min(C[h1(x)], …, C[hd(x)])
  2. 参数选择

    • 宽度 w:控制单次哈希冲突的概率
      • w = ⌈e/ε⌉,其中 e ≈ 2.718
    • 深度 d:控制多次哈希同时冲突的概率
      • d = ⌈ln(1/δ)⌉
  3. 误差保证

    • 以至少 1-δ 的概率,对于所有元素 x:
      f̂ ≤ f + εN
      其中 N 为所有元素的频率总和
  4. 空间复杂度

    • 总空间:O(d × w) = O((1/ε) × ln(1/δ))
    • 对于 ε = 0.01, δ = 0.01:约 4600 个计数器
  5. 特性

    • 单调性:频率估计值 ≥ 真实值(只会高估,不会低估)
    • 加性:支持增量更新,适合流式数据
    • 并行性:多个 sketch 可以合并(对应位置相加)

Part 5: 共同元素问题

问题描述

两个各有 10 亿个 URL 的文件,找出共同的 URL。

解法一:哈希分区

  1. 对文件 A 的每个 URL,计算 hash(url) % 1000,写入 A_0, A_1, …, A_999
  2. 对文件 B 的每个 URL,计算 hash(url) % 1000,写入 B_0, B_1, …, B_999
  3. 对每对 (A_i, B_i),将 A_i 加载到 HashSet,遍历 B_i 查找交集
  4. 合并所有交集

关键:使用相同的哈希函数,相同的 URL 一定在相同编号的文件中。

解法二:排序 + 归并

  1. 对文件 A 和文件 B 分别进行外部排序
  2. 使用双指针归并,找出相同的 URL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static List<String> findCommon(BufferedReader readerA, BufferedReader readerB)
throws IOException {
List<String> common = new ArrayList<>();
String lineA = readerA.readLine();
String lineB = readerB.readLine();

while (lineA != null && lineB != null) {
int cmp = lineA.compareTo(lineB);
if (cmp == 0) {
common.add(lineA);
lineA = readerA.readLine();
lineB = readerB.readLine();
} else if (cmp < 0) {
lineA = readerA.readLine();
} else {
lineB = readerB.readLine();
}
}

return common;
}

解法三:Spark 交集

1
2
3
4
5
val urlsA = sc.textFile("hdfs:///data/urls_a.txt")
val urlsB = sc.textFile("hdfs:///data/urls_b.txt")

val commonUrls = urlsA.intersection(urlsB)
commonUrls.saveAsTextFile("hdfs:///data/common_urls")

intersection() 的内部实现:

  1. 将两个 RDD 都转为 (url, null) 的 KV 对
  2. 使用 cogroup 将相同 Key 的数据聚合
  3. 过滤出在两个 RDD 中都存在的 Key

Part 6: 中位数问题

问题描述

100 亿个整数,找出中位数。

解法一:二分搜索

如果数据范围已知(如 32 位整数,范围 0 ~ 2^32-1):

  1. 猜测中位数为 mid
  2. 扫描所有数据,统计 ≤ mid 的个数 count
  3. 如果 count < N/2,说明中位数 > mid,搜索右半部分
  4. 如果 count ≥ N/2,说明中位数 ≤ mid,搜索左半部分
  5. 重复直到范围缩小到 1

时间复杂度:O(N × log(MAX_VALUE)),对于 32 位整数是 O(32N)。
空间复杂度:O(1)。

解法二:分桶计数

  1. 将数据范围分成 65536 个桶(每个桶覆盖 65536 个值)
  2. 第一遍扫描:统计每个桶中的元素个数
  3. 根据桶的计数,确定中位数在哪个桶中
  4. 第二遍扫描:只关注目标桶中的元素,精确找到中位数

空间复杂度:O(桶数量) = O(65536) ≈ 256 KB。

解法三:Spark 近似中位数

1
2
3
4
5
6
7
8
9
10
11
12
13
val numbers = sc.textFile("hdfs:///data/numbers.txt").map(_.toLong)

// 精确中位数(需要全局排序,代价高)
val count = numbers.count()
val median = numbers.sortBy(identity)
.zipWithIndex()
.filter { case (_, index) => index == count / 2 }
.map(_._1)
.first()

// 近似中位数(使用采样,更高效)
val approxMedian = numbers.sample(false, 0.01).collect().sorted
val medianApprox = approxMedian(approxMedian.length / 2)

架构对比

维度 Spark Flink
计算模型 微批处理(Micro-Batch) 真正的流处理(Event-by-Event)
延迟 秒级(批间隔) 毫秒级
状态管理 有限(需要外部存储) 内置强大的状态管理
容错 RDD Lineage(重算) Checkpoint + Savepoint
窗口 基于批次 灵活的事件时间窗口
API RDD / DataFrame / Dataset DataStream / Table API
适用场景 批处理、ETL、机器学习 实时流处理、CEP

何时选择 Spark

  • 批处理 ETL:大规模数据清洗和转换
  • 机器学习:MLlib 生态成熟
  • 交互式查询:Spark SQL 性能优秀
  • 团队熟悉度:Spark 社区更大,学习资源更多
  • 实时流处理:毫秒级延迟要求
  • 复杂事件处理(CEP):模式匹配、异常检测
  • 有状态计算:需要精确的状态管理
  • 事件时间处理:需要处理乱序事件

容错机制对比

Spark RDD Lineage

  • 记录 RDD 的转换链(DAG)
  • 某个 Partition 丢失时,根据 Lineage 重新计算
  • 对于窄依赖,只需重算丢失的 Partition
  • 对于宽依赖(Shuffle),可能需要重算多个 Partition

Flink Checkpoint

  • 基于 Chandy-Lamport 算法的分布式快照
  • 定期将所有算子的状态保存到持久化存储
  • 故障恢复时,从最近的 Checkpoint 恢复
  • 支持 Exactly-Once 语义

Part 8: 实际生产中的注意事项

数据倾斜处理

问题:某些 Key 的数据量远大于其他 Key,导致个别 Task 处理时间过长。

解决方案

方案 适用场景 实现方式
加盐打散 聚合操作 Key 加随机前缀,两阶段聚合
广播小表 Join 操作(一大一小) 将小表广播到所有节点
采样倾斜 Key Join 操作(都大) 对倾斜 Key 单独处理
自定义分区器 通用 根据数据分布自定义分区逻辑

Spark 加盐打散示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 原始:直接 reduceByKey 会导致数据倾斜
val result = data.reduceByKey(_ + _)

// 优化:两阶段聚合
val salted = data.map { case (key, value) =>
val salt = ThreadLocalRandom.current().nextInt(10)
(s"${salt}_${key}", value)
}

val partialResult = salted.reduceByKey(_ + _)

val finalResult = partialResult.map { case (saltedKey, value) =>
val originalKey = saltedKey.split("_", 2)(1)
(originalKey, value)
}.reduceByKey(_ + _)

资源调优

参数 Spark Flink
并行度 spark.default.parallelism parallelism.default
内存 spark.executor.memory taskmanager.memory.process.size
Shuffle spark.shuffle.file.buffer taskmanager.network.memory.fraction

总结

问题类型 单机解法 分布式解法 近似解法
TopK 最小堆 O(N log K) Spark top(K)
排序 外部归并排序 Spark sortBy
去重 哈希分区 + HashSet Spark distinct 布隆过滤器
词频统计 哈希分区 + HashMap Spark reduceByKey Count-Min Sketch
共同元素 哈希分区 + HashSet Spark intersection 布隆过滤器
中位数 二分搜索 / 分桶 Spark sortBy + zipWithIndex 采样

核心思想分而治之——将大问题分解为内存可处理的小问题,然后合并结果。

参考资料