Skip to content

海量数据求TopK与频率统计

场景引入

面试中有两道经典的海量数据题:

题目一:10亿个整数存在文件中,内存有限(比如只有 512MB),如何找出最大的 Top 100 个整数?

题目二:100GB 的搜索关键字日志文件,如何统计出现频率最高的 Top 100 个关键词?

这两道题看似相似,但解法不同。第一题的关键是极值筛选,第二题的关键是频率聚合

它们有一个共同点:数据量远超内存,不可能一次加载到内存中处理。

下面分别拆解。

题型三:海量数据求TopK

核心思路

这道题的标准答案是小顶堆(Min-Heap)

思路非常直觉:维护一个大小为 K 的小顶堆,从文件中分批读取数据,每读到一个数就往堆里塞。堆满了(size > K)就弹出堆顶——堆顶是当前堆中最小的元素,弹掉它正好保留了更大的数。

遍历完所有数据后,堆中剩下的就是最大的 K 个元素。

小顶堆TopK流程

为什么用小顶堆而不是大顶堆?

  • 大顶堆:堆顶是最大值,弹出的是最大值,留下的反而是小的——方向反了。
  • 小顶堆:堆顶是最小值,弹出最小值,留下的都是大的——正好是我们要的。

复杂度分析

  • 时间:O(N log K),N 是总数据量,每个元素最多做一次堆插入 O(log K)
  • 空间:O(K),堆的大小固定为 K

三个层次的解法

面试中如果面试官追问"还能怎么优化",可以逐步升级:

层级方案适用场景
单机单线程小顶堆数据量大但单机能处理
单机多线程小顶堆 + 多线程分片尝试用多核加速
多机分布式小顶堆 + MapReduce数据量极大,单机处理太慢

下面用代码实验每个层级的实际表现。

单线程实现

最基础的版本,用 Java 内置的 PriorityQueue(默认就是小顶堆):

java
static PriorityQueue<Integer> findTopK(){
    LargeFileReader largeFileReader = new LargeFileReader(SOURCE_FILE_PATH, BATCH_SIZE);
    PriorityQueue<Integer> queue = new PriorityQueue<>();
    while (largeFileReader.canRead()){
        List<String> nextLines = largeFileReader.getNextLines();
        for (String nextLine : nextLines) {
            queue.add(Integer.parseInt(nextLine));
            if (queue.size() > K_VALUE){
                queue.poll();
            }
        }
    }
    return queue;
}

代码逻辑

  1. LargeFileReader 封装了分批读取大文件的逻辑,每次读 BATCH_SIZE
  2. 对每一行解析成整数,加入小顶堆
  3. 堆的大小超过 K 就弹出堆顶(最小值)
  4. 遍历结束,堆中就是最大的 K 个数

实验结果:10亿整数文件,K=100,耗时约 180s

180秒——能不能用多线程加速?

多线程实现

思路:把文件分给多个线程读,每个线程各自维护一个小顶堆求局部 TopK,最后把所有线程的结果合并。

java
public static PriorityQueue<Integer> findTopKV2() throws InterruptedException, IOException, ExecutionException {
    LargeFileReader largeFileReader = new LargeFileReader(SOURCE_FILE_PATH, BATCH_SIZE);
    ExecutorService executorService = Executors.newFixedThreadPool(CORE_SIZE);
    List<Future<PriorityQueue<Integer>>> futures = new ArrayList<>();

    for (int i = 0; i < CORE_SIZE; i++) {
        TopKFinder finder = new TopKFinder(K_VALUE, largeFileReader);
        Future<PriorityQueue<Integer>> future = executorService.submit(finder);
        futures.add(future);
    }

    List<PriorityQueue<Integer>> queues = new ArrayList<>();
    for (Future<PriorityQueue<Integer>> future : futures) {
        queues.add(future.get());
    }

    executorService.shutdown();
    return MergeTopK.merge(queues, K_VALUE);
}

每个线程执行的 TopKFinder

java
class TopKFinder implements Callable<PriorityQueue<Integer>> {
    private final int kValue;
    private final LargeFileReader reader;

    public TopKFinder(int kValue, LargeFileReader largeFileReader) {
        this.reader = largeFileReader;
        this.kValue = kValue;
    }

    @Override
    public PriorityQueue<Integer> call() {
        PriorityQueue<Integer> queue = new PriorityQueue<>();
        while (reader.canRead()) {
            List<String> nextLines = reader.getNextLines();
            for (String line : nextLines) {
                int value = Integer.parseInt(line);
                queue.add(value);
                if (queue.size() > kValue) {
                    queue.poll();
                }
            }
        }
        return queue;
    }
}

多个线程的局部结果需要合并,MergeTopK 把所有局部堆合成一个全局 TopK:

java
class MergeTopK {
    public static PriorityQueue<Integer> merge(List<PriorityQueue<Integer>> queues, int kValue) {
        PriorityQueue<Integer> finalQueue = new PriorityQueue<>();
        for (PriorityQueue<Integer> queue : queues) {
            while (!queue.isEmpty()) {
                finalQueue.add(queue.poll());
                if (finalQueue.size() > kValue) {
                    finalQueue.poll();
                }
            }
        }
        return finalQueue;
    }
}

实验对比

多线程真的能加速吗?实验结果出乎意料:

方案耗时说明
单线程小顶堆180s基准方案
多线程(预处理分片)>180s光是把数据分片写到多个文件就超过 180s
多线程(加锁共享读)257s为线程安全对文件读取加锁,CPU 利用率反而很低
多线程(无锁方案)~120s有提升但不明显

TopK多线程堆内存持续增长

为什么多线程没用?

瓶颈在 IO,不在 CPU。

TopK 问题的计算量极小——每个元素只做一次堆插入,O(log K) 在 K=100 时几乎可以忽略。真正耗时的是从磁盘读取 10 亿行数据。

多线程的问题:

  1. 文件读取是天然的串行操作。多个线程共享一个文件读取器,要么加锁(降低并发度),要么无锁(需要复杂的线程安全设计)
  2. 加锁后反而更慢。线程大部分时间在等锁,CPU 利用率只有个位数百分比
  3. 预处理分片本身就很慢。把一个大文件拆成多个小文件,IO 开销已经超过了单线程直接处理的时间

结论:对于 IO 密集型的 TopK 问题,多线程并没有太大提升。单线程小顶堆就是最实用的方案。

如果真的要加速,方向应该是:

  • SSD 替换 HDD:提升磁盘读取速度
  • 多机分布式:数据本身就分布在多台机器上,各自求局部 TopK 再合并
  • 内存映射文件(mmap):减少系统调用开销

题型四:海量数据频率统计

核心思路

频率统计和 TopK 不同——你不仅要知道谁大,还要先数清楚每个元素出现了多少次

两种思路:

思路一:排序再统计

先对 100GB 的数据做外部排序,排好序后相同的元素一定相邻,再扫描一遍统计频率。

问题:这又回到了海量数据排序问题,排序本身就很慢。

思路二:Hash 分片 → 分片统计 → 汇总 TopK

这是更好的方案:

  1. 分片:对每个关键字取 hash,按 hash 值分到不同的小文件中。相同的关键字一定会被分到同一个文件
  2. 分片统计:对每个小文件,用 HashMap 统计词频,再用小顶堆求该文件的 Top K
  3. 全局汇总:把所有分片的 Top K 结果合并,再用小顶堆求全局 Top K

Hash分片流程

为什么 Hash 分片能工作?

关键在于:hash(keyword) % N 保证相同的关键字一定落在同一个分片中。这样每个分片可以独立统计词频,不会漏算。

分片实现

先把大文件按 hash 分成多个小文件:

java
public static void divide(){
    for (int i = 0; i < TOTAL_INT_COUNT; i++) {
        LargeFileReader reader = new LargeFileReader(SOURCE_FILE_PATH, BATCH_SIZE);
        List<Integer> outPutList = new LinkedList<>();
        while (reader.canRead()){
            List<String> nextLines = reader.getNextLines();
            for (String nextLine : nextLines) {
                int intVal = Integer.parseInt(nextLine);
                if (intVal % 10 == i){
                    outPutList.add(intVal);
                }
            }
        }
        FileUtil.appendToFile(outPutList, TEMP_FOLDER_PATH + "/" + i + ".txt");
    }
}

这里用 intVal % 10 做分片,把数据分成 10 个文件。实际生产中分片数可以根据内存大小调整——确保每个分片的数据能放进内存做 HashMap 统计。

分片统计 + 汇总 TopK

对每个分片文件,用 HashMap 统计词频,再用小顶堆求局部 TopK,最后合并所有分片的结果:

java
public static LinkedHashMap<Integer,Integer> topFreqK() {
    divide();
    File file = new File(TEMP_FOLDER_PATH);
    File[] files = file.listFiles();
    PriorityQueue<Map.Entry<Integer,Integer>> q = new PriorityQueue<>(Map.Entry.comparingByValue());
    for (File f : files) {
       Map<Integer, Integer> freq = new HashMap<>();
        LargeFileReader largeFileReader = new LargeFileReader(f.getAbsolutePath(), BATCH_SIZE);
        while (largeFileReader.canRead()){
            List<String> nextLines = largeFileReader.getNextLines();
            for (String nextLine : nextLines) {
                int value = Integer.parseInt(nextLine);
                freq.put(value,freq.getOrDefault(value,0)+1);
            }
        }
        PriorityQueue<Map.Entry<Integer, Integer>> entries = fromMap(freq);
        q.addAll(entries);
        while (q.size() > K_VALUE) {
            q.poll();
        }
    }

    LinkedHashMap<Integer, Integer> res = new LinkedHashMap<>();
    for (Map.Entry<Integer, Integer> et : q) {
        res.put(et.getKey(), et.getValue());
    }
    return res;
}

代码逻辑

  1. 先调用 divide() 完成分片
  2. 遍历每个分片文件,用 HashMap<Integer, Integer> 统计词频
  3. 把当前分片的词频结果加入全局小顶堆(按频率排序)
  4. 堆大小超过 K 就弹出频率最低的
  5. 最终堆中就是全局频率 Top K

分片数怎么定?

分片数的选择原则:每个分片的数据量 < 可用内存

举个例子:

  • 100GB 数据,可用内存 4GB
  • 分片数 = 100GB / 4GB = 25,取 32(2 的幂次更方便取模)
  • 每个分片约 3.1GB,可以放进内存用 HashMap 统计

如果某个分片因为 hash 冲突导致数据倾斜,超出内存限制,可以对该分片再做二次分片。

面试要点总结

TopK 问题

维度要点
核心数据结构小顶堆(PriorityQueue)
时间复杂度O(N log K)
空间复杂度O(K)
为什么不排序排序是 O(N log N),且需要 O(N) 空间
多线程有用吗帮助有限,瓶颈在 IO 不在 CPU
分布式方案各节点求局部 TopK,汇总节点合并

频率统计问题

维度要点
核心思路Hash 分片 + HashMap 统计 + 小顶堆
分片目的让相同元素落在同一分片,且单个分片能放进内存
时间复杂度分片 O(N) + 统计 O(N) + TopK O(N log K)
分布式方案MapReduce:Map 阶段按 key 分片统计,Reduce 阶段汇总

面试回答模板

TopK 问题

维护一个大小为 K 的小顶堆,分批从文件读取数据,每个元素入堆后如果堆大小超过 K 就弹出堆顶。遍历完成后堆中就是最大的 K 个元素。时间 O(N log K),空间 O(K)。如果是分布式场景,每台机器各自求局部 TopK,最后合并。

频率统计问题

先用 Hash 分片把数据拆成多个小文件,保证相同的关键字落在同一个文件。然后对每个小文件用 HashMap 统计词频,再用小顶堆求局部 Top K。最后把所有分片的结果合并求全局 Top K。分布式场景就是 MapReduce。

延伸问题

面试官可能会继续追问:

  • 如果 K 很大怎么办? K 很大时堆操作的 log K 不可忽略,可以考虑快速选择算法 O(N)
  • 如果数据是流式的怎么办? 用堆在线维护 TopK,每来一个新数据就更新堆
  • 如果要求近似结果可以吗? 可以用 Count-Min Sketch 做近似频率统计,用 Lossy Counting 做近似 TopK
  • MapReduce 具体怎么做? Map 阶段输出 (key, 1),Combiner 做局部聚合,Reduce 阶段求全局频率再取 TopK

面试算法可视化图解