在 python 网络通信、爬虫等项目中,都需要使用并发编程。
本篇将讲解并发编程相关内容,包括线程、进程、携程的相关内容。
threading --- 基于线程的并行 — Python 3.12.2 官方文档
concurrent.futures --- 启动并行任务 — Python 3.12.2 官方文档
queue --- 一个同步的队列类 — Python 3.12.2 官方文档
# 进程和线程概述
先来了解一下进程和线程
- 一个工厂,至少有一个车间,车间里至少有一个工人,最终是工人在工作。
- 一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作。
在运行.py 程序时,内部创建一个进程(主进程),在进程中创建了一个线程,由线程逐行运行代码。
线程
是计算机中可以被 CPU 调度的最小单元。进程
是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此项进程中的资源。
以下示例程序将会执行 ARP 扫描,检测局域网内是否有 IP 开启了 50000 端口,在程序中,所有行为都只能通过串行的形式运行,逐一排队执行,前面未完成,后面也无法继续:
import subprocess | |
import re | |
import socket | |
import time | |
# 执行 ARP 扫描,获取局域网内所有活跃设备的 IP 地址 | |
def arp_scan (): | |
arp_result = subprocess.check_output (['arp', '-a']).decode ('cp1252', errors='ignore') | |
ip_list = re.findall (r'(\d+\.\d+\.\d+\.\d+)', arp_result) | |
return ip_list | |
# 尝试连接到指定 IP 的 50000 端口 | |
def try_connect (ip): | |
with socket.socket (socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.settimeout (1) | |
try: | |
s.connect ((ip, 50000)) | |
return True | |
except socket.error: | |
return False | |
# 执行 ARP 扫描,获取 IP 列表 | |
ips = arp_scan () | |
# 标记是否找到服务端 | |
server_found = False | |
# 遍历 IP 列表,尝试连接到 50000 端口 | |
def scan_list (lst): | |
for ip in ips: | |
if try_connect (ip): | |
print (f"服务端存在,IP 地址为:{ip}") | |
server_found = True | |
break | |
scan_list (lst=ips) |
# GIL 锁
GIL 锁,全局解释器锁(Global Interpreter Lock),CPython 所特有,让一个进程中同一时刻只能有一个线程可以被 CPU 调用
如果想利用计算机的多核心,让 COU 同时处理一些任务,适适合用多进程开发(资源开销大)
如果不需要利用计算机的多核优势,适合采用多进程开发
常见的程序开发中,计算操作需要使用 CPU 多核优势,IO 操作不需要利用 CPU 多核优势
- 计算密集型,用多进程,例如大量数据计算
- IO 密集型,用多线程,例如文件读写、网络数据传输
# 多线程
线程
是独立的处理流程,可以和系统的其他线程并行或并发地执行。多线程可以共享数据和资源,利用所谓的共享内存空间。
每一个线程基本上包含 3 个元素:
程序计数器
、寄存器
、栈
。线程的状态大体上可以分为
ready
,running
,blocked
。多线程编程一般使用共享内容空间进行线程间的通讯,这就使管理内容空间成为多线程编程的重点和难点。
线程的典型应用是应用软件的并行化。
相比于进程,使用线程的优势主要是
性能
。
# threading 实现多线程
创建一个简单的多线程需要:
- 1、创建一个线程列表,用于保留对所有线程的引用(非必须)
threads = []
- 2、需要定义一个线程
t = threading.Thread (target, args)
,target
表示线程需要执行的函数,args 表示函数需要传递的参数 - 3、将线程对象假如到线程列表(非必须)
threads.append (t)
- 4、线程被创建后不会马上执行,需要手动调用 .start () 方法执行线程:
t.start ()
- 5、阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行:
t.join ()
多线程之间共享全局变量。
Class threading.Thread ()
接收如下参数
class threading.Thread (group=None, ## 一般设置为 None ,这是为以后的一些特性预留的 | |
target=None, ## 当线程启动的时候要执行的函数 | |
name=None, ## 线程的名字,默认会分配一个唯一名字 Thread-N | |
args=(), ## 传递给 target 的参数元组(整形和浮点型数据输入到 args 里)。 | |
kwargs={}) ## 传递给 target 的关键字参数字典(字符串型数据输入到这里)。 |
# 常用线程方法
start ()
:启动线程活动,当前线程准备就绪(等待 CPU 调度,具体时间由 CPU 决定)run ()
:定义线程的功能函数。join ([timeout])
:等待子线程终止再继续执行主线程。timeout 表示等待的最长时间。is_alive ()
:返回线程是否还活着。threading.enumerate ()
: 查看 当前
正在运行的线程,返回一个包含线程对象的元素列表。threading.current_thread ().name
: 获取当前线程名称setDaemon (bool)
: 设置守护线程(必须放在 start 之前)
t.setDaemon (True)
: 设置为守护前程,主线程执行完毕后,子线程也自动关闭t.setDaemon (Flase)
: 设置为非守护前程,主线程等待子线程,子线程执行完毕后,主线程才结束(默认)
一个简单的调用示例如下:
import threading | |
def function (i): | |
print ("function called by thread % i\n" % i) | |
return | |
#threads = [] | |
for i in range (5): | |
t = threading.Thread (target=function, args=(i,)) ## 用 function 函数初始化一个 Thread 对象 t,并将参数 i 传入; | |
#threads.append (t) | |
t.start () ## 线程被创建后不会马上执行,需要手动调用 .start () 方法执行线程 | |
t.join () ## 阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行 |
function
函数的输入只有一个int
型数值,这里要注意的是,在使用threading.Thread ()
传参时,arg
需要传入一个元组,所以输入的是(i,)
,也就是说要加个逗号,。因为type ((i))
是<class 'int'>
。
函数传入参数同时包含浮点型和字符串型数值时
import threading | |
# 定义一个线程函数,接受浮点型和字符串型参数 | |
def calculate (data_float, data_string): | |
result = data_float * 2 | |
print (f"Thread result for {data_float}: {result}") | |
print (f"Additional string data: {data_string}") | |
# 创建多个线程并启动 | |
threads = [] | |
data_float = [1.5, 2.5, 3.5] # 浮点型数据 | |
data_string = ["Hello", "World", "Python"] # 字符串型数据 | |
for i in range (len (data_float)): | |
thread = threading.Thread (target=calculate, args=(data_float [i], data_string [i])) | |
threads.append (thread) | |
thread.start () | |
# 等待所有线程执行完成 | |
for thread in threads: | |
thread.join () | |
print ("All threads have finished execution.") |
# 自定义线程类
除了直接实例化 Thread 类,我们还可通过自定义的方式创建线程,自定义类继承自 threading.Thread
在自定义类中重写 run
方法,实例化后线程会执行 run 方法
自定义类的 args
使用 self.args
访问
import threading | |
class MyThread (threading.Thread): | |
def run (self): | |
print (' 执行线程 ', self.args) | |
t = MyThread (args=(1,)) | |
t.start () |
# 线程池
线程池的是在程序启动时预先创建一定数量的线程并保存在内存中,等待任务的到来。当有新任务到来时,线程池会选择一个空闲的线程来执行任务,这样可以避免频繁地创建和销毁线程。当任务完成后,线程并不会立即被销毁,而是返回到线程池中等待下一个任务的到来。这种机制可以有效地减少系统开销,提高程序的性能。
# 线程池的使用
Python
标准库中的 concurrent.futures
模块提供了线程池的实现。该模块包含一个 Executor
类,可以用于创建线程池。 Executor
类有两个子类: ThreadPoolExecutor
和 P rocessPoolExecutor
。
ThreadPoolExecutor
用于创建线程池,而 ProcessPoolExecutor
用于创建进程池。
要使用线程池,首先需要导入 concurrent.futures
模块,然后创建一个 Executor
对象。接下来,可以使用 Executor
对象的 submit ()
方法提交任务到线程池。 submit ()
方法接受一个可调用对象和一个可选参数元组,以及任意数量的关键字参数。提交的任务可以是任何可调用对象,包括函数、类实例等。
下面是一个简单的示例代码,演示如何使用 Python 的线程池:
ThreadPoolExecutor (5)
创建了一个包含 5 个线程的线程池。task_function
是一个简单的函数,它将输入值平方。pool.submit (task_function,i)
将任务提交给线程池,submit 方法返回一个Future
对象,可以用来获取任务的结果。- 通过
future.result ()
获取每个任务的结果。
from concurrent.futures import ThreadPoolExecutor | |
from unittest import result | |
# 创建一个包含 5 个线程的线程池 | |
pool = ThreadPoolExecutor (5) | |
# 定义一个函数,该函数将被线程池中的线程执行 | |
def task_function (x): | |
return x * x | |
# 使用线程池执行任务 | |
results = [pool.submit (task_function, i) for i in range (10)] | |
# 获取所有任务的结果 | |
for future in results: | |
print (future.result ()) |
再来个例子思考体会一下:
- 提交任务:使用
executor.submit ()
方法将任务提交给线程池。submit ()
方法返回一个Future
对象,该对象代表了正在执行的任务。我们使用字典futures
来存储Future
对象和对应的数字,以便在任务完成后可以找到原始的数字。 - 获取结果:使用
concurrent.futures.as_completed ()
函数来迭代已完成的Future
对象。当一个任务完成时,我们调用future.result ()
来获取任务的结果。如果任务执行过程中抛出了异常,result ()
方法会重新抛出这个异常,我们可以在try…except
语句中捕获它。 - 处理异常:在
try…except
语句中,我们检查future.result ()
是否抛出异常。如果抛出异常,我们打印出异常信息;如果没有异常,我们打印出结果。
import math | |
import concurrent.futures | |
def calculate_factorial (number): | |
"""计算一个数的阶乘""" | |
return math.factorial (number) | |
def main (): | |
numbers = [1, 2, 3, 4, 5] # 任务列表,计算这些数的阶乘 | |
# 创建一个包含 3 个线程的线程池 | |
with concurrent.futures.ThreadPoolExecutor (max_workers=3) as executor: | |
# 使用 submit 函数来提交任务 | |
futures = {executor.submit (calculate_factorial, number): number for number in numbers} | |
# 获取并打印结果 | |
for future in concurrent.futures.as_completed (futures): | |
number = futures [future] | |
try: | |
result = future.result () | |
except Exception as exc: | |
print (f"{number} generated an exception: {exc}") | |
else: | |
print (f"The factorial of {number} is {result}") | |
if __name__ == "__main__": | |
main () |
使用 map 函数来提交任务并保持执行顺序
import math | |
import concurrent.futures | |
def calculate_factorial (number): | |
"""计算一个数的阶乘""" | |
return math.factorial (number) | |
def main (): | |
numbers = [1, 2, 3, 4, 5] # 任务列表,计算这些数的阶乘 | |
# 创建一个包含 3 个线程的线程池 | |
with concurrent.futures.ThreadPoolExecutor (max_workers=3) as executor: | |
# 使用 map 函数来提交任务并保持执行顺序 | |
# 使用 executor.map () 方法将任务提交给线程池。与 executor.submit () 不同,map () 方法会等待所有任务完成,并返回一个结果列表,保持了输入列表的顺序 | |
results = executor.map (calculate_factorial, numbers) | |
# 打印结果 | |
for number, result in zip (numbers, results): | |
print (f"The factorial of {number} is {result}") | |
if __name__ == "__main__": | |
main () |
# 线程池的工作流程
- 当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前有线程处于空闲状态,直到当前线程数达到核心线程数
- 当前线程数达到核心线程数时,如果这个时候还提交任务,这些任务会被放到工作队列里,等到线程处理完了手头的任务后,会从队列中取任务处理。
- 当前线程数达到核心线程数并且工作队列也满了,如果这个时候还提交任务,则会继续创建线程来处理,直到线程数达到最大线程数。
- 当前线程数达到最大线程数并且队列也满了,如果这个时候还提交任务,则会触发饱和策略,如抛出异常或拒绝执行任务。
# 线程安全
并发线程中,多个线程对共享内存进行操作,并且至少有一个可以改变数据。这种情况下如果没有同步机制,那么多个线程之间就会产生竞争,从而导致代码无效或出错。
# 互斥锁 Lock ()
解决多线程竞争问题的最简单的方法就是用 锁 (Lock)
。当一个线程需要访问共享内存时,它必须先获得 Lock
之后才能访问;当该线程对共享资源使用完成后,必须释放 Lock
,然后其他线程在拿到 Lock
进行访问资源。因此,为了避免多线程竞争的出现,必须保证:同一时刻只能允许一个线程访问共享内存。
在实际使用中,该方法经常会导致一种 死锁
现象,原因是不同线程互相拿着对方需要的 Lock
,导致死锁的发生(占有且等待)。
锁有两种状态: locked
(被某一线程拿到)和 unlocked
(可用状态)
我们有两个方法来操作锁: acquire ()
和 release ()
需要遵循以下规则:
- 如果状态是 unlocked, 可以调用 acquire () 将状态改为 locked
- 如果状态是 locked, acquire () 会被 block 直到另一线程调用 release () 释放锁
- 如果状态是 unlocked, 调用 release () 将导致 RuntimError 异常
- 如果状态是 locked, 可以调用 release () 将状态改为 unlocked
import threading | |
shared_resource_with_lock = 0 | |
shared_resource_with_no_lock = 0 | |
COUNT = 100000 | |
shared_resource_lock = threading.Lock () ## 锁 | |
## 有锁的情况 | |
def increment_with_lock (): | |
# shared_resource_with_lock 即最外面的 shared_resource_with_lock | |
# 这样写就不需要再通过函数的参数引入 shared_resource_with_lock 了 | |
global shared_resource_with_lock | |
for _ in range (COUNT): | |
shared_resource_lock.acquire () ## 获取 锁 | |
shared_resource_with_lock += 1 | |
shared_resource_lock.release () ## 释放 锁 | |
def decrement_with_lock (): | |
global shared_resource_with_lock | |
for _ in range (COUNT): | |
shared_resource_lock.acquire () | |
shared_resource_with_lock -= 1 | |
shared_resource_lock.release () | |
## 没有锁的情况 | |
def increment_without_lock (): | |
global shared_resource_with_no_lock | |
for _ in range (COUNT): | |
shared_resource_with_no_lock += 1 | |
def decrement_without_lock (): | |
global shared_resource_with_no_lock | |
for _ in range (COUNT): | |
shared_resource_with_no_lock -= 1 | |
if __name__ == "__main__": | |
t1 = threading.Thread (target=increment_with_lock) | |
t2 = threading.Thread (target=decrement_with_lock) | |
t3 = threading.Thread (target=increment_without_lock) | |
t4 = threading.Thread (target=decrement_without_lock) | |
## 开启线程 | |
t1.start () | |
t2.start () | |
t3.start () | |
t4.start () | |
## .join () | |
t1.join () | |
t2.join () | |
t3.join () | |
t4.join () | |
print ("the value of shared variable with lock management is % s" % shared_resource_with_lock) | |
print ("the value of shared variable with race condition is % s" % shared_resource_with_no_lock) |
代码运行结果如下
the value of shared variable with lock management is 0
the value of shared variable with race condition is 79714
尽管理论上用锁的策略可以避免多线程中的竞争问题,但是可能会对程序的其他方面产生负面影响。此外,锁的策略经常会导致不必要的开销,也会限制程序的可扩展性和可读性。更重要的是,有时候需要对多进程共享的内存分配优先级,使用锁可能和这种优先级冲突。从实践的经验来看,使用锁的应用将对 debug 带来不小的麻烦。所以,最好使用其他可选的方法确保同步读取共享内存,避免竞争条件。
# 递归锁 Rlock ()
为了保证 “只有拿到锁的线程才能释放锁”,那么应该使用 RLock ()
对象;
和 Lock ()
一样, RLock ()
也有 acquire ()
和 release ()
两种方法;
RLock ()
有三个特点:
谁拿到谁释放。如果线程 A 拿到锁,线程 B 无法释放这个锁,只有 A 可以释放;
同一线程可以多次拿到该锁,即可以 acquire 多次;
acquire 多少次就必须 release 多少次,只有最后一次 release 才能改变 RLock 的状态为 unlocked;
import threading | |
import time | |
class Box (object): | |
lock = threading.RLock () | |
def __init__(self): | |
self.total_items = 0 | |
def execute (self, n): | |
Box.lock.acquire () | |
self.total_items += n | |
Box.lock.release () | |
def add (self): | |
Box.lock.acquire () | |
self.execute (1) | |
Box.lock.release () | |
def remove (self): | |
Box.lock.acquire () | |
self.execute (-1) | |
Box.lock.release () | |
def adder (box, items): | |
while items > 0: | |
print ("adding 1 item in the box") | |
box.add () | |
time.sleep (1) | |
items -= 1 | |
def remover (box, items): | |
while items > 0: | |
print ("removing 1 item in the box") | |
box.remove () | |
time.sleep (1) | |
items -= 1 | |
if __name__ == "__main__": | |
items = 5 | |
print ("putting % s items in the box"% items) | |
box = Box () | |
t1 = threading.Thread (target=adder, args=(box, items)) | |
t2 = threading.Thread (target=remover, args=(box, items)) | |
t1.start () | |
t2.start () | |
t1.join () | |
t2.join () | |
print ("% s items still remain in the box" % box.total_items) |
代码运行结果如下:
putting 5 items in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
adding 1 item in the box
removing 1 item in the box
0 items still remain in the box
Box
类的execute ()
方法包含RLock
,adder ()
和remover ()
方法也包含RLock
,就是说无论是调用Box
还是adder ()
或者remover ()
,每个线程的每一步都有拿到资源、释放资源的过程。
# 信号量 Semaphore ()
信号量的定义: 信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取。
信号量是由操作系统管理的一种抽象数据类型,用于多线程中同步对共享资源的使用;
信号量是一个内部数据,用于表明当前共享资源可以有多少并发读取;
在 Threading 中,信号量的操作有两个函数: acquire ()
和 release ()
;解释如下:
- 每当线程想要读取关联了信号量的共享资源时,必须调用
acquire ()
,此操作减少信号量的内部变量,如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。 - 当线程不再需要该共享资源,必须通过
release ()
释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。
虽然表面上看信号量机制没什么明显的问题,如果信号量的等待和通知操作都是原子的,确实没什么问题。但如果不是,或者两个操作有一个终止了,就会导致糟糕的情况。
举个例子,假设有两个并发的线程,都在等待一个信号量,目前信号量的内部值为 1。假设第线程 A 将信号量的值从 1 减到 0,这时候控制权切换到了线程 B,线程 B 将信号量的值从 0 减到 - 1,并且在这里被挂起等待,这时控制权回到线程 A,信号量已经成为了负值,于是第一个线程也在等待。
这样的话,尽管当时的信号量是可以让线程访问资源的,但是因为非原子操作导致了所有的线程都在等待状态。
注:"原子" 指的是原子操作,即一个不可分割的操作。在多线程编程中,如果对信号量的等待和通知操作是原子的,意味着它们是以不可分割的方式执行的,其他线程无法在这些操作中插入。这样可以确保在多线程环境中,对信号量的操作是可靠的。
Semaphore ()
详解:threading.Semaphore ()
可以创建一个信号量对象,它可以控制对共享资源的访问数量。在创建信号量对象时,可以指定初始的许可数量。每次访问资源时,线程需要获取一个许可;当许可数量不足时,线程将会被阻塞,直到有足够的许可可用。访问资源完成后,线程释放许可,使得其他线程可以继续访问资源。
threading.Semaphore (num)
: num 表示初始的许可数量(比如这个数量为 1)
下面的代码展示了信号量的使用,我们有两个线程, producer ()
和 consumer ()
,它们使用共同的资源,即 item
。 producer ()
的任务是生产 item
, consumer ()
的任务是消费 item
。
当 item
还没有被生产出来, consumer ()
一直等待,当 item
生产出来, producer ()
线程通知消费者资源可以使用了。
import threading | |
import time | |
import random | |
# 创建一个信号量 semaphore,初始值为 0。 | |
# 信号量是一种同步机制,用于控制对共享资源的访问。 | |
semaphore = threading.Semaphore (0) | |
print ("init semaphore % s" % semaphore._value) # 打印初始信号量的值。 | |
# 消费者线程将执行的函数。 | |
def consumer (): | |
print ("consumer is waiting.") # 打印信息,表明消费者正在等待。 | |
semaphore.acquire () # 消费者尝试获取信号量,如果信号量的值小于 1,则等待。 | |
print ("consumer notify: consumed item number % s" % item) # 打印消费者消费的项目编号。 | |
print ("consumer semaphore % s" % semaphore._value) # 在消费后打印信号量的当前值。 | |
# 生产者线程将执行的函数。 | |
def producer (): | |
global item # 声明 item 为全局变量,以便在函数内部修改。 | |
time.sleep (10) # 生产者线程暂停 10 秒,模拟生产过程耗时。 | |
item = random.randint (0, 1000) # 生产者生成一个随机的项目编号。 | |
print ("producer notify : produced item number % s" % item) # 打印生产者生产的产品编号。 | |
semaphore.release () # 生产者释放信号量,增加信号量的值,允许其他等待的线程继续执行。 | |
print ("producer semaphore % s" % semaphore._value) # 在生产后打印信号量的当前值。 | |
# 主程序入口。 | |
if __name__ == "__main__": | |
for _ in range (0, 5): # 循环 5 次,模拟生产和消费过程。 | |
t1 = threading.Thread (target=producer) # 创建生产者线程。 | |
t2 = threading.Thread (target=consumer) # 创建消费者线程。 | |
t1.start () # 启动生产者线程。 | |
t2.start () # 启动消费者线程。 | |
t1.join () # 等待生产者线程完成。 | |
t2.join () # 等待消费者线程完成。 | |
print ("program terminated") # 打印程序结束的信息。 |
# 多进程
# 进程的概念
程序:静态的,例如 XX.py , 这是一个程序
进程:程序运行起来以后,代码 + 用到的资源 称之为进程,它是操作系统分配资源的基本单元。
进程的状态:
工作中,任务数往往大于 CPU 核数,即有一些任务正在执行,而另一些任务在等待 CPU 进行执行,因此导致了有不同的状态
新建、就绪、运行、等待(阻塞)、死亡
- 就绪态:满足运行条件,可以被 CPU 调度执行
- 执行态:CPU 正在执行其功能
- 阻塞态:等待某些条件满足,例如 IO 操作
# multiprocessing 实现多进程
进程是通过 multiprocessing.Process ()
来实现,执行 start ()
后,子进程会将主进程的代码 + 资源复制一份。
class Process ( | |
group: None = None, | |
target: ((...) -> object) | None = None, | |
name: str | None = None, | |
args: Iterable [Any] = (), | |
kwargs: Mapping [str, Any] = {}, | |
*, | |
daemon: bool | None = None | |
) |
# multiprocessing 实现多进程
实现多进程的方式与多线程类似,多进程对象需要通过 multiprocessing.Process ()
来创建
常用进程方法
start ():启动子进程实例(创建子进程) | |
is_alive ():判断子进程是否还在活着 | |
join ([timeout]):是否等待子进程执行结束,或等待多少秒 | |
terminate ():不管任务是否完成,立即终止子进程用 |
以下是一个简单的多进程示例,运行代码后,process 和 main 几乎同时输出
import time | |
import multiprocessing | |
def fun_process (t): | |
while True: | |
print ('-process-') | |
time.sleep (t) | |
if __name__ == '__main__': | |
p = multiprocessing.Process (target=fun_process,args=(1,)) | |
p.start () | |
# 主进程单独执行的代码 | |
while True: | |
print ("-main-") | |
time.sleep (1) |
在 windows 系统上写多进程时,需要将进程创建写在
if __name__ == '__main__':
中
Linux 创建进程基于 fork,windows 基于 spawn,mac 都支持,但在 python3.8 之后,默认使用 spawn
# 进程之间的通信
同一个进程内的多个线程之间共享内存,而多个进程在执行过程中拥有独立的内存单元,因此进程之间默认无法进行资源共享,操作系统提供了很多机制实现进程间通信,例如:队列(Queue)、socket
# 队列 (Queue)
队列(Queue)是 python 中一种非常重要的数据结构,但并不是 python 的数据类型,队列遵循先进先出(FIFO)的原则,即先插入的元素先被移除,主要的用途是支持线程安全和高效的队列操作。
- 队列是一种特殊的线性数据结构;
- 其中元素的插入和删除操作仅能在队列的两端进行,允许在一端插入数据,在另一端删除数据。
- 通常用于管理任务、缓冲数据以及实现并发编程。
特征:
- 先进先出(FIFO):最先添加到队列的元素最先被移除;
- 线程安全:queue 模块中的队列是线程安全的,可以在多线程环境中安全使用;
- 支持多种类型:queue 模块提供了多种类型的队列,如 FIFO 队列、优先队列和后进先出(LIFO)队列;
# 创建队列
FIFO(先进先出)、LIFO(后进先出)、优先队列的创建示例如下:
import queue | |
# 创建一个 FIFO 队列 | |
fifo_queue = queue.Queue () | |
# 创建一个 LIFO 队列 | |
lifo_queue = queue.LifoQueue () | |
# 创建一个优先队列 | |
priority_queue = queue.PriorityQueue () |
# 队列操作
常用的队列操作方法有:
put (item, block=True, timeout=None)
:将元素item
放入队列。如果block
为True
,且队列已满,则阻塞直到有空位;如果timeout
设置,超时后将引发异常。put_nowait (item)
: 相当于put (item, block=False)
,如果队列已满,put_nowait ()
方法将不会阻塞,而是立即引发queue.Full
异常。get (block=True, timeout=None)
:从队列中取出并返回一个元素。如果队列为空,且block
为True
,则阻塞直到有元素可取;如果设置了timeout
,超时后将引发异常。qsize ()
:返回队列中元素的数量。empty ()
:如果队列为空,返回True
;否则返回False
。full ()
:如果队列已满,返回True
;否则返回False
。
在队列读取与写入时,推荐先判断队列是否已空或已满,再进行操作,例如:
# 写入时,判断是否已满
if not q.full (): q.put_nowait ("message")# 读取时,判断是否已空
if not q.empty (): for i in range (q.size ()): print (q.get_nowait ())
# FIFO
import queue | |
# 创建一个 FIFO 队列 | |
fifo_queue = queue.Queue (maxsize=3) # 最大长度为 3 | |
# 添加元素 | |
fifo_queue.put (1) | |
fifo_queue.put (2) | |
fifo_queue.put (3) | |
print (fifo_queue.qsize ()) # 输出:3 | |
# 取出元素 | |
print (fifo_queue.get ()) # 输出:1 | |
print (fifo_queue.get ()) # 输出:2 | |
print (fifo_queue.empty ()) # 输出:False | |
# 添加一个新元素 | |
fifo_queue.put (4) | |
print (fifo_queue.get ()) # 输出:3 | |
print (fifo_queue.get ()) # 输出:4 | |
print (fifo_queue.empty ()) # 输出:True |
# LIFO
LifoQueue 的用法与 Queue 类似,但遵循后进先出的原则。
import queue | |
# 创建一个 LIFO 队列 | |
lifo_queue = queue.LifoQueue () | |
# 添加元素 | |
lifo_queue.put (1) | |
lifo_queue.put (2) | |
lifo_queue.put (3) | |
# 取出元素 | |
print (lifo_queue.get ()) # 输出:3 | |
print (lifo_queue.get ()) # 输出:2 |
# 优先队列
优先队列根据优先级(小的值优先)来处理元素。
import queue | |
# 创建一个优先队列 | |
priority_queue = queue.PriorityQueue () | |
# 添加元素,元素是元组 (优先级,值) | |
priority_queue.put ((2, 'task2')) | |
priority_queue.put ((1, 'task1')) | |
priority_queue.put ((3, 'task3')) | |
# 取出元素 | |
print (priority_queue.get ()) # 输出:(1, 'task1') | |
print (priority_queue.get ()) # 输出: (2, 'task2') |