数据流中的中位数

标签: 设计 双指针 数据流 排序 堆(优先队列)

难度: Hard

中位数 是有序整数列表中的中间值。如果列表的大小是偶数,则没有中间值,中位数是两个中间值的平均值。

例如,
[2,3,4] 的中位数是 3
[2,3] 的中位数是 (2 + 3) / 2 = 2.5
设计一个支持以下两种操作的数据结构:

  • void addNum(int num) - 从数据流中添加一个整数到数据结构中。
  • double findMedian() - 返回目前所有元素的中位数。

示例 1:

输入:
["MedianFinder","addNum","addNum","findMedian","addNum","findMedian"]
[[],[1],[2],[],[3],[]]
输出:[null,null,null,1.50000,null,2.00000]

示例 2:

输入:
["MedianFinder","addNum","findMedian","addNum","findMedian"]
[[],[2],[],[3],[]]
输出:[null,null,2.00000,null,2.50000]

提示:

  • 最多会对 addNum、findMedian 进行 50000 次调用。

注意:本题与主站 295 题相同:https://leetcode-cn.com/problems/find-median-from-data-stream/

Submission

运行时间: 212 ms

内存: 25.5 MB

from heapq import heappush, heappop


class MedianFinder:

    def __init__(self):
        """
        initialize your data structure here.
        """
        self.A = []
        self.B = []

    def addNum(self, num: int) -> None:
        if len(self.A) != len(self.B):
            heappush(self.A, num)
            heappush(self.B, -heappop(self.A))
        else:
            heappush(self.B, -num)
            heappush(self.A, -heappop(self.B))

    def findMedian(self) -> float:
        return self.A[0] if len(self.A) != len(self.B) else (self.A[0] - self.B[0]) / 2.



# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()

Explain

这个解决方案使用了两个堆,一个最大堆(A)和一个最小堆(B),来维护数据流的中位数。最大堆A负责存储数据流中较小的一半,而最小堆B存储较大的一半。通过确保最大堆的大小总是等于或比最小堆多一个元素,我们可以有效地计算中位数。当添加一个新元素时,我们根据两个堆的大小来决定将其放入哪个堆,并可能需要调整堆,以保持大小的平衡。当寻找中位数时,如果两个堆的大小相同,则中位数是两个堆顶元素的平均值;如果不同,则是元素更多的堆的堆顶元素。

时间复杂度: O(log n)

空间复杂度: O(n)

from heapq import heappush, heappop


class MedianFinder:

    def __init__(self):
        \"\"\"
        initialize your data structure here.
        初始化两个堆:A(最大堆,用负数实现)和 B(最小堆)。
        \"\"\"
        self.A = []  # 最大堆,存储较小一半的元素,使用负数来表示最大堆
        self.B = []  # 最小堆,存储较大一半的元素

    def addNum(self, num: int) -> None:
        if len(self.A) != len(self.B):
            heappush(self.A, num)  # 先尝试加入最大堆
            heappush(self.B, -heappop(self.A))  # 平衡两个堆的元素数量
        else:
            heappush(self.B, -num)  # 先尝试加入最小堆(注意转为负数)
            heappush(self.A, -heappop(self.B))  # 平衡两个堆的元素数量

    def findMedian(self) -> float:
        if len(self.A) != len(self.B):
            return self.A[0]  # 如果最大堆元素较多,直接返回最大堆的堆顶元素
        else:
            return (self.A[0] - self.B[0]) / 2.0  # 如果堆的大小相同,返回两个堆顶元素的平均值

# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()

Explore

在添加元素时,根据两个堆的当前大小决定元素加入哪个堆的目的是保持两个堆的大小平衡或仅相差一个元素。这样做可以确保中位数可以快速从堆中被提取。最大堆存储较小的一半元素,而最小堆存储较大的一半元素。通过维持两个堆的平衡,可以确保中位数是可达的,无论元素总数是奇数还是偶数。

在addNum方法中,先将新元素加入一个堆然后将顶部元素移动到另一个堆的目的是保证两个堆维护的元素仍满足最大堆存储较小一半、最小堆存储较大一半的原则,并且两个堆的大小差不超过1。这种操作可以保证新加入的元素被正确分类,并且维持了堆的结构和大小平衡,从而快速有效地找到中位数。

在findMedian方法中,如果两个堆的大小相同,意味着数据流中的元素数量是偶数。根据中位数的定义,在偶数个数的数据集中,中位数是中间两个数的平均值。因此,最大堆的堆顶元素(较小的一半中最大的元素)和最小堆的堆顶元素(较大的一半中最小的元素)的平均值,正是整个数据集中间的两个数的平均值,也就是中位数。

Python中的heapq库只提供了最小堆的实现。为了使用最小堆的特性实现最大堆的效果,代码中将最大堆中的元素以负数形式存储。这样,原本较大的数在取负后变得较小,可以在最小堆的顶部被提取出来,实现了最大值的效果。通过这种方式,我们可以使用heapq库来管理一个最大堆。