Threading

在 python 网络通信、爬虫等项目中,都需要使用并发编程。
本篇将讲解并发编程相关内容,包括线程、进程、携程的相关内容。
threading --- 基于线程的并行 — Python 3.12.2 官方文档
concurrent.futures --- 启动并行任务 — Python 3.12.2 官方文档
queue --- 一个同步的队列类 — Python 3.12.2 官方文档

进程和线程概述

先来了解一下进程和线程

  • 一个工厂,至少有一个车间,车间里至少有一个工人,最终是工人在工作。
  • 一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作。

在运行.py 程序时,内部创建一个进程(主进程),在进程中创建了一个线程,由线程逐行运行代码。

线程 是计算机中可以被CPU调度的最小单元。
进程 是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此项进程中的资源。

以下示例程序将会执行ARP扫描,检测局域网内是否有IP开启了50000端口,在程序中,所有行为都只能通过串行的形式运行,逐一排队执行,前面未完成,后面也无法继续:

import subprocess import re import socket import time # 执行ARP扫描,获取局域网内所有活跃设备的IP地址 def arp_scan(): arp_result = subprocess.check_output(['arp', '-a']).decode('cp1252', errors='ignore') ip_list = re.findall(r'(\d+\.\d+\.\d+\.\d+)', arp_result) return ip_list # 尝试连接到指定IP的50000端口 def try_connect(ip): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1) try: s.connect((ip, 50000)) return True except socket.error: return False # 执行ARP扫描,获取IP列表 ips = arp_scan() # 标记是否找到服务端 server_found = False # 遍历IP列表,尝试连接到50000端口 def scan_list(lst): for ip in ips: if try_connect(ip): print(f"服务端存在,IP地址为:{ip}") server_found = True break scan_list(lst=ips)

GIL 锁

GIL锁,全局解释器锁(Global Interpreter Lock),CPython所特有,让一个进程中同一时刻只能有一个线程可以被CPU调用
如果想利用计算机的多核心,让COU同时处理一些任务,适适合用多进程开发(资源开销大) 如果不需要利用计算机的多核优势,适合采用多进程开发

常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU多核优势

  • 计算密集型,用多进程,例如大量数据计算
  • IO密集型,用多线程,例如文件读写、网络数据传输

多线程

  • 线程 是独立的处理流程,可以和系统的其他线程并行或并发地执行。

  • 多线程可以共享数据和资源,利用所谓的共享内存空间。

  • 每一个线程基本上包含3个元素:程序计数器寄存器

  • 线程的状态大体上可以分为ready, running, blocked

  • 多线程编程一般使用共享内容空间进行线程间的通讯,这就使管理内容空间成为多线程编程的重点和难点。

  • 线程的典型应用是应用软件的并行化。

  • 相比于进程,使用线程的优势主要是 性能

threading 实现多线程

创建一个简单的多线程需要:

  • 1、创建一个线程列表,用于保留对所有线程的引用(非必须)threads = []
  • 2、需要定义一个线程 t = threading.Thread(target, args)target表示线程需要执行的函数,args表示函数需要传递的参数
  • 3、将线程对象假如到线程列表(非必须)threads.append(t)
  • 4、线程被创建后不会马上执行,需要手动调用 .start() 方法执行线程: t.start()
  • 5、阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行: t.join()

多线程之间共享全局变量。

Class threading.Thread() 接收如下参数

class threading.Thread(group=None, ## 一般设置为 None ,这是为以后的一些特性预留的 target=None, ## 当线程启动的时候要执行的函数 name=None, ## 线程的名字,默认会分配一个唯一名字 Thread-N args=(), ## 传递给 target 的参数元组(整形和浮点型数据输入到args里)。 kwargs={}) ## 传递给 target 的关键字参数字典(字符串型数据输入到这里)。

常用线程方法

start():启动线程活动,当前线程准备就绪(等待CPU调度,具体时间由CPU决定)
run():定义线程的功能函数。
join([timeout]):等待子线程终止再继续执行主线程。timeout表示等待的最长时间。
is_alive():返回线程是否还活着。
threading.enumerate(): 查看当前正在运行的线程,返回一个包含线程对象的元素列表。
threading.current_thread().name: 获取当前线程名称 setDaemon(bool) : 设置守护线程(必须放在start之前)

  • t.setDaemon(True): 设置为守护前程,主线程执行完毕后,子线程也自动关闭
  • t.setDaemon(Flase): 设置为非守护前程,主线程等待子线程,子线程执行完毕后,主线程才结束(默认)

一个简单的调用示例如下:

import threading def function(i): print("function called by thread %i\n" % i) return #threads = [] for i in range(5): t = threading.Thread(target=function, args=(i,)) ## 用 function 函数初始化一个 Thread 对象 t,并将参数 i 传入; #threads.append(t) t.start() ## 线程被创建后不会马上执行,需要手动调用 .start() 方法执行线程 t.join() ## 阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行

function 函数的输入只有一个 int 型数值,这里要注意的是,在使用 threading.Thread() 传参时,arg需要传入一个元组,所以输入的是 (i,) ,也就是说要加个逗号,。因为 type((i))<class 'int'>

函数传入参数同时包含浮点型和字符串型数值时

import threading # 定义一个线程函数,接受浮点型和字符串型参数 def calculate(data_float, data_string): result = data_float * 2 print(f"Thread result for {data_float}: {result}") print(f"Additional string data: {data_string}") # 创建多个线程并启动 threads = [] data_float = [1.5, 2.5, 3.5] # 浮点型数据 data_string = ["Hello", "World", "Python"] # 字符串型数据 for i in range(len(data_float)): thread = threading.Thread(target=calculate, args=(data_float[i], data_string[i])) threads.append(thread) thread.start() # 等待所有线程执行完成 for thread in threads: thread.join() print("All threads have finished execution.")

自定义线程类

除了直接实例化Thread类,我们还可通过自定义的方式创建线程,自定义类继承自threading.Thread
在自定义类中重写 run 方法,实例化后线程会执行run方法
自定义类的 args 使用self.args 访问

import threading class MyThread(threading.Thread): def run(self): print('执行线程', self.args) t = MyThread(args=(1,)) t.start()

线程池

线程池的是在程序启动时预先创建一定数量的线程并保存在内存中,等待任务的到来。当有新任务到来时,线程池会选择一个空闲的线程来执行任务,这样可以避免频繁地创建和销毁线程。当任务完成后,线程并不会立即被销毁,而是返回到线程池中等待下一个任务的到来。这种机制可以有效地减少系统开销,提高程序的性能。

线程池的使用

Python标准库中的concurrent.futures模块提供了线程池的实现。该模块包含一个Executor类,可以用于创建线程池。Executor类有两个子类:ThreadPoolExecutor和ProcessPoolExecutor

ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor用于创建进程池。

要使用线程池,首先需要导入concurrent.futures模块,然后创建一个Executor对象。接下来,可以使用Executor对象的submit()方法提交任务到线程池。submit()方法接受一个可调用对象和一个可选参数元组,以及任意数量的关键字参数。提交的任务可以是任何可调用对象,包括函数、类实例等。

下面是一个简单的示例代码,演示如何使用Python的线程池:

  • ThreadPoolExecutor(5)创建了一个包含5个线程的线程池。
  • task_function是一个简单的函数,它将输入值平方。
  • pool.submit(task_function,i)将任务提交给线程池,submit方法返回一个Future对象,可以用来获取任务的结果。
  • 通过future.result()获取每个任务的结果。
from concurrent.futures import ThreadPoolExecutor from unittest import result # 创建一个包含5个线程的线程池 pool = ThreadPoolExecutor(5) # 定义一个函数,该函数将被线程池中的线程执行 def task_function(x): return x * x # 使用线程池执行任务 results = [pool.submit(task_function, i) for i in range(10)] # 获取所有任务的结果 for future in results: print(future.result())

再来个例子思考体会一下:

  • 提交任务:使用executor.submit()方法将任务提交给线程池。submit()方法返回一个Future对象,该对象代表了正在执行的任务。我们使用字典futures来存储Future对象和对应的数字,以便在任务完成后可以找到原始的数字。
  • 获取结果:使用concurrent.futures.as_completed()函数来迭代已完成的Future对象。当一个任务完成时,我们调用future.result()来获取任务的结果。如果任务执行过程中抛出了异常,result()方法会重新抛出这个异常,我们可以在try…except语句中捕获它。
  • 处理异常:在try…except语句中,我们检查future.result()是否抛出异常。如果抛出异常,我们打印出异常信息;如果没有异常,我们打印出结果。
import math import concurrent.futures def calculate_factorial(number): """计算一个数的阶乘""" return math.factorial(number) def main(): numbers = [1, 2, 3, 4, 5] # 任务列表,计算这些数的阶乘 # 创建一个包含3个线程的线程池 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 使用submit函数来提交任务 futures = {executor.submit(calculate_factorial, number): number for number in numbers} # 获取并打印结果 for future in concurrent.futures.as_completed(futures): number = futures[future] try: result = future.result() except Exception as exc: print(f"{number} generated an exception: {exc}") else: print(f"The factorial of {number} is {result}") if __name__ == "__main__": main()

使用map函数来提交任务并保持执行顺序

import math import concurrent.futures def calculate_factorial(number): """计算一个数的阶乘""" return math.factorial(number) def main(): numbers = [1, 2, 3, 4, 5] # 任务列表,计算这些数的阶乘 # 创建一个包含3个线程的线程池 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 使用map函数来提交任务并保持执行顺序 # 使用executor.map()方法将任务提交给线程池。与executor.submit()不同,map()方法会等待所有任务完成,并返回一个结果列表,保持了输入列表的顺序 results = executor.map(calculate_factorial, numbers) # 打印结果 for number, result in zip(numbers, results): print(f"The factorial of {number} is {result}") if __name__ == "__main__": main()

线程池的工作流程

  • 当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前有线程处于空闲状态,直到当前线程数达到核心线程数
  • 当前线程数达到核心线程数时,如果这个时候还提交任务,这些任务会被放到工作队列里,等到线程处理完了手头的任务后,会从队列中取任务处理。
  • 当前线程数达到核心线程数并且工作队列也满了,如果这个时候还提交任务,则会继续创建线程来处理,直到线程数达到最大线程数。
  • 当前线程数达到最大线程数并且队列也满了,如果这个时候还提交任务,则会触发饱和策略,如抛出异常或拒绝执行任务。

线程安全

并发线程中,多个线程对共享内存进行操作,并且至少有一个可以改变数据。这种情况下如果没有同步机制,那么多个线程之间就会产生竞争,从而导致代码无效或出错。

互斥锁 Lock ()

解决多线程竞争问题的最简单的方法就是用锁 (Lock)。当一个线程需要访问共享内存时,它必须先获得 Lock 之后才能访问;当该线程对共享资源使用完成后,必须释放 Lock,然后其他线程在拿到 Lock 进行访问资源。因此,为了避免多线程竞争的出现,必须保证:同一时刻只能允许一个线程访问共享内存。

在实际使用中,该方法经常会导致一种 死锁 现象,原因是不同线程互相拿着对方需要的 Lock,导致死锁的发生(占有且等待)。

锁有两种状态: locked(被某一线程拿到)和unlocked(可用状态) 我们有两个方法来操作锁: acquire()release() 需要遵循以下规则:

  • 如果状态是unlocked, 可以调用 acquire() 将状态改为locked
  • 如果状态是locked, acquire() 会被block直到另一线程调用 release() 释放锁
  • 如果状态是unlocked, 调用 release() 将导致 RuntimError 异常
  • 如果状态是locked, 可以调用 release() 将状态改为unlocked
import threading shared_resource_with_lock = 0 shared_resource_with_no_lock = 0 COUNT = 100000 shared_resource_lock = threading.Lock() ## 锁 ## 有锁的情况 def increment_with_lock(): # shared_resource_with_lock 即最外面的 shared_resource_with_lock # 这样写就不需要再通过函数的参数引入 shared_resource_with_lock 了 global shared_resource_with_lock for _ in range(COUNT): shared_resource_lock.acquire() ## 获取 锁 shared_resource_with_lock += 1 shared_resource_lock.release() ## 释放 锁 def decrement_with_lock(): global shared_resource_with_lock for _ in range(COUNT): shared_resource_lock.acquire() shared_resource_with_lock -= 1 shared_resource_lock.release() ## 没有锁的情况 def increment_without_lock(): global shared_resource_with_no_lock for _ in range(COUNT): shared_resource_with_no_lock += 1 def decrement_without_lock(): global shared_resource_with_no_lock for _ in range(COUNT): shared_resource_with_no_lock -= 1 if __name__ == "__main__": t1 = threading.Thread(target=increment_with_lock) t2 = threading.Thread(target=decrement_with_lock) t3 = threading.Thread(target=increment_without_lock) t4 = threading.Thread(target=decrement_without_lock) ## 开启线程 t1.start() t2.start() t3.start() t4.start() ## .join() t1.join() t2.join() t3.join() t4.join() print ("the value of shared variable with lock management is %s" % shared_resource_with_lock) print ("the value of shared variable with race condition is %s" % shared_resource_with_no_lock)

代码运行结果如下

the value of shared variable with lock management is 0 the value of shared variable with race condition is 79714

尽管理论上用锁的策略可以避免多线程中的竞争问题,但是可能会对程序的其他方面产生负面影响。此外,锁的策略经常会导致不必要的开销,也会限制程序的可扩展性和可读性。更重要的是,有时候需要对多进程共享的内存分配优先级,使用锁可能和这种优先级冲突。从实践的经验来看,使用锁的应用将对debug带来不小的麻烦。所以,最好使用其他可选的方法确保同步读取共享内存,避免竞争条件。

递归锁 Rlock()

为了保证 “只有拿到锁的线程才能释放锁”,那么应该使用 RLock() 对象;

Lock() 一样,RLock()也有acquire()release()两种方法;

RLock() 有三个特点:

  • 谁拿到谁释放。如果线程A拿到锁,线程B无法释放这个锁,只有A可以释放;

  • 同一线程可以多次拿到该锁,即可以acquire多次;

  • acquire多少次就必须release多少次,只有最后一次release才能改变RLock的状态为unlocked;

import threading import time class Box(object): lock = threading.RLock() def __init__(self): self.total_items = 0 def execute(self, n): Box.lock.acquire() self.total_items += n Box.lock.release() def add(self): Box.lock.acquire() self.execute(1) Box.lock.release() def remove(self): Box.lock.acquire() self.execute(-1) Box.lock.release() def adder(box, items): while items > 0: print("adding 1 item in the box") box.add() time.sleep(1) items -= 1 def remover(box, items): while items > 0: print("removing 1 item in the box") box.remove() time.sleep(1) items -= 1 if __name__ == "__main__": items = 5 print("putting %s items in the box"% items) box = Box() t1 = threading.Thread(target=adder, args=(box, items)) t2 = threading.Thread(target=remover, args=(box, items)) t1.start() t2.start() t1.join() t2.join() print("%s items still remain in the box " % box.total_items)

代码运行结果如下:

putting 5 items in the box adding 1 item in the box removing 1 item in the box adding 1 item in the box removing 1 item in the box removing 1 item in the box adding 1 item in the box removing 1 item in the box adding 1 item in the box adding 1 item in the box removing 1 item in the box 0 items still remain in the box

Box类的execute()方法包含RLockadder()remover()方法也包含RLock,就是说无论是调用Box还是adder()或者remover(),每个线程的每一步都有拿到资源、释放资源的过程。

信号量 Semaphore()

信号量的定义: 信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取。

信号量是由操作系统管理的一种抽象数据类型,用于多线程中同步对共享资源的使用;

信号量是一个内部数据,用于表明当前共享资源可以有多少并发读取;

在 Threading 中,信号量的操作有两个函数:acquire()release();解释如下:

  • 每当线程想要读取关联了信号量的共享资源时,必须调用 acquire() ,此操作减少信号量的内部变量, 如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。
  • 当线程不再需要该共享资源,必须通过 release() 释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。

虽然表面上看信号量机制没什么明显的问题,如果信号量的等待和通知操作都是原子的,确实没什么问题。但如果不是,或者两个操作有一个终止了,就会导致糟糕的情况。

  • 举个例子,假设有两个并发的线程,都在等待一个信号量,目前信号量的内部值为1。假设第线程A将信号量的值从1减到0,这时候控制权切换到了线程B,线程B将信号量的值从0减到-1,并且在这里被挂起等待,这时控制权回到线程A,信号量已经成为了负值,于是第一个线程也在等待。

  • 这样的话,尽管当时的信号量是可以让线程访问资源的,但是因为非原子操作导致了所有的线程都在等待状态。

注:“原子”指的是原子操作,即一个不可分割的操作。在多线程编程中,如果对信号量的等待和通知操作是原子的,意味着它们是以不可分割的方式执行的,其他线程无法在这些操作中插入。这样可以确保在多线程环境中,对信号量的操作是可靠的。

Semaphore()详解: threading.Semaphore() 可以创建一个信号量对象,它可以控制对共享资源的访问数量。在创建信号量对象时,可以指定初始的许可数量。每次访问资源时,线程需要获取一个许可;当许可数量不足时,线程将会被阻塞,直到有足够的许可可用。访问资源完成后,线程释放许可,使得其他线程可以继续访问资源。

threading.Semaphore(num): num表示初始的许可数量(比如这个数量为1)

下面的代码展示了信号量的使用,我们有两个线程, producer()consumer() ,它们使用共同的资源,即itemproducer() 的任务是生产itemconsumer() 的任务是消费item

item还没有被生产出来, consumer() 一直等待,当item生产出来, producer() 线程通知消费者资源可以使用了。

import threading import time import random # 创建一个信号量semaphore,初始值为0。 # 信号量是一种同步机制,用于控制对共享资源的访问。 semaphore = threading.Semaphore(0) print("init semaphore %s" % semaphore._value) # 打印初始信号量的值。 # 消费者线程将执行的函数。 def consumer(): print("consumer is waiting.") # 打印信息,表明消费者正在等待。 semaphore.acquire() # 消费者尝试获取信号量,如果信号量的值小于1,则等待。 print("consumer notify: consumed item number %s" % item) # 打印消费者消费的项目编号。 print("consumer semaphore %s" % semaphore._value) # 在消费后打印信号量的当前值。 # 生产者线程将执行的函数。 def producer(): global item # 声明item为全局变量,以便在函数内部修改。 time.sleep(10) # 生产者线程暂停10秒,模拟生产过程耗时。 item = random.randint(0, 1000) # 生产者生成一个随机的项目编号。 print("producer notify : produced item number %s" % item) # 打印生产者生产的产品编号。 semaphore.release() # 生产者释放信号量,增加信号量的值,允许其他等待的线程继续执行。 print("producer semaphore %s" % semaphore._value) # 在生产后打印信号量的当前值。 # 主程序入口。 if __name__ == "__main__": for _ in range(0, 5): # 循环5次,模拟生产和消费过程。 t1 = threading.Thread(target=producer) # 创建生产者线程。 t2 = threading.Thread(target=consumer) # 创建消费者线程。 t1.start() # 启动生产者线程。 t2.start() # 启动消费者线程。 t1.join() # 等待生产者线程完成。 t2.join() # 等待消费者线程完成。 print("program terminated") # 打印程序结束的信息。

多进程

进程的概念

程序:静态的,例如XX.py ,这是一个程序 进程:程序运行起来以后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。

进程的状态: 工作中,任务数往往大于CPU核数,即有一些任务正在执行,而另一些任务在等待CPU进行执行,因此导致了有不同的状态
新建、就绪、运行、等待(阻塞)、死亡

  • 就绪态:满足运行条件,可以被CPU调度执行
  • 执行态:CPU正在执行其功能
  • 阻塞态:等待某些条件满足,例如IO操作

multiprocessing 实现多进程

进程是通过 multiprocessing.Process() 来实现,执行start()后,子进程会将主进程的代码+资源复制一份。

class Process( group: None = None, target: ((...) -> object) | None = None, name: str | None = None, args: Iterable[Any] = (), kwargs: Mapping[str, Any] = {}, *, daemon: bool | None = None )

multiprocessing 实现多进程

实现多进程的方式与多线程类似,多进程对象需要通过 multiprocessing.Process() 来创建

常用进程方法

start():启动子进程实例(创建子进程) is_alive():判断子进程是否还在活着 join([timeout]):是否等待子进程执行结束,或等待多少秒 terminate():不管任务是否完成,立即终止子进程用

以下是一个简单的多进程示例,运行代码后,process和main几乎同时输出

import time import multiprocessing def fun_process(t): while True: print('-process-') time.sleep(t) if __name__ == '__main__': p = multiprocessing.Process(target=fun_process,args=(1,)) p.start() # 主进程单独执行的代码 while True: print("-main-") time.sleep(1)

在windows系统上写多进程时,需要将进程创建写在 if __name__ == '__main__': 中 Linux 创建进程基于 fork,windows基于spawn,mac都支持,但在python3.8之后,默认使用spawn

进程之间的通信

同一个进程内的多个线程之间共享内存,而多个进程在执行过程中拥有独立的内存单元,因此进程之间默认无法进行资源共享,操作系统提供了很多机制实现进程间通信,例如:队列(Queue)、socket

队列( Queue )

队列(Queue)是python 中一种非常重要的数据结构,但并不是python的数据类型,队列遵循先进先出(FIFO)的原则,即先插入的元素先被移除,主要的用途是支持线程安全和高效的队列操作。

  • 队列是一种特殊的线性数据结构;
  • 其中元素的插入和删除操作仅能在队列的两端进行,允许在一端插入数据,在另一端删除数据。
  • 通常用于管理任务、缓冲数据以及实现并发编程。

特征:

  • 先进先出(FIFO):最先添加到队列的元素最先被移除;
  • 线程安全:queue 模块中的队列是线程安全的,可以在多线程环境中安全使用;
  • 支持多种类型:queue 模块提供了多种类型的队列,如 FIFO 队列、优先队列和后进先出(LIFO)队列;

创建队列

FIFO(先进先出)、LIFO(后进先出)、优先队列的创建示例如下:

import queue # 创建一个 FIFO 队列 fifo_queue = queue.Queue() # 创建一个 LIFO 队列 lifo_queue = queue.LifoQueue() # 创建一个优先队列 priority_queue = queue.PriorityQueue()

队列操作

常用的队列操作方法有:

  • put(item, block=True, timeout=None):将元素 item 放入队列。如果 blockTrue,且队列已满,则阻塞直到有空位;如果 timeout 设置,超时后将引发异常。

  • put_nowait(item): 相当于put(item, block=False),如果队列已满,put_nowait() 方法将不会阻塞,而是立即引发 queue.Full 异常。

  • get(block=True, timeout=None):从队列中取出并返回一个元素。如果队列为空,且 blockTrue,则阻塞直到有元素可取;如果设置了 timeout,超时后将引发异常。

  • qsize():返回队列中元素的数量。

  • empty():如果队列为空,返回 True;否则返回 False

  • full():如果队列已满,返回 True;否则返回 False

在队列读取与写入时,推荐先判断队列是否已空或已满,再进行操作,例如:

# 写入时,判断是否已满 if not q.full(): q.put_nowait("message") # 读取时,判断是否已空 if not q.empty(): for i in range(q.size()): print(q.get_nowait())

FIFO

import queue # 创建一个 FIFO 队列 fifo_queue = queue.Queue(maxsize=3) # 最大长度为 3 # 添加元素 fifo_queue.put(1) fifo_queue.put(2) fifo_queue.put(3) print(fifo_queue.qsize()) # 输出:3 # 取出元素 print(fifo_queue.get()) # 输出:1 print(fifo_queue.get()) # 输出:2 print(fifo_queue.empty()) # 输出:False # 添加一个新元素 fifo_queue.put(4) print(fifo_queue.get()) # 输出:3 print(fifo_queue.get()) # 输出:4 print(fifo_queue.empty()) # 输出:True

LIFO

LifoQueue 的用法与 Queue 类似,但遵循后进先出的原则。

import queue # 创建一个 LIFO 队列 lifo_queue = queue.LifoQueue() # 添加元素 lifo_queue.put(1) lifo_queue.put(2) lifo_queue.put(3) # 取出元素 print(lifo_queue.get()) # 输出:3 print(lifo_queue.get()) # 输出:2

优先队列

优先队列根据优先级(小的值优先)来处理元素。

import queue # 创建一个优先队列 priority_queue = queue.PriorityQueue() # 添加元素,元素是元组 (优先级, 值) priority_queue.put((2, 'task2')) priority_queue.put((1, 'task1')) priority_queue.put((3, 'task3')) # 取出元素 print(priority_queue.get()) # 输出:(1, 'task1') print(priority_queue.get()) # 输出: (2, 'task2')

Queue 实现进程间通信