在 python 网络通信、爬虫等项目中,都需要使用并发编程。
本篇将讲解并发编程相关内容,包括线程、进程、携程的相关内容。
threading --- 基于线程的并行 — Python 3.12.2 官方文档
concurrent.futures --- 启动并行任务 — Python 3.12.2 官方文档
queue --- 一个同步的队列类 — Python 3.12.2 官方文档

# 进程和线程概述

先来了解一下进程和线程

  • 一个工厂,至少有一个车间,车间里至少有一个工人,最终是工人在工作。
  • 一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作。

在运行.py 程序时,内部创建一个进程(主进程),在进程中创建了一个线程,由线程逐行运行代码。

线程 是计算机中可以被 CPU 调度的最小单元。
进程 是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此项进程中的资源。

以下示例程序将会执行 ARP 扫描,检测局域网内是否有 IP 开启了 50000 端口,在程序中,所有行为都只能通过串行的形式运行,逐一排队执行,前面未完成,后面也无法继续:

import subprocess
import re
import socket
import time
# 执行 ARP 扫描,获取局域网内所有活跃设备的 IP 地址
def arp_scan ():
    arp_result = subprocess.check_output (['arp', '-a']).decode ('cp1252', errors='ignore')
    ip_list = re.findall (r'(\d+\.\d+\.\d+\.\d+)', arp_result)
    return ip_list
# 尝试连接到指定 IP 的 50000 端口
def try_connect (ip):
    with socket.socket (socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout (1)
        try:
            s.connect ((ip, 50000))
            return True
        except socket.error:
            return False
# 执行 ARP 扫描,获取 IP 列表
ips = arp_scan ()
# 标记是否找到服务端
server_found = False
# 遍历 IP 列表,尝试连接到 50000 端口
def scan_list (lst):
    for ip in ips:
        if try_connect (ip):
            print (f"服务端存在,IP 地址为:{ip}")
            server_found = True
            break
scan_list (lst=ips)

# GIL 锁

GIL 锁,全局解释器锁(Global Interpreter Lock),CPython 所特有,让一个进程中同一时刻只能有一个线程可以被 CPU 调用
如果想利用计算机的多核心,让 COU 同时处理一些任务,适适合用多进程开发(资源开销大)
如果不需要利用计算机的多核优势,适合采用多进程开发

常见的程序开发中,计算操作需要使用 CPU 多核优势,IO 操作不需要利用 CPU 多核优势

  • 计算密集型,用多进程,例如大量数据计算
  • IO 密集型,用多线程,例如文件读写、网络数据传输

# 多线程

  • 线程 是独立的处理流程,可以和系统的其他线程并行或并发地执行。

  • 多线程可以共享数据和资源,利用所谓的共享内存空间。

  • 每一个线程基本上包含 3 个元素: 程序计数器寄存器

  • 线程的状态大体上可以分为 ready , running , blocked

  • 多线程编程一般使用共享内容空间进行线程间的通讯,这就使管理内容空间成为多线程编程的重点和难点。

  • 线程的典型应用是应用软件的并行化。

  • 相比于进程,使用线程的优势主要是 性能

# threading 实现多线程

创建一个简单的多线程需要:

  • 1、创建一个线程列表,用于保留对所有线程的引用(非必须) threads = []
  • 2、需要定义一个线程 t = threading.Thread (target, args)target 表示线程需要执行的函数,args 表示函数需要传递的参数
  • 3、将线程对象假如到线程列表(非必须) threads.append (t)
  • 4、线程被创建后不会马上执行,需要手动调用 .start () 方法执行线程: t.start ()
  • 5、阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行: t.join ()

多线程之间共享全局变量。

Class threading.Thread () 接收如下参数

class threading.Thread (group=None,   ## 一般设置为 None ,这是为以后的一些特性预留的
                       target=None,  ## 当线程启动的时候要执行的函数
                       name=None,    ## 线程的名字,默认会分配一个唯一名字 Thread-N
                       args=(),      ## 传递给 target 的参数元组(整形和浮点型数据输入到 args 里)。
                       kwargs={})    ## 传递给 target 的关键字参数字典(字符串型数据输入到这里)。

# 常用线程方法

start () :启动线程活动,当前线程准备就绪(等待 CPU 调度,具体时间由 CPU 决定)
run () :定义线程的功能函数。
join ([timeout]) :等待子线程终止再继续执行主线程。timeout 表示等待的最长时间。
is_alive () :返回线程是否还活着。
threading.enumerate () : 查看 当前 正在运行的线程,返回一个包含线程对象的元素列表。
threading.current_thread ().name : 获取当前线程名称
setDaemon (bool) : 设置守护线程(必须放在 start 之前)

  • t.setDaemon (True) : 设置为守护前程,主线程执行完毕后,子线程也自动关闭
  • t.setDaemon (Flase) : 设置为非守护前程,主线程等待子线程,子线程执行完毕后,主线程才结束(默认)

一个简单的调用示例如下:

import threading
def function (i):
    print ("function called by thread % i\n" % i)
    return
#threads = []
for i in range (5):
    t = threading.Thread (target=function, args=(i,)) ## 用 function 函数初始化一个 Thread 对象 t,并将参数 i 传入;
    #threads.append (t) 
    t.start () ## 线程被创建后不会马上执行,需要手动调用 .start () 方法执行线程
    t.join () ## 阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行

function 函数的输入只有一个 int 型数值,这里要注意的是,在使用 threading.Thread () 传参时, arg 需要传入一个元组,所以输入的是 (i,) ,也就是说要加个逗号,。因为 type ((i))<class 'int'>

函数传入参数同时包含浮点型和字符串型数值时

import threading
# 定义一个线程函数,接受浮点型和字符串型参数
def calculate (data_float, data_string):
    result = data_float * 2
    print (f"Thread result for {data_float}: {result}")
    print (f"Additional string data: {data_string}")
# 创建多个线程并启动
threads = []
data_float = [1.5, 2.5, 3.5]  # 浮点型数据
data_string = ["Hello", "World", "Python"]  # 字符串型数据
for i in range (len (data_float)):
    thread = threading.Thread (target=calculate, args=(data_float [i], data_string [i]))
    threads.append (thread)
    thread.start ()
# 等待所有线程执行完成
for thread in threads:
    thread.join ()
print ("All threads have finished execution.")

# 自定义线程类

除了直接实例化 Thread 类,我们还可通过自定义的方式创建线程,自定义类继承自 threading.Thread
在自定义类中重写 run 方法,实例化后线程会执行 run 方法
自定义类的 args 使用 self.args 访问

import threading
class MyThread (threading.Thread):
    def run (self):
        print (' 执行线程 ', self.args)
t = MyThread (args=(1,))
t.start ()

# 线程池

线程池的是在程序启动时预先创建一定数量的线程并保存在内存中,等待任务的到来。当有新任务到来时,线程池会选择一个空闲的线程来执行任务,这样可以避免频繁地创建和销毁线程。当任务完成后,线程并不会立即被销毁,而是返回到线程池中等待下一个任务的到来。这种机制可以有效地减少系统开销,提高程序的性能。

# 线程池的使用

Python 标准库中的 concurrent.futures 模块提供了线程池的实现。该模块包含一个 Executor 类,可以用于创建线程池。 Executor 类有两个子类: ThreadPoolExecutor 和 P rocessPoolExecutor

ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

要使用线程池,首先需要导入 concurrent.futures 模块,然后创建一个 Executor 对象。接下来,可以使用 Executor 对象的 submit () 方法提交任务到线程池。 submit () 方法接受一个可调用对象和一个可选参数元组,以及任意数量的关键字参数。提交的任务可以是任何可调用对象,包括函数、类实例等。

下面是一个简单的示例代码,演示如何使用 Python 的线程池:

  • ThreadPoolExecutor (5) 创建了一个包含 5 个线程的线程池。
  • task_function 是一个简单的函数,它将输入值平方。
  • pool.submit (task_function,i) 将任务提交给线程池,submit 方法返回一个 Future 对象,可以用来获取任务的结果。
  • 通过 future.result () 获取每个任务的结果。
from concurrent.futures import ThreadPoolExecutor
from unittest import result
# 创建一个包含 5 个线程的线程池
pool = ThreadPoolExecutor (5)
# 定义一个函数,该函数将被线程池中的线程执行
def task_function (x):
    return x * x
# 使用线程池执行任务
results = [pool.submit (task_function, i) for i in range (10)]
# 获取所有任务的结果
for future in results:
    print (future.result ())

再来个例子思考体会一下:

  • 提交任务:使用 executor.submit () 方法将任务提交给线程池。 submit () 方法返回一个 Future 对象,该对象代表了正在执行的任务。我们使用字典 futures 来存储 Future 对象和对应的数字,以便在任务完成后可以找到原始的数字。
  • 获取结果:使用 concurrent.futures.as_completed () 函数来迭代已完成的 Future 对象。当一个任务完成时,我们调用 future.result () 来获取任务的结果。如果任务执行过程中抛出了异常, result () 方法会重新抛出这个异常,我们可以在 try…except 语句中捕获它。
  • 处理异常:在 try…except 语句中,我们检查 future.result () 是否抛出异常。如果抛出异常,我们打印出异常信息;如果没有异常,我们打印出结果。
import math
import concurrent.futures
def calculate_factorial (number):
    """计算一个数的阶乘"""
    return math.factorial (number)
def main ():
    numbers = [1, 2, 3, 4, 5]  # 任务列表,计算这些数的阶乘
    # 创建一个包含 3 个线程的线程池
    with concurrent.futures.ThreadPoolExecutor (max_workers=3) as executor:
        # 使用 submit 函数来提交任务
        futures = {executor.submit (calculate_factorial, number): number for number in numbers}
        # 获取并打印结果
        for future in concurrent.futures.as_completed (futures):
            number = futures [future]
            try:
                result = future.result ()
            except Exception as exc:
                print (f"{number} generated an exception: {exc}")
            else:
                print (f"The factorial of {number} is {result}")
if __name__ == "__main__":
    main ()

使用 map 函数来提交任务并保持执行顺序

import math
import concurrent.futures
def calculate_factorial (number):
    """计算一个数的阶乘"""
    return math.factorial (number)
def main ():
    numbers = [1, 2, 3, 4, 5]  # 任务列表,计算这些数的阶乘
    # 创建一个包含 3 个线程的线程池
    with concurrent.futures.ThreadPoolExecutor (max_workers=3) as executor:
        # 使用 map 函数来提交任务并保持执行顺序
        # 使用 executor.map () 方法将任务提交给线程池。与 executor.submit () 不同,map () 方法会等待所有任务完成,并返回一个结果列表,保持了输入列表的顺序
        results = executor.map (calculate_factorial, numbers)
        # 打印结果
        for number, result in zip (numbers, results):
            print (f"The factorial of {number} is {result}")
if __name__ == "__main__":
    main ()

# 线程池的工作流程

  • 当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前有线程处于空闲状态,直到当前线程数达到核心线程数
  • 当前线程数达到核心线程数时,如果这个时候还提交任务,这些任务会被放到工作队列里,等到线程处理完了手头的任务后,会从队列中取任务处理。
  • 当前线程数达到核心线程数并且工作队列也满了,如果这个时候还提交任务,则会继续创建线程来处理,直到线程数达到最大线程数。
  • 当前线程数达到最大线程数并且队列也满了,如果这个时候还提交任务,则会触发饱和策略,如抛出异常或拒绝执行任务。

# 线程安全

并发线程中,多个线程对共享内存进行操作,并且至少有一个可以改变数据。这种情况下如果没有同步机制,那么多个线程之间就会产生竞争,从而导致代码无效或出错。

# 互斥锁 Lock ()

解决多线程竞争问题的最简单的方法就是用 锁 (Lock) 。当一个线程需要访问共享内存时,它必须先获得 Lock 之后才能访问;当该线程对共享资源使用完成后,必须释放 Lock ,然后其他线程在拿到 Lock 进行访问资源。因此,为了避免多线程竞争的出现,必须保证:同一时刻只能允许一个线程访问共享内存。

在实际使用中,该方法经常会导致一种 死锁 现象,原因是不同线程互相拿着对方需要的 Lock ,导致死锁的发生(占有且等待)。

锁有两种状态: locked (被某一线程拿到)和 unlocked (可用状态)
我们有两个方法来操作锁: acquire ()release ()
需要遵循以下规则:

  • 如果状态是 unlocked, 可以调用 acquire () 将状态改为 locked
  • 如果状态是 locked, acquire () 会被 block 直到另一线程调用 release () 释放锁
  • 如果状态是 unlocked, 调用 release () 将导致 RuntimError 异常
  • 如果状态是 locked, 可以调用 release () 将状态改为 unlocked
import threading
shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock () ## 锁
## 有锁的情况
def increment_with_lock ():
    # shared_resource_with_lock 即最外面的 shared_resource_with_lock
    # 这样写就不需要再通过函数的参数引入 shared_resource_with_lock 了
    global shared_resource_with_lock
    for _ in range (COUNT):
        shared_resource_lock.acquire () ## 获取 锁
        shared_resource_with_lock += 1
        shared_resource_lock.release () ## 释放 锁
def decrement_with_lock ():
    global shared_resource_with_lock
    for _ in range (COUNT):
        shared_resource_lock.acquire ()
        shared_resource_with_lock -= 1
        shared_resource_lock.release ()
## 没有锁的情况
def increment_without_lock ():
    global shared_resource_with_no_lock
    for _ in range (COUNT):
        shared_resource_with_no_lock += 1
def decrement_without_lock ():
    global shared_resource_with_no_lock
    for _ in range (COUNT):
        shared_resource_with_no_lock -= 1
if __name__ == "__main__":
    t1 = threading.Thread (target=increment_with_lock)
    t2 = threading.Thread (target=decrement_with_lock)
    t3 = threading.Thread (target=increment_without_lock)
    t4 = threading.Thread (target=decrement_without_lock)
    ## 开启线程
    t1.start ()
    t2.start ()
    t3.start ()
    t4.start ()
    ## .join ()
    t1.join ()
    t2.join ()
    t3.join ()
    t4.join ()
    print ("the value of shared variable with lock management is % s" % shared_resource_with_lock)
    print ("the value of shared variable with race condition is % s" % shared_resource_with_no_lock)

代码运行结果如下

the value of shared variable with lock management is 0
the value of shared variable with race condition is 79714

尽管理论上用锁的策略可以避免多线程中的竞争问题,但是可能会对程序的其他方面产生负面影响。此外,锁的策略经常会导致不必要的开销,也会限制程序的可扩展性和可读性。更重要的是,有时候需要对多进程共享的内存分配优先级,使用锁可能和这种优先级冲突。从实践的经验来看,使用锁的应用将对 debug 带来不小的麻烦。所以,最好使用其他可选的方法确保同步读取共享内存,避免竞争条件。

# 递归锁 Rlock ()

为了保证 “只有拿到锁的线程才能释放锁”,那么应该使用 RLock () 对象;

Lock () 一样, RLock () 也有 acquire ()release () 两种方法;

RLock () 有三个特点:

  • 谁拿到谁释放。如果线程 A 拿到锁,线程 B 无法释放这个锁,只有 A 可以释放;

  • 同一线程可以多次拿到该锁,即可以 acquire 多次;

  • acquire 多少次就必须 release 多少次,只有最后一次 release 才能改变 RLock 的状态为 unlocked;

import threading
import time
class Box (object):
    lock = threading.RLock ()
    def __init__(self):
        self.total_items = 0
    def execute (self, n):
        Box.lock.acquire ()
        self.total_items += n
        Box.lock.release ()
    def add (self):
        Box.lock.acquire ()
        self.execute (1)
        Box.lock.release ()
    def remove (self):
        Box.lock.acquire ()
        self.execute (-1)
        Box.lock.release ()
def adder (box, items):
    while items > 0:
        print ("adding 1 item in the box")
        box.add ()
        time.sleep (1)
        items -= 1
def remover (box, items):
    while items > 0:
        print ("removing 1 item in the box")
        box.remove ()
        time.sleep (1)
        items -= 1
if __name__ == "__main__":
    items = 5
    print ("putting % s items in the box"% items)
    box = Box ()
    t1 = threading.Thread (target=adder, args=(box, items))
    t2 = threading.Thread (target=remover, args=(box, items))
    t1.start ()
    t2.start ()
    t1.join ()
    t2.join ()
    print ("% s items still remain in the box" % box.total_items)

代码运行结果如下:

putting 5 items in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box

removing 1 item in the box
adding 1 item in the box

removing 1 item in the box
adding 1 item in the box
adding 1 item in the box
removing 1 item in the box

0 items still remain in the box 

Box 类的 execute () 方法包含 RLockadder ()remover () 方法也包含 RLock ,就是说无论是调用 Box 还是 adder () 或者 remover () ,每个线程的每一步都有拿到资源、释放资源的过程。

# 信号量 Semaphore ()

信号量的定义: 信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取。

信号量是由操作系统管理的一种抽象数据类型,用于多线程中同步对共享资源的使用;

信号量是一个内部数据,用于表明当前共享资源可以有多少并发读取;

在 Threading 中,信号量的操作有两个函数: acquire ()release () ;解释如下:

  • 每当线程想要读取关联了信号量的共享资源时,必须调用 acquire () ,此操作减少信号量的内部变量,如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。
  • 当线程不再需要该共享资源,必须通过 release () 释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。

虽然表面上看信号量机制没什么明显的问题,如果信号量的等待和通知操作都是原子的,确实没什么问题。但如果不是,或者两个操作有一个终止了,就会导致糟糕的情况。

  • 举个例子,假设有两个并发的线程,都在等待一个信号量,目前信号量的内部值为 1。假设第线程 A 将信号量的值从 1 减到 0,这时候控制权切换到了线程 B,线程 B 将信号量的值从 0 减到 - 1,并且在这里被挂起等待,这时控制权回到线程 A,信号量已经成为了负值,于是第一个线程也在等待。

  • 这样的话,尽管当时的信号量是可以让线程访问资源的,但是因为非原子操作导致了所有的线程都在等待状态。

注:"原子" 指的是原子操作,即一个不可分割的操作。在多线程编程中,如果对信号量的等待和通知操作是原子的,意味着它们是以不可分割的方式执行的,其他线程无法在这些操作中插入。这样可以确保在多线程环境中,对信号量的操作是可靠的。

Semaphore () 详解:
threading.Semaphore () 可以创建一个信号量对象,它可以控制对共享资源的访问数量。在创建信号量对象时,可以指定初始的许可数量。每次访问资源时,线程需要获取一个许可;当许可数量不足时,线程将会被阻塞,直到有足够的许可可用。访问资源完成后,线程释放许可,使得其他线程可以继续访问资源。

threading.Semaphore (num) : num 表示初始的许可数量(比如这个数量为 1)

下面的代码展示了信号量的使用,我们有两个线程, producer ()consumer () ,它们使用共同的资源,即 itemproducer () 的任务是生产 itemconsumer () 的任务是消费 item

item 还没有被生产出来, consumer () 一直等待,当 item 生产出来, producer () 线程通知消费者资源可以使用了。

import threading
import time
import random
# 创建一个信号量 semaphore,初始值为 0。
# 信号量是一种同步机制,用于控制对共享资源的访问。
semaphore = threading.Semaphore (0)
print ("init semaphore % s" % semaphore._value)  # 打印初始信号量的值。
# 消费者线程将执行的函数。
def consumer ():
    print ("consumer is waiting.")  # 打印信息,表明消费者正在等待。
    semaphore.acquire ()  # 消费者尝试获取信号量,如果信号量的值小于 1,则等待。
    print ("consumer notify: consumed item number % s" % item)  # 打印消费者消费的项目编号。
    print ("consumer semaphore % s" % semaphore._value)  # 在消费后打印信号量的当前值。
# 生产者线程将执行的函数。
def producer ():
    global item  # 声明 item 为全局变量,以便在函数内部修改。
    time.sleep (10)  # 生产者线程暂停 10 秒,模拟生产过程耗时。
    item = random.randint (0, 1000)  # 生产者生成一个随机的项目编号。
    print ("producer notify : produced item number % s" % item)  # 打印生产者生产的产品编号。
    semaphore.release ()  # 生产者释放信号量,增加信号量的值,允许其他等待的线程继续执行。
    print ("producer semaphore % s" % semaphore._value)  # 在生产后打印信号量的当前值。
# 主程序入口。
if __name__ == "__main__":
    for _ in range (0, 5):  # 循环 5 次,模拟生产和消费过程。
        t1 = threading.Thread (target=producer)  # 创建生产者线程。
        t2 = threading.Thread (target=consumer)  # 创建消费者线程。
        t1.start ()  # 启动生产者线程。
        t2.start ()  # 启动消费者线程。
        t1.join ()  # 等待生产者线程完成。
        t2.join ()  # 等待消费者线程完成。
    print ("program terminated")  # 打印程序结束的信息。

# 多进程

# 进程的概念

程序:静态的,例如 XX.py , 这是一个程序
进程:程序运行起来以后,代码 + 用到的资源 称之为进程,它是操作系统分配资源的基本单元。

进程的状态:
工作中,任务数往往大于 CPU 核数,即有一些任务正在执行,而另一些任务在等待 CPU 进行执行,因此导致了有不同的状态
新建、就绪、运行、等待(阻塞)、死亡

  • 就绪态:满足运行条件,可以被 CPU 调度执行
  • 执行态:CPU 正在执行其功能
  • 阻塞态:等待某些条件满足,例如 IO 操作

# multiprocessing 实现多进程

进程是通过 multiprocessing.Process () 来实现,执行 start () 后,子进程会将主进程的代码 + 资源复制一份。

class Process (
    group: None = None,
    target: ((...) -> object) | None = None,
    name: str | None = None,
    args: Iterable [Any] = (),
    kwargs: Mapping [str, Any] = {},
    *,
    daemon: bool | None = None
)

# multiprocessing 实现多进程

实现多进程的方式与多线程类似,多进程对象需要通过 multiprocessing.Process () 来创建

常用进程方法

start ():启动子进程实例(创建子进程)
is_alive ():判断子进程是否还在活着
join ([timeout]):是否等待子进程执行结束,或等待多少秒
terminate ():不管任务是否完成,立即终止子进程用

以下是一个简单的多进程示例,运行代码后,process 和 main 几乎同时输出

import time
import multiprocessing
def fun_process (t):
    while True:
        print ('-process-')
        time.sleep (t)
if __name__ == '__main__':
    p = multiprocessing.Process (target=fun_process,args=(1,))
    p.start ()
    # 主进程单独执行的代码
    while True:
        print ("-main-")
        time.sleep (1)

在 windows 系统上写多进程时,需要将进程创建写在 if __name__ == '__main__':
Linux 创建进程基于 fork,windows 基于 spawn,mac 都支持,但在 python3.8 之后,默认使用 spawn

# 进程之间的通信

同一个进程内的多个线程之间共享内存,而多个进程在执行过程中拥有独立的内存单元,因此进程之间默认无法进行资源共享,操作系统提供了很多机制实现进程间通信,例如:队列(Queue)、socket

# 队列 (Queue)

队列(Queue)是 python 中一种非常重要的数据结构,但并不是 python 的数据类型,队列遵循先进先出(FIFO)的原则,即先插入的元素先被移除,主要的用途是支持线程安全和高效的队列操作。

  • 队列是一种特殊的线性数据结构;
  • 其中元素的插入和删除操作仅能在队列的两端进行,允许在一端插入数据,在另一端删除数据。
  • 通常用于管理任务、缓冲数据以及实现并发编程。

特征:

  • 先进先出(FIFO):最先添加到队列的元素最先被移除;
  • 线程安全:queue 模块中的队列是线程安全的,可以在多线程环境中安全使用;
  • 支持多种类型:queue 模块提供了多种类型的队列,如 FIFO 队列、优先队列和后进先出(LIFO)队列;

# 创建队列

FIFO(先进先出)、LIFO(后进先出)、优先队列的创建示例如下:

import queue
# 创建一个 FIFO 队列
fifo_queue = queue.Queue ()
# 创建一个 LIFO 队列
lifo_queue = queue.LifoQueue ()
# 创建一个优先队列
priority_queue = queue.PriorityQueue ()

# 队列操作

常用的队列操作方法有:

  • put (item, block=True, timeout=None) :将元素 item 放入队列。如果 blockTrue ,且队列已满,则阻塞直到有空位;如果 timeout 设置,超时后将引发异常。

  • put_nowait (item) : 相当于 put (item, block=False) ,如果队列已满, put_nowait () 方法将不会阻塞,而是立即引发 queue.Full 异常。

  • get (block=True, timeout=None) :从队列中取出并返回一个元素。如果队列为空,且 blockTrue ,则阻塞直到有元素可取;如果设置了 timeout ,超时后将引发异常。

  • qsize () :返回队列中元素的数量。

  • empty () :如果队列为空,返回 True ;否则返回 False

  • full () :如果队列已满,返回 True ;否则返回 False

在队列读取与写入时,推荐先判断队列是否已空或已满,再进行操作,例如:

# 写入时,判断是否已满
if not q.full ():
   q.put_nowait ("message")
# 读取时,判断是否已空
if not q.empty ():
   for i in range (q.size ()):
      print (q.get_nowait ())

# FIFO

import queue
# 创建一个 FIFO 队列
fifo_queue = queue.Queue (maxsize=3)  # 最大长度为 3
# 添加元素
fifo_queue.put (1)
fifo_queue.put (2)
fifo_queue.put (3)
print (fifo_queue.qsize ())  # 输出:3
# 取出元素
print (fifo_queue.get ())  # 输出:1
print (fifo_queue.get ())  # 输出:2
print (fifo_queue.empty ())  # 输出:False
# 添加一个新元素
fifo_queue.put (4)
print (fifo_queue.get ())  # 输出:3
print (fifo_queue.get ())  # 输出:4
print (fifo_queue.empty ())  # 输出:True

# LIFO

LifoQueue 的用法与 Queue 类似,但遵循后进先出的原则。

import queue
# 创建一个 LIFO 队列
lifo_queue = queue.LifoQueue ()
# 添加元素
lifo_queue.put (1)
lifo_queue.put (2)
lifo_queue.put (3)
# 取出元素
print (lifo_queue.get ())  # 输出:3
print (lifo_queue.get ())  # 输出:2

# 优先队列

优先队列根据优先级(小的值优先)来处理元素。

import queue
# 创建一个优先队列
priority_queue = queue.PriorityQueue ()
# 添加元素,元素是元组 (优先级,值)
priority_queue.put ((2, 'task2'))
priority_queue.put ((1, 'task1'))
priority_queue.put ((3, 'task3'))
# 取出元素
print (priority_queue.get ())  # 输出:(1, 'task1')
print (priority_queue.get ())  # 输出: (2, 'task2')

# Queue 实现进程间通信