队列 Queue
根据教材 《数据结构与算法 Python 实现》 整理,代码和笔记可前往我的 Github 库下载 isKage/dsa-notes 。【建议 star !】
本章介绍了队列 (Queue) 数据类型,并实现了队列的抽象数据类型 (Queue ADT) 。并基于数组和链表分别实现了队列、循环队列和双端队列。由于队列的先进先出性 (FIFO) 能够实现很多应用,本章介绍了两个实际问题的应用:1. 使用队列实现杨辉三角;2. 使用栈+回溯法实现了迷宫问题的路径寻找、使用队列+洪水算法实现了迷宫问题的最短路寻找。
1 队列的概念
队列:限制数据插入在一端进行、删除在另一端进行的特殊序列(FIFO)
- 允许插入的一端称为队尾,允许删除的一端称为队头
- 在队尾插入元素称为入队(enqueue),在队头删除元素称为出队(dequeue)
- 队列中元素个数称为队列的长度
2 队列的抽象数据类型
1 2 3 4 5 6 7 8 9 10 11
| ADT Queue { 数据对象:Q = {Q1, Q2, ..., QN} 基本操作: Q.enqueue(e): 向队列 Q 的队尾添加一个元素 Q.dequeue(): 从队列 Q 中移除并返回第一个元素 Q.first(): 在不移除的前提下,返回队列的第一个元素 len(Q): 返回队列 Q 的长度 Q.init(Q0): 使用序列 Q0 初始化队列 Q Q.is_empty(): 检查队列 Q 是否为空,如为空则返回 True Q.clear(): 清空队列 Q } ADT Queue
|
例如:下面给出了一个队列的例子和一些操作的影响

3 基于数组的队列实现
对于队列,可以使用数组的方式来实现队列:
- 潜在的问题:
append(e)
高效,但 pop()
很低效。
- 解决方案:用一个变量存储当前队头元素的索引,用指针指向
None
表示数组中的离队元素。
- 缺点:存储队列的列表长度为
O(m)
,其中 m 为自队列创建以来追加元素操作的数量总和,可能远远大于队列长度。即列表可能长度不足,当队列满时,仍然需要进行动态序列扩展,效率会有所较低(摊销成本)。

例如:上面 f
指代了队列的头部,如果队列头部元素出队,则 f
指针后移,其前面的元素置空 None
从而实现出队操作。
3.1 循环使用数组
上面的方法会出现空间的浪费,例如 f
前面的出队后的空位,这部分的空间也可以利用起来。于是,循环使用数组的方法被发现。即:当新的元素入队时,如果列表在 f
的后面全满,则可以将新元素放在列表的头部(之前出队后空出的位置),这一个操作可以通过取模实现 f = (f + 1) % N
。

3.2 代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| class Empty(Exception): """队列为空的异常类""" pass
class ArrayQueue: """FIFO 队列实现""" DEFAULT_CAPACITY = 10
def __init__(self): """初始化空队列,预留空间 DEFAULT_CAPACITY""" self._data = [None] * ArrayQueue.DEFAULT_CAPACITY self._size = 0 self._front = 0
def __len__(self): """返回长度""" return self._size
def is_empty(self): """判断是否为空""" return self._size == 0
def first(self): """返回第一个元素""" if self.is_empty(): raise Empty("Queue is empty")
return self._data[self._front]
def dequeue(self): """从头部出队""" if self.is_empty(): raise Empty("Queue is empty")
answer = self._data[self._front] self._data[self._front] = None self._front = (self._front + 1) % len(self._data) self._size -= 1 return answer
def enqueue(self, e): """队尾插入元素 e""" if self._size == len(self._data): self._resize(2 * len(self._data))
avail = (self._front + self._size) % len(self._data) self._data[avail] = e self._size += 1
def _resize(self, capacity): """空间为 capacity 新列表""" old = self._data self._data = [None] * capacity
walk = self._front for k in range(self._size): self._data[k] = old[walk] walk = (walk + 1) % len(old) self._front = 0
@classmethod def from_list(cls, l): """从列表快速产生队列""" aq = cls() for e in l: aq.enqueue(e) return aq
def __str__(self): """以列表形式展示""" tmp = [] walk = self._front for _ in range(self._size): tmp.append(self._data[walk]) walk = (walk + 1) % len(self._data) return str(tmp)
def clear(self): """清空队列,保持当前容量,仅清除有效元素""" if not self.is_empty(): walk = self._front for _ in range(self._size): self._data[walk] = None walk = (walk + 1) % len(self._data) self._size = 0 self._front = 0
def __iter__(self): """返回一个迭代器,用于遍历队列中的元素""" walk = self._front for _ in range(self._size): yield self._data[walk] walk = (walk + 1) % len(self._data)
|
3.3 算法分析
基于数组的队列实现,在不考虑数组的前提下, 都是常数级的复杂度。考虑数组的动态扩展,则在摊销成本下常数级的复杂度。

4 基于单向链表的队列实现
这一部分可见链表章节的介绍 Blog Link 或 知乎链接 。代码实现见下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| class LinkedQueue: """单向链表实现队列,先进先出"""
class _Node: """单向链表的节点,非公有,实现队列""" __slots__ = '_element', '_next'
def __init__(self, element, next): self._element = element self._next = next
def __init__(self): """初始化空队列""" self._head = None self._tail = None self._size = 0
def __len__(self): """返回队列长度""" return self._size
def is_empty(self): """检查是否为空队列""" return self._size == 0
def first(self): """展示队列第一个元素值,但不改变队列""" if self.is_empty(): raise Empty('Queue is empty') return self._head._element
def dequeue(self): """删除并返回队列第一个节点和元素""" if self.is_empty(): raise Empty('Queue is empty')
ans = self._head._element self._head = self._head._next self._size -= 1 if self.is_empty(): self._tail = None return ans
def enqueue(self, e): """在尾部增加新节点""" newest = self._Node(e, None) if self.is_empty(): self._head = newest else: self._tail._next = newest self._tail = newest self._size += 1
|
5 循环队列
轮转调度程序:以循环的方式迭代地遍历一个元素的集合,并通过执行一个给定的动作为集合中的每个元素进行“服务”。
例如:在队列 Q
反复进行下面的步骤,即可轮转调度为每个元素都进行“服务”。
e = Q.dequeue()
:从队列取出元素 e (下一个元素出队)
f(e)
:为元素 e 提供服务、进行操作(“服务”下一个元素)
Q.enqueue(e)
:e 被重新加入队列尾部(所“服务”的元素入队)
利用循环链表实现循环队列见 Blog Link 或 知乎链接 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| class LinkedCircularQueue: """循环链表实现循环队列"""
class _Node: """单向链表的节点,非公有,实现循环队列""" __slots__ = '_element', '_next'
def __init__(self, element, next): self._element = element self._next = next
def __init__(self): """初始化空队列""" self._tail = None self._size = 0
def __len__(self): """返回长度""" return self._size
def is_empty(self): """判断是否为空""" return self._size == 0
def first(self): """展示队列第一个元素值 tail.next ,但不改变队列""" if self.is_empty(): raise Empty('Queue is empty') head = self._tail._next return head._element
def dequeue(self): """删除并返回队列头节点 tail.next 和元素""" if self.is_empty(): raise Empty('Queue is empty')
oldhead = self._tail._next if self._size == 1: self._tail = None else: self._tail._next = oldhead._next self._size -= 1 return oldhead._element
def enqueue(self, e): """在尾部 tail 增加新节点""" newest = self._Node(e, None) if self.is_empty(): newest._next = newest else: newest._next = self._tail._next self._tail._next = newest self._tail = newest self._size += 1
def rotate(self): """训练轮转一次""" if self.is_empty(): raise Empty('Queue is empty') self._tail = self._tail._next
|
6 双端队列
6.1 双端队列的概念和抽象数据类型
双端队列:类队列数据结构,支持在队列的头部和尾部都进行插入和删除。双端队列被称为 deque
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ADT Deque { 数据对象:D = {D1, D2, ..., DN} 基本操作: D.add_first(e): 向双端队列 D 的队头添加一个元素 D.add_last(e): 向队列 D 的队尾添加一个元素 D.delete_first(): 从双端队列 Q 中移除并返回第一个元素 D.delete_last(): 从双端队列 Q 中移除并返回最后一个元素 D.first(): 在不移除的前提下,返回双端队列的第一个元素 D.last(): 在不移除的前提下,返回双端队列的最后一个元素 len(D): 返回双端队列 D 的长度 D.init(D0): 使用序列 D0 初始化队列 D D.is_empty(): 检查双端队列 D 是否为空,如为空则返回 True D.clear(): 清空队列 D } ADT Deque
|
6.2 基于双向链表实现双端队列
详细的搭建可见 Blog Link 或 知乎链接 。
先实现双向链表的基础类 _DoublyLinkedBase
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| class _DoublyLinkedBase: """双向链表的基础类/父类"""
class _Node: """双向链表的节点类,包含元素值、prev 指针和 next 指针""" __slots__ = '_element', '_prev', '_next'
def __init__(self, element, prev, next): self._element = element self._prev = prev self._next = next
def __init__(self): """初始化一个空链表""" self._header = self._Node(None, None, None) self._trailer = self._Node(None, None, None)
self._header._next = self._trailer self._trailer._prev = self._header
self._size = 0
def __len__(self): """链表长度 len() 重载""" return self._size
def is_empty(self): """判断是否为空""" return self._size == 0
def _insert_between(self, e, predecessor, successor): """在节点 predecessor, successor 插入插入新节点,并返回这个新节点""" newest = self._Node(e, predecessor, successor)
predecessor._next = newest successor._prev = newest
self._size += 1 return newest
def _delete_node(self, node): """传入节点并删除,返回被删除的值""" predecessor = node._prev successor = node._next predecessor._next = successor successor._prev = predecessor
element = node._element node._prev, node._next, node._element = None, None, None
self._size -= 1 return element
|
再实现双向队列 LinkedDeque
,基础父类 _DoublyLinkedBase
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class LinkedDeque(_DoublyLinkedBase): """双向链表实现双端队列"""
"""不需要定义 `__init__` `__len__` `is_empty` 方法"""
def first(self): """获取第一个元素的值,注意头节点是哨兵,没有值""" if self.is_empty(): raise Empty('Deque is empty')
return self._header._next._element
def last(self): """获取最后一个元素的值,注意尾节点是哨兵,没有值""" if self.is_empty(): raise Empty('Deque is empty')
return self._trailer._prev._element
def insert_first(self, e): """在头部插入元素""" return self._insert_between(e, self._header, self._header._next)
def insert_last(self, e): """在尾部插入元素""" return self._insert_between(e, self._trailer._prev, self._trailer)
def delete_first(self): """删除第一个元素""" if self.is_empty(): raise Empty('Deque is empty')
return self._delete_node(self._header._next)
def delete_last(self): """删除最后一个元素""" if self.is_empty(): raise Empty('Deque is empty')
return self._delete_node(self._trailer._prev)
|
7 队列的实际应用
7.1 杨辉三角
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| def YanghuiTri(n): """杨辉三角队列实现""" TriQ = ArrayQueue()
TriQ.enqueue(1) print('1'.center(5 * n))
for m in range(2, n + 1): Line = '1' TriQ.enqueue(1) p = TriQ.dequeue()
for k in range(2, m): q = TriQ.dequeue() TriQ.enqueue(p + q) Line += ' ' + str(p + q) p = q TriQ.enqueue(1) Line += ' ' + '1' print(Line.center(5 * n))
|
1 2 3 4 5 6 7 8 9 10
| if __name__ == '__main__': YanghuiTri(7)
|
7.2 迷宫问题
考虑一个长为 M 宽为 N 的迷官。其入口在 (1, 1)
处,出口在 (M, N)
处。例如,下图中 M = N = 5
。请设计一个算法,找到一条从入口到出口的路(也可能没有这样的路),并给出其运行的时间复杂度。
问题一:找出一条可能的路径(栈 | 回溯法)
问题二:找出最短的路径(队列 | 洪水算法)

7.2.1 利用栈:回溯法
回溯法的主要想法是深度优先,即一直走知道无路可走再回退。
考虑用栈来实现这个问题,每次存入经过的格子的坐标 (i, j)
,除去来时的路,下一步有 3 种走法,然后继续将新的位置压入栈。每次进行下一步都要检查是否为墙/是否在栈中,当无路可走时,将栈顶元素出栈,继续寻找。
有关栈的抽象数据类型见 Blog Link 或 知乎链接 。代码实现迷宫问题见下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| def maze_path(maze, M, N): """找出一条可能的路径: 栈 | 回溯法""" stack = ArrayStack()
stack.push((1, 1)) visited = set() visited.add((1, 1))
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]
while not stack.is_empty(): current = stack.top()
if current == (M, N): path = [None] * len(stack) i = len(stack) - 1 while not stack.is_empty(): path[i] = stack.pop() i -= 1 return path
found = False for d in directions: i, j = current[0] + d[0], current[1] + d[1]
if maze[i][j] == 0 and (i, j) not in visited: stack.push((i, j)) visited.add((i, j)) found = True break
if not found: stack.pop()
return None
|
例如:使用 1
代表墙,即不可走;0
表示可以走。最外围的 1
只是为了表示边界,减少对边界判断的繁琐工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| if __name__ == '__main__': maze = [ [1, 1, 1, 1, 1, 1, 1], [1, 0, 1, 0, 0, 0, 1], [1, 0, 0, 0, 1, 0, 1], [1, 0, 1, 1, 0, 0, 1], [1, 0, 1, 1, 0, 1, 1], [1, 0, 0, 0, 0, 0, 1], [1, 1, 1, 1, 1, 1, 1] ]
ans = maze_path(maze, len(maze) - 2, len(maze[0]) - 2) print(ans)
|
结果为:
1
| [(1, 1), (2, 1), (2, 2), (2, 3), (1, 3), (1, 4), (1, 5), (2, 5), (3, 5), (3, 4), (4, 4), (5, 4), (5, 5)]
|
算法分析 :按照回溯法,每个位置最多被首次访问一次并入栈,被回溯一次并出栈。在此之间,最多探索该位置周围的位置三次(即常数次操作)故复杂度为 O(MN)
。
7.2.2 利用队列:洪水算法
洪水算法主要思想是广度优先,即保存每一步可能的所有情况,最后先出最短的路。
我们可以使用队列,从 (1, 1)
开始,重复以下操作:取出队头元素,将队头元素可以到达但并未走过的相邻点放入队列,并记录下从队头元素可以立即到达某一相邻点这一信息:
- 如最终抵达终点,则已找到最短路,可根据之前记录的信息复原出这条路。
- 否则,如最终队列为空,则不存在这样的路。
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| def maze_shortest_path(maze, M, N): """找出最短的路径: 队列 | 洪水算法"""
def backtrack_path(parent: dict, start: tuple, end: tuple): """从 end 回溯到 start,生成路径""" path = [] current = end while current != start: path.append(current) current = parent[current] path.append(start) path.reverse() return path
queue = ArrayQueue() queue.enqueue((1, 1)) parent = {}
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]
while not queue.is_empty(): current = queue.dequeue() maze[current[0]][current[1]] = 2
if current == (M, N): return backtrack_path(parent, (1, 1), (M, N))
for d in directions: i, j = current[0] + d[0], current[1] + d[1] if maze[i][j] == 0 and maze[i][j] != 2: queue.enqueue((i, j)) parent[(i, j)] = current
return None
|
例如:仍然是上面的例子,最短路应该为从左边到底然后向右
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if __name__ == '__main__': maze = [ [1, 1, 1, 1, 1, 1, 1], [1, 0, 1, 0, 0, 0, 1], [1, 0, 0, 0, 1, 0, 1], [1, 0, 1, 1, 0, 0, 1], [1, 0, 1, 1, 0, 1, 1], [1, 0, 0, 0, 0, 0, 1], [1, 1, 1, 1, 1, 1, 1] ]
ans = maze_path(maze, len(maze) - 2, len(maze[0]) - 2) print(ans)
shortest_ans = maze_shortest_path(maze, len(maze) - 2, len(maze[0]) - 2) print(shortest_ans)
|
1 2 3 4 5 6
| """Output"""
[(1, 1), (2, 1), (2, 2), (2, 3), (1, 3), (1, 4), (1, 5), (2, 5), (3, 5), (3, 4), (4, 4), (5, 4), (5, 5)]
[(1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5)]
|
算法分析:每个位置最多被放入队列一次、出队一次,算法复杂度 O(MN)
。