数据流的中位数

标签: 设计 双指针 数据流 排序 堆(优先队列)

难度: Hard

中位数是有序整数列表中的中间值。如果列表的大小是偶数,则没有中间值,中位数是两个中间值的平均值。

  • 例如 arr = [2,3,4] 的中位数是 3 。
  • 例如 arr = [2,3] 的中位数是 (2 + 3) / 2 = 2.5

实现 MedianFinder 类:

  • MedianFinder() 初始化 MedianFinder 对象。

  • void addNum(int num) 将数据流中的整数 num 添加到数据结构中。

  • double findMedian() 返回到目前为止所有元素的中位数。与实际答案相差 10-5 以内的答案将被接受。

示例 1:

输入
["MedianFinder", "addNum", "addNum", "findMedian", "addNum", "findMedian"]
[[], [1], [2], [], [3], []]
输出
[null, null, null, 1.5, null, 2.0]

解释
MedianFinder medianFinder = new MedianFinder();
medianFinder.addNum(1);    // arr = [1]
medianFinder.addNum(2);    // arr = [1, 2]
medianFinder.findMedian(); // 返回 1.5 ((1 + 2) / 2)
medianFinder.addNum(3);    // arr[1, 2, 3]
medianFinder.findMedian(); // return 2.0

提示:

  • -105 <= num <= 105
  • 在调用 findMedian 之前,数据结构中至少有一个元素
  • 最多 5 * 104 次调用 addNum 和 findMedian

Submission

运行时间: 272 ms

内存: 37.0 MB

class MedianFinder:

    def __init__(self):
        self.left = []
        self.right = []

    def addNum(self, num: int) -> None:
        if len(self.left)==len(self.right):
            if not self.right:
                heapq.heappush(self.left,-num)
            else:
                if num<=self.right[0]:
                    heapq.heappush(self.left,-num)
                else:
                    r_top = heapq.heappop(self.right)
                    heapq.heappush(self.left,-r_top)
                    heapq.heappush(self.right,num)
        else:
            if num>=-self.left[0]:
                heapq.heappush(self.right,num)
            else:
                l_top = -heapq.heappop(self.left)
                heapq.heappush(self.right,l_top)
                heapq.heappush(self.left,-num)
        

    def findMedian(self) -> float:
        if len(self.left)==len(self.right):
            return float((self.right[0]-self.left[0]))/2
        else:
            return -self.left[0]


# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()

Explain

这个题解使用了两个堆来实现数据流的中位数查找。左边的堆是一个大顶堆,存储较小的一半数据;右边的堆是一个小顶堆,存储较大的一半数据。通过维护两个堆的大小平衡,并且保证左边堆的最大值小于等于右边堆的最小值,就可以在 O(1) 的时间复杂度内找到中位数。当数据总数为奇数时,中位数就是左边堆的堆顶元素;当数据总数为偶数时,中位数就是左右两个堆堆顶元素的平均值。

时间复杂度: O(log n)

空间复杂度: O(n)

class MedianFinder:

    def __init__(self):
        self.left = []  # 大顶堆,存储较小的一半数据
        self.right = []  # 小顶堆,存储较大的一半数据

    def addNum(self, num: int) -> None:
        if len(self.left) == len(self.right):
            if not self.right:
                heapq.heappush(self.left, -num)  # 如果右边堆为空,直接加入左边堆
            else:
                if num <= self.right[0]:
                    heapq.heappush(self.left, -num)  # 如果新元素小于等于右边堆的最小值,加入左边堆
                else:
                    r_top = heapq.heappop(self.right)  # 否则,将右边堆的最小值移到左边堆,新元素加入右边堆
                    heapq.heappush(self.left, -r_top)
                    heapq.heappush(self.right, num)
        else:
            if num >= -self.left[0]:
                heapq.heappush(self.right, num)  # 如果新元素大于等于左边堆的最大值,加入右边堆
            else:
                l_top = -heapq.heappop(self.left)  # 否则,将左边堆的最大值移到右边堆,新元素加入左边堆
                heapq.heappush(self.right, l_top)
                heapq.heappush(self.left, -num)
        
    def findMedian(self) -> float:
        if len(self.left) == len(self.right):
            return float((self.right[0] - self.left[0])) / 2  # 如果两个堆大小相等,返回两个堆顶元素的平均值
        else:
            return -self.left[0]  # 如果两个堆大小不相等,返回左边堆的堆顶元素

Explore

在`addNum`方法中,比较数字与小顶堆的最小值和大顶堆的最大值是为了维护两个堆的性质,即大顶堆存储较小的一半数据,小顶堆存储较大的一半数据。通过这种比较,我们可以正确地将新数放入适当的堆中。此外,这种比较确保大顶堆中的任何元素都不会大于小顶堆中的任何元素,这是计算中位数的关键前提。

当新元素加入时,可能破坏两个堆的大小平衡或堆的性质。如果新元素属于当前较小堆(大顶堆)的范围但两堆大小已不平衡,需要将大顶堆的最大元素移至小顶堆,并将新元素放入大顶堆,以保持平衡。反之亦然。这种调整确保了两个堆的大小最多相差一个元素,同时保持大顶堆的所有值仍然小于或等于小顶堆的所有值,这对于快速找到正确的中位数是必要的。

通过在`addNum`方法中适当地比较和调整元素,确保只有当新元素大于大顶堆的最大值时,它才可能被添加到小顶堆;如果它小于小顶堆的最小值,则被添加到大顶堆。若需要调整以保持堆的性质和大小平衡,将大顶堆的最大值移到小顶堆,或将小顶堆的最小值移到大顶堆,从而始终保持大顶堆的所有值小于或等于小顶堆的所有值。

当两个堆的大小相等时,它们各包含了一半的元素。在这种情况下,大顶堆的堆顶元素是较小一半中的最大值,小顶堆的堆顶元素是较大一半中的最小值。因此,这两个堆顶元素的平均值代表了整个数据流的中间值,即中位数。这种方法可以快速有效地计算出中位数,无需对数据进行完全排序。