“100 亿个数中找出最大的 1000 个”、“两个 10GB 的文件找出共同的 URL”——这些经典面试题的本质都是内存放不下 。本文将系统性地梳理大数据场景下的 TopK、排序、去重、词频统计等经典问题,给出从单机到分布式(Spark/Flink)的完整解法。
引言:大数据问题的共同特征
为什么"内存放不下"
数据规模
内存需求
典型服务器内存
能否放入内存
1 亿个 int
400 MB
16 GB
✅
10 亿个 int
4 GB
16 GB
✅
100 亿个 int
40 GB
16 GB
❌
10 亿个 URL(平均 100 字节)
100 GB
16 GB
❌
通用解题框架
1 2 3 4 5 6 7 8 9 10 11 12 大数据问题 │ ▼ 能否放入内存? ├── 能 → 直接用内存数据结构解决 │ ▼ 不能 → 分而治之 │ ├── 1. 分区(Partition ):按哈希/ 范围将数据分成小块 ├── 2. 局部处理:每个小块在内存中处理 └── 3. 合并(Merge ):合并各小块的结果
算法的衍生关系
graph TD
DivideConquer[分治法] --> ExternalSort[外部排序]
DivideConquer --> HashPartition[哈希分区]
DivideConquer --> MapReduce[MapReduce]
ExternalSort --> MergeSort[归并排序]
ExternalSort --> TopK_Sort[TopK 排序]
HashPartition --> Dedup[去重]
HashPartition --> WordCount[词频统计]
HashPartition --> CommonURL[共同 URL]
MapReduce --> Spark[Spark RDD]
MapReduce --> Flink[Flink DataStream]
Heap[堆] --> TopK_Heap[TopK 堆解法]
BloomFilter[布隆过滤器] --> Dedup_Bloom[近似去重]
CountMinSketch[Count-Min Sketch] --> FreqEstimate[频率估计]
Part 1: TopK 问题
问题描述
100 亿个整数,找出最大的 1000 个。
解法一:最小堆(单机,内存足够时)
维护一个大小为 K 的最小堆 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static int [] topK(int [] data, int k) { PriorityQueue<Integer> minHeap = new PriorityQueue <>(k); for (int num : data) { if (minHeap.size() < k) { minHeap.offer(num); } else if (num > minHeap.peek()) { minHeap.poll(); minHeap.offer(num); } } return minHeap.stream().mapToInt(Integer::intValue).toArray(); }
时间复杂度 :O(N × log K)
空间复杂度 :O(K)
优势 :只需要 O(K) 的内存,K=1000 时只需要几 KB
解法二:分区 + 堆(单机,内存不够时)
当数据在磁盘上时:
将 100 亿个数分成 1000 个文件(每个文件 1000 万个数)
对每个文件用最小堆找出 Top 1000
合并 1000 个文件的 Top 1000(共 100 万个数),再用最小堆找出最终 Top 1000
解法三:快速选择算法(QuickSelect)
基于快速排序的分区思想,平均 O(N) 时间找到第 K 大的元素:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public static int quickSelect (int [] arr, int left, int right, int k) { if (left == right) return arr[left]; int pivotIndex = partition(arr, left, right); int rank = pivotIndex - left + 1 ; if (rank == k) { return arr[pivotIndex]; } else if (rank > k) { return quickSelect(arr, left, pivotIndex - 1 , k); } else { return quickSelect(arr, pivotIndex + 1 , right, k - rank); } }private static int partition (int [] arr, int left, int right) { int randomIndex = left + ThreadLocalRandom.current().nextInt(right - left + 1 ); swap(arr, randomIndex, right); int pivot = arr[right]; int storeIndex = left; for (int i = left; i < right; i++) { if (arr[i] >= pivot) { swap(arr, i, storeIndex); storeIndex++; } } swap(arr, storeIndex, right); return storeIndex; }private static void swap (int [] arr, int i, int j) { int temp = arr[i]; arr[i] = arr[j]; arr[j] = temp; }
平均时间复杂度 :O(N)
最坏时间复杂度 :O(N²)(通过随机化 pivot 避免)
空间复杂度 :O(1)(原地操作)
解法四:Spark 分布式 TopK
1 2 3 val topK = sc.textFile("hdfs:///data/numbers.txt" ) .map(_.toLong) .top(1000 )
Spark 的 top(K) 内部实现:
每个 Partition 维护一个大小为 K 的最小堆
在 Shuffle 阶段合并各 Partition 的堆
最终 Driver 端合并得到全局 Top K
解法五:Flink 流式 TopK
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 DataStream<Long> numbers = env.readTextFile("hdfs:///data/numbers.txt" ) .map(Long::parseLong); numbers .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1 ))) .process(new TopKProcessFunction (1000 )) .print();public class TopKProcessFunction extends ProcessAllWindowFunction <Long, String, TimeWindow> { private final int k; public TopKProcessFunction (int k) { this .k = k; } @Override public void process (Context context, Iterable<Long> elements, Collector<String> out) { PriorityQueue<Long> minHeap = new PriorityQueue <>(k); for (Long element : elements) { if (minHeap.size() < k) { minHeap.offer(element); } else if (element > minHeap.peek()) { minHeap.poll(); minHeap.offer(element); } } out.collect("TopK: " + minHeap); } }
Part 2: 外部排序
问题描述
对一个 100GB 的文件进行排序,可用内存只有 1GB。
外部归并排序算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Phase 1: 分割与排序(Sort Phase) ┌──────────────────────────────────────────────┐ │ 100GB 文件 │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │ 1GB │ │ 1GB │ │ 1GB │ ... │ 1GB │ 共100块 │ │ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ 内存排序 内存排序 内存排序 内存排序 │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ 有序块1 有序块2 有序块3 ... 有序块100 │ └──────────────────────────────────────────────┘ Phase 2: 多路归并(Merge Phase) ┌──────────────────────────────────────────────┐ │ 100 个有序块 │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌─────────────────────────────────┐ │ │ │ 100 路归并(最小堆) │ │ │ │ 每个块读入一个缓冲区(~10MB) │ │ │ └──────────────┬──────────────────┘ │ │ │ │ │ ▼ │ │ 排序后的 100GB 文件 │ └──────────────────────────────────────────────┘
多路归并的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void externalMergeSort (List<File> sortedChunks, File output) throws IOException { PriorityQueue<ChunkReader> minHeap = new PriorityQueue <>( Comparator.comparingLong(ChunkReader::peek) ); for (File chunk : sortedChunks) { ChunkReader reader = new ChunkReader (chunk); if (reader.hasNext()) { minHeap.offer(reader); } } try (BufferedWriter writer = new BufferedWriter (new FileWriter (output))) { while (!minHeap.isEmpty()) { ChunkReader smallest = minHeap.poll(); writer.write(String.valueOf(smallest.pop())); writer.newLine(); if (smallest.hasNext()) { minHeap.offer(smallest); } else { smallest.close(); } } } }
多路归并时间复杂度分析
对于 k 路归并排序,设总数据量为 N,内存容量为 M,则:
分割阶段 :
分块数量:k = N / M
每块排序时间:O(M log M)
总排序时间:O(N log M)
归并阶段 :
每次从最小堆中取出最小元素:O(log k)
总共需要取出 N 个元素
归并时间:O(N log k)
由于 k = N / M,故归并时间:O(N log(N/M))
总时间复杂度 :
O(N log M) + O(N log(N/M))
当 M = √N 时达到最优:O(N log √N) = O(N log N)
I/O 复杂度 :
每个数据块被读取 2 次(排序时 1 次,归并时 1 次)
总 I/O 次数:2N
优化技巧
优化
说明
替换选择排序
初始块的大小可以达到内存的 2 倍
多阶段归并
当块数太多时,分多轮归并(如每轮 10 路)
异步 I/O
读取下一个缓冲区时,同时处理当前缓冲区
压缩
对中间文件进行压缩,减少 I/O
Spark 的排序
1 2 3 4 val sorted = sc.textFile("hdfs:///data/large_file.txt" ) .map(_.toLong) .sortBy(identity) .saveAsTextFile("hdfs:///data/sorted_output" )
Spark 的排序使用 Range Partitioner :
采样 :从数据中采样,确定分区边界
分区 :按范围将数据分配到不同 Partition
局部排序 :每个 Partition 内部排序
输出 :按 Partition 顺序输出,得到全局有序结果
Part 3: 去重问题
问题描述
10 亿个 URL,去除重复的 URL。
解法一:哈希分区 + HashSet
对每个 URL 计算哈希值:hash(url) % 1000
将 URL 写入对应的文件(共 1000 个文件)
对每个文件,用 HashSet 去重
合并所有文件的结果
关键 :相同的 URL 一定会被分到同一个文件中,所以每个文件可以独立去重。
解法二:布隆过滤器(近似去重)
布隆过滤器(Bloom Filter) 是一种空间高效的概率数据结构:
插入 :将元素通过 K 个哈希函数映射到位数组的 K 个位置,置为 1
查询 :检查 K 个位置是否都为 1
特点 :可能有假阳性(误判存在),但不会有假阴性(不会漏判)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class BloomFilter { private final BitSet bitSet; private final int size; private final int hashCount; public BloomFilter (int expectedElements, double falsePositiveRate) { this .size = optimalSize(expectedElements, falsePositiveRate); this .hashCount = optimalHashCount(size, expectedElements); this .bitSet = new BitSet (size); } public void add (String element) { for (int i = 0 ; i < hashCount; i++) { int hash = hash(element, i); bitSet.set(Math.abs(hash % size)); } } public boolean mightContain (String element) { for (int i = 0 ; i < hashCount; i++) { int hash = hash(element, i); if (!bitSet.get(Math.abs(hash % size))) { return false ; } } return true ; } private int hash (String element, int seed) { return MurmurHash3.hash32(element.getBytes(), seed); } private static int optimalSize (int n, double p) { return (int ) (-n * Math.log(p) / (Math.log(2 ) * Math.log(2 ))); } private static int optimalHashCount (int m, int n) { return Math.max(1 , (int ) Math.round((double ) m / n * Math.log(2 ))); } }
布隆过滤器的参数推导 :
设 n 为期望插入的元素数量,m 为位数组长度(比特数),k 为哈希函数数量。
误判率计算 :
插入 n 个元素后,任意一位仍为 0 的概率:(1 - 1/m)^(kn) ≈ e^(-kn/m)
任意一位为 1 的概率:1 - e^(-kn/m)
查询一个不存在的元素时,k 个哈希位置都为 1 的概率(误判率):
p = (1 - e^(-kn/m))^k
最优哈希函数数量 :
对 p 关于 k 求导并令导数为 0,得到:
k = (m/n) × ln 2
最优位数组长度 :
将 k 代入误判率公式,给定目标误判率 p,解得:
m = -n × ln p / (ln 2)^2
参数选择示例 :
元素数量
误判率
位数组大小
哈希函数数
10 亿
1%
1.2 GB
7
10 亿
0.1%
1.8 GB
10
10 亿
0.01%
2.4 GB
13
解法三:HyperLogLog 近似去重
对于超大规模的去重场景(如亿级 UV 统计),可以使用 HyperLogLog :
HyperLogLog 原理 :
HyperLogLog 基于 LogLog 算法,通过观察哈希值的二进制表示中前导零的个数来估计基数。
基本思想 :
将每个元素通过哈希函数映射为均匀分布的 32 位或 64 位整数
计算哈希值的二进制表示中前导零的个数 ρ(h(x))
前导零越多,说明哈希值越小,对应的基数越大
使用 2^max(ρ) 作为基数的估计
调和平均数修正 :
LogLog 使用算术平均数,对极端值敏感
HyperLogLog 使用调和平均数,具有更好的鲁棒性
设 m 为桶数量,每个桶记录该桶内元素的最大前导零个数 M[j]
基数估计:1 E = α_m × m² / (Σ[1 /(2 ^M[j])])
其中 α_m 为修正系数:
α_m ≈ 0.7213/(1 + 1.079/m)(当 m ≥ 128 时)
为什么使用调和平均数 :
调和平均数对小值更敏感,能更好地处理哈希冲突
当某个桶的 M[j] 异常大时,调和平均数不会被过度影响
数学上,调和平均数 ≤ 几何平均数 ≤ 算术平均数
对于基数估计,调和平均数提供了更保守且准确的估计
空间复杂度 :
只需 m 个寄存器,每个寄存器 5-6 位
m = 2^16 时,仅需 12KB 内存
可以估计 2^27 个不同元素,误差率约 1.04%
修正规则 :
小范围修正 :当 E < 5m/2 时
使用线性计数修正:E’ = m × ln(m/V0)
其中 V0 为 M[j] = 0 的桶数量
大范围修正 :当 E > 2^32/30 时
修正上限:E’ = -2^32 × ln(1 - E/2^32)
解法四:Spark 去重
1 2 3 val uniqueUrls = sc.textFile("hdfs:///data/urls.txt" ) .distinct() .saveAsTextFile("hdfs:///data/unique_urls" )
distinct() 的内部实现:
map(x => (x, null)):将每个元素转为 KV 对
reduceByKey((a, b) => a):按 Key 去重
map(_._1):提取 Key
Part 4: 词频统计
问题描述
统计 10GB 文本文件中每个单词出现的次数,找出出现次数最多的 100 个单词。
解法一:哈希分区 + HashMap
逐行读取文件,对每个单词计算 hash(word) % 1000
将单词写入对应的文件
对每个文件,用 HashMap 统计词频
对每个文件的结果,用最小堆找出 Top 100
合并 1000 个文件的 Top 100,找出全局 Top 100
解法二:MapReduce 经典实现
1 2 3 4 5 6 7 8 9 Map 阶段: 输入:("Hello World Hello") → 输出:(Hello, 1 ), (World, 1 ), (Hello, 1 ) Shuffle 阶段: 按 Key 分组:(Hello, [1 , 1 ]), (World, [1 ]) Reduce 阶段: (Hello, [1 , 1 ]) → (Hello, 2 ) (World, [1 ]) → (World, 1 )
解法三:Spark WordCount
1 2 3 4 5 6 7 8 val wordCounts = sc.textFile("hdfs:///data/text.txt" ) .flatMap(_.split("\\s+" )) .map(word => (word.toLowerCase, 1 )) .reduceByKey(_ + _)val top100 = wordCounts .top(100 )(Ordering .by(_._2))
解法四:Flink 流式词频统计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 DataStream<String> text = env.readTextFile("hdfs:///data/text.txt" ); DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new Tokenizer ()) .keyBy(value -> value.f0) .sum(1 ); wordCounts .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5 ))) .process(new TopKWordFunction (100 )) .print();public static class Tokenizer implements FlatMapFunction <String, Tuple2<String, Integer>> { @Override public void flatMap (String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.toLowerCase().split("\\s+" )) { if (!word.isEmpty()) { out.collect(new Tuple2 <>(word, 1 )); } } } }
Count-Min Sketch:近似词频统计
当精确统计内存不够时,可以使用 Count-Min Sketch :
1 2 3 4 5 6 7 8 9 10 11 12 13 Count-Min Sketch 结构: ┌─────────────────────────────────────┐ │ d 个哈希函数 × w 个计数器 │ │ │ │ h1 : [3] [0] [5] [1] [0] [2] [0] │ │ h2 : [1] [4] [0] [0] [3] [0] [1] │ │ h3 : [0] [2] [1] [3] [0] [4] [0] │ │ │ │ 查询 "hello" 的频率: │ │ min (h1 [hash1("hello" )] , │ │ h2 [hash2("hello" )] , │ │ h3 [hash3("hello" )] ) │ └─────────────────────────────────────┘
Count-Min Sketch 误差界分析 :
设 d 为哈希函数数量,w 为每行的计数器数量,ε 为误差率,δ 为失败概率。
误差上界推导 :
设元素 x 的真实频率为 f,估计频率为 f̂
对于任意哈希函数 hi,计数器 C[hi(x)] 的期望为 f
由于哈希冲突,C[hi(x)] = f + noise,其中 noise ≥ 0
取最小值:f̂ = min(C[h1(x)], …, C[hd(x)])
参数选择 :
宽度 w:控制单次哈希冲突的概率
深度 d:控制多次哈希同时冲突的概率
误差保证 :
以至少 1-δ 的概率,对于所有元素 x:
f̂ ≤ f + εN
其中 N 为所有元素的频率总和
空间复杂度 :
总空间:O(d × w) = O((1/ε) × ln(1/δ))
对于 ε = 0.01, δ = 0.01:约 4600 个计数器
特性 :
单调性:频率估计值 ≥ 真实值(只会高估,不会低估)
加性:支持增量更新,适合流式数据
并行性:多个 sketch 可以合并(对应位置相加)
Part 5: 共同元素问题
问题描述
两个各有 10 亿个 URL 的文件,找出共同的 URL。
解法一:哈希分区
对文件 A 的每个 URL,计算 hash(url) % 1000,写入 A_0, A_1, …, A_999
对文件 B 的每个 URL,计算 hash(url) % 1000,写入 B_0, B_1, …, B_999
对每对 (A_i, B_i),将 A_i 加载到 HashSet,遍历 B_i 查找交集
合并所有交集
关键 :使用相同的哈希函数,相同的 URL 一定在相同编号的文件中。
解法二:排序 + 归并
对文件 A 和文件 B 分别进行外部排序
使用双指针归并,找出相同的 URL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static List<String> findCommon (BufferedReader readerA, BufferedReader readerB) throws IOException { List<String> common = new ArrayList <>(); String lineA = readerA.readLine(); String lineB = readerB.readLine(); while (lineA != null && lineB != null ) { int cmp = lineA.compareTo(lineB); if (cmp == 0 ) { common.add(lineA); lineA = readerA.readLine(); lineB = readerB.readLine(); } else if (cmp < 0 ) { lineA = readerA.readLine(); } else { lineB = readerB.readLine(); } } return common; }
解法三:Spark 交集
1 2 3 4 5 val urlsA = sc.textFile("hdfs:///data/urls_a.txt" )val urlsB = sc.textFile("hdfs:///data/urls_b.txt" )val commonUrls = urlsA.intersection(urlsB) commonUrls.saveAsTextFile("hdfs:///data/common_urls" )
intersection() 的内部实现:
将两个 RDD 都转为 (url, null) 的 KV 对
使用 cogroup 将相同 Key 的数据聚合
过滤出在两个 RDD 中都存在的 Key
Part 6: 中位数问题
问题描述
100 亿个整数,找出中位数。
解法一:二分搜索
如果数据范围已知(如 32 位整数,范围 0 ~ 2^32-1):
猜测中位数为 mid
扫描所有数据,统计 ≤ mid 的个数 count
如果 count < N/2,说明中位数 > mid,搜索右半部分
如果 count ≥ N/2,说明中位数 ≤ mid,搜索左半部分
重复直到范围缩小到 1
时间复杂度 :O(N × log(MAX_VALUE)),对于 32 位整数是 O(32N)。
空间复杂度 :O(1)。
解法二:分桶计数
将数据范围分成 65536 个桶(每个桶覆盖 65536 个值)
第一遍扫描:统计每个桶中的元素个数
根据桶的计数,确定中位数在哪个桶中
第二遍扫描:只关注目标桶中的元素,精确找到中位数
空间复杂度 :O(桶数量) = O(65536) ≈ 256 KB。
解法三:Spark 近似中位数
1 2 3 4 5 6 7 8 9 10 11 12 13 val numbers = sc.textFile("hdfs:///data/numbers.txt" ).map(_.toLong)val count = numbers.count()val median = numbers.sortBy(identity) .zipWithIndex() .filter { case (_, index) => index == count / 2 } .map(_._1) .first()val approxMedian = numbers.sample(false , 0.01 ).collect().sortedval medianApprox = approxMedian(approxMedian.length / 2 )
Part 7: Spark vs Flink 对比
架构对比
维度
Spark
Flink
计算模型
微批处理(Micro-Batch)
真正的流处理(Event-by-Event)
延迟
秒级(批间隔)
毫秒级
状态管理
有限(需要外部存储)
内置强大的状态管理
容错
RDD Lineage(重算)
Checkpoint + Savepoint
窗口
基于批次
灵活的事件时间窗口
API
RDD / DataFrame / Dataset
DataStream / Table API
适用场景
批处理、ETL、机器学习
实时流处理、CEP
何时选择 Spark
批处理 ETL :大规模数据清洗和转换
机器学习 :MLlib 生态成熟
交互式查询 :Spark SQL 性能优秀
团队熟悉度 :Spark 社区更大,学习资源更多
何时选择 Flink
实时流处理 :毫秒级延迟要求
复杂事件处理(CEP) :模式匹配、异常检测
有状态计算 :需要精确的状态管理
事件时间处理 :需要处理乱序事件
容错机制对比
Spark RDD Lineage :
记录 RDD 的转换链(DAG)
某个 Partition 丢失时,根据 Lineage 重新计算
对于窄依赖,只需重算丢失的 Partition
对于宽依赖(Shuffle),可能需要重算多个 Partition
Flink Checkpoint :
基于 Chandy-Lamport 算法的分布式快照
定期将所有算子的状态保存到持久化存储
故障恢复时,从最近的 Checkpoint 恢复
支持 Exactly-Once 语义
Part 8: 实际生产中的注意事项
数据倾斜处理
问题 :某些 Key 的数据量远大于其他 Key,导致个别 Task 处理时间过长。
解决方案 :
方案
适用场景
实现方式
加盐打散
聚合操作
Key 加随机前缀,两阶段聚合
广播小表
Join 操作(一大一小)
将小表广播到所有节点
采样倾斜 Key
Join 操作(都大)
对倾斜 Key 单独处理
自定义分区器
通用
根据数据分布自定义分区逻辑
Spark 加盐打散示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 val result = data.reduceByKey(_ + _)val salted = data.map { case (key, value) => val salt = ThreadLocalRandom .current().nextInt(10 ) (s"${salt} _${key} " , value) }val partialResult = salted.reduceByKey(_ + _)val finalResult = partialResult.map { case (saltedKey, value) => val originalKey = saltedKey.split("_" , 2 )(1 ) (originalKey, value) }.reduceByKey(_ + _)
资源调优
参数
Spark
Flink
并行度
spark.default.parallelism
parallelism.default
内存
spark.executor.memory
taskmanager.memory.process.size
Shuffle
spark.shuffle.file.buffer
taskmanager.network.memory.fraction
总结
问题类型
单机解法
分布式解法
近似解法
TopK
最小堆 O(N log K)
Spark top(K)
—
排序
外部归并排序
Spark sortBy
—
去重
哈希分区 + HashSet
Spark distinct
布隆过滤器
词频统计
哈希分区 + HashMap
Spark reduceByKey
Count-Min Sketch
共同元素
哈希分区 + HashSet
Spark intersection
布隆过滤器
中位数
二分搜索 / 分桶
Spark sortBy + zipWithIndex
采样
核心思想 :分而治之 ——将大问题分解为内存可处理的小问题,然后合并结果。
参考资料