装饰器的概念和意义
装饰器的常见类型
函数闭包
闭包与装饰器的区别
装饰器的概念和意义
用来装饰其他函数,为其他函数添加特定功能的函数
装饰器函数基本原则:
装饰器不能修改被装饰函数源码
装饰器不能修改装饰函数的调用方式
相关概念
函数即是变量
任何变量名都指向变量值的内存地址
函数也一样,函数的本质指向内存地址的引用,是 function 实例,函数名指向内存地址
def get_time (): | |
print ("TEST") | |
count_time = get_time | |
count_time () |
高阶函数
符合下列条件
接受函数名作为形参
返回值包含函数名
# 导入 Callable,专门用于标注函数 / 可调用对象 | |
from typing import Callable | |
def foo (): | |
print ("in foo") | |
def boo (func: Callable) -> None: | |
print (func) | |
type (func) | |
func () | |
boo (foo) |
exp:设计一个函数统计另一个函数运行时间
下示例不是装饰器,因为改变了 foo 函数调用方式
import time | |
def foo (): | |
time.sleep (3) | |
print ("In foo") | |
def timer (func): | |
start_time = time.time () | |
func () | |
end_time = time.time () | |
print (f"timer {end_time - start_time}") | |
timer (foo) |
如下
正常情况下 timer 运行完了 gf 的内存会被回收,但是由于它被返回给了 foo, 所以 foo 就重新指向了 gf, 但是原 foo 由于在 gf 要被调用,所以它的内存也没有被回收
import time | |
def foo (): | |
time.sleep (1) | |
print ("in foo") | |
print ("原 foo 函数对象地址:", hex (id (foo))) | |
def timer (func): | |
print ("timer 内部 func 指向的地址:", hex (id (func))) | |
def timing (): | |
start_time = time.time () | |
func () | |
end_time = time.time () | |
print (f"耗时 {end_time - start_time} 秒") | |
print ("timer 内部 timing 函数地址:", hex (id (timing))) | |
return timing | |
foo = timer (foo) | |
print ("装饰后 foo 指向的地址:", hex (id (foo))) | |
foo () | |
# 原 foo 函数对象地址: 0x2448a258a40 | |
# timer 内部 func 指向的地址: 0x2448a258a40 | |
# timer 内部 timing 函数地址: 0x2448a489080 | |
# 装饰后 foo 指向的地址: 0x2448a489080 |
语法糖
如上装饰器中,函数运行进行了重赋值 foo = timer (foo)
为了简化这个过程,python 提供了语法糖,用 @ 符号放在需要被装饰的函数上
@timer 等价于 foo=timer (foo) ,会执行一次 timer 函数,,将 timer 函数的返回值(timing 函数)赋值给 foo 函数。
import time | |
def timer (func): | |
def timing (): | |
start_time = time.time () | |
func () | |
end_time = time.time () | |
print (f"耗时 {end_time - start_time} 秒") | |
return timing | |
@timer | |
def foo (): | |
time.sleep (1) | |
print ("in foo") | |
foo () |
装饰器编写
定义一个高阶函数,接受函数名作为形参,返回嵌套函数名
在高阶函数中定义嵌套代码,在嵌套代码中添加新的功能并调用形参函数
外层函数的作用:
接收被装饰的原函数,形成闭包,保存原函数引用,创建并返回内层包装函数
内层函数的作用:
执行新增的扩展功能
调用原来的旧函数,保留原有功能
作为新的 foo 被外界调用
@装饰器 的核心逻辑,就是把 @ 下方紧挨着定义的函数,作为参数传给 @ 后面的函数,再把返回值重新赋值给这个函数名。
装饰器的几种类型
被装饰函数带参数
可变参数自适应被装饰函数的参数
import time | |
def timer (func): | |
def timing (*args, **kwargs): | |
start_time = time.time () | |
func (*args, **kwargs) | |
end_time = time.time () | |
print (f"耗时 {end_time - start_time} 秒") | |
return timing | |
@timer | |
def foo (n: int): | |
time.sleep (n) | |
print ("in foo") | |
foo (1) |
装饰器本身带参数
import time | |
def timer (timer_type='minites'): | |
def outer_timing (func): | |
def inner_timing (*args, **kwargs): | |
start_time = time.time () | |
func (*args, **kwargs) | |
end_time = time.time () | |
print (f"耗时 {end_time - start_time} {timer_type}") | |
return inner_timing | |
return outer_timing | |
@timer (timer_type='minites') | |
def foo (n: int): | |
time.sleep (n) | |
print ("in foo") | |
foo (1) |
被装饰函数带返回值
多个返回值以元组的形式返回
import time | |
def timer (timer_type='seconds'): | |
def outer_timing (func): | |
def inner_timing (*args, **kwargs): | |
start_time = time.time () | |
res = func (*args, **kwargs) | |
end_time = time.time () | |
print (f"耗时 {end_time - start_time} {timer_type}") | |
return res | |
return inner_timing | |
return outer_timing | |
@timer (timer_type='seconds') # foo = timer (imer_type='minites')(foo) | |
def fibonacci (n: int): | |
a, b = 0, 1 | |
for _ in range (n): | |
a, b = b, a + b | |
return a | |
fibonacci (500000) |
闭包
下面给出两段代码示例,用于思考闭包的作用
func_list = [] | |
for i in range (3): | |
def myfunc (a): | |
return i+a | |
func_list.append (myfunc) | |
for f in func_list: | |
print (f (a=1)) |
func_list = [] | |
for i in range (3): | |
def deco (i): | |
def myfunc (a): | |
return i+a | |
return myfunc | |
func_list.append (deco (i)) | |
for f in func_list: | |
print (f (a=1)) |
如上,示例 1 返回 333,示例 2 返回 123
2 中 deco (i) 函数让 i 通过参数传入变成了 myfunc 的自由变量
闭包的作用
用来在一个函数与一组私有变量之间创建关联关系,在给定函数被多次调用的过程中,这些私有变量能够保持其持久性(保存运行环境与变量的状态)
闭包特征:
必须要有函数嵌套,且外层函数必须返回内层函数,外层函数相当于给内层函数提供了一个包装起来的运行环境,在包装的运行环境中,内层函数可以完全自己掌握自由变量的值
内层函数一定要用到外层函数中定义的自由变量
装饰器和闭包的区别:
装饰器的外层函数主要是提供被装饰函数的引用,闭包的外层函数主要是为了一共自由变量
装饰器的外层函数不一定要提供变量,闭包的外层函数必须提供自由变量,否则闭包无意义
装饰器的目的是为被装饰函数提供额外的功能,闭包的目的是保存函数运行环境和局部变量值
从形式上看,闭包是装饰器的子集
多线程的高级应用
线程之间的通讯机制
消息队列、Event 事件对象,Condition 条件对象
Event 事件对象
使用之前先初始化:event = threading.Event ()
方法
重置代码中的 event 对象,使得所有该 event 事件都处于待命状态
event.clear ()
阻塞线程,等待 event 指令
event.wait ()
发送 event 指令,使得所有设置该 event 事件的线程执行
event.set ()
如下示例,将 10 个线程共用一个 event 对象,实现集合点
import threading # 多线程核心模块 | |
import time # 用于主线程休眠 | |
class MyThread (threading.Thread): | |
def __init__(self, event): | |
super ().__init__() # 调用父类构造方法 | |
self.event = event # 保存事件对象 | |
def run (self): | |
# 线程启动后自动执行的核心逻辑 | |
print (f"线程 {self.name} 已经初始化完成,等待启动...") | |
self.event.wait () # 阻塞等待事件触发 | |
print (f"{self.name} 开始执行") | |
if __name__ == '__main__': | |
event = threading.Event () # 创建线程同步事件 | |
threads = [] # 存储所有线程 | |
# 创建 10 个线程,全部共享同一个 event 事件 | |
[threads.append (MyThread (event)) for i in range (1, 11)] | |
event.clear () # 重置事件为【未触发】状态 | |
# 启动所有 10 个线程 | |
[t.start () for t in threads] | |
time.sleep (3) # 主线程休眠 3 秒 | |
event.set () # 触发事件,唤醒所有等待的线程 | |
[t.join () for t in threads] # 主线程等待所有子线程执行完毕 |
Condition 对象
多个线程之间由锁机制进行先后执行
Condition 必须遵循: acquire () → 操作 → release ()
主要方法
acquire () :手动获取锁
notify () :唤醒其他 wait 状态的线程,有多个时随机唤醒一个
notify_all () :唤醒所有 wait 状态的线程
wait () :进入 wait 线程挂起状态,临时释放锁,等待 notify 通知后,自动重新获取锁
release () :释放锁
更简单的方法,用 with 语句自动管理锁
示例 1:手动管理锁的生命周期,实现生产者生产数据后,消费者消费
import threading | |
import time | |
class Producer (threading.Thread): | |
def __init__(self, name: str, cond: threading.Condition, lst: list): | |
super ().__init__(name=name) | |
self.cond = cond | |
self.goods = lst | |
def run (self): | |
self.cond.acquire () # 获取锁 | |
time.sleep (1) | |
self.goods.append ("data1") # 生产 | |
self.cond.notify () # 唤起其余 wait 状态的线程 | |
self.cond.wait () # 挂起状态,暂时释放锁,等其他线程通过 notify 唤醒 | |
time.sleep (2) | |
self.goods.append ("data2") | |
self.cond.notify () # 唤起其余 wait 状态的线程 | |
self.cond.release () # 结束后释放锁 | |
class Consumer (threading.Thread): | |
def __init__(self, name: str, cond: threading.Condition, lst: list): | |
super ().__init__(name=name) | |
self.cond = cond | |
self.goods = lst | |
def run (self): | |
self.cond.acquire () # 获取锁 | |
self.cond.wait () # 挂起状态,暂时释放锁,等待其他线程通过 notify 唤醒 | |
print (self.goods.pop ()) | |
self.cond.notify () | |
self.cond.wait () | |
print (self.goods.pop ()) | |
self.cond.notify () | |
self.cond.release () | |
if __name__ == '__main__': | |
# 创建条件变量 | |
cond = threading.Condition () | |
# 共享资源(仓库) | |
goods = [] | |
producer = Producer (name='Producer', cond=cond, lst=goods) | |
consumer = Consumer (name='Consumer', cond=cond, lst=goods) | |
consumer.start () | |
producer.start () | |
consumer.join () | |
producer.join () |
示例 2:with 语句自动管理生命周期
import threading | |
import time | |
class Producer (threading.Thread): | |
def __init__(self, name: str, cond: threading.Condition, lst: list): | |
super ().__init__(name=name) | |
self.cond = cond | |
self.goods = lst | |
def run (self): | |
# with 语句自动获取 / 释放锁,无需手动 acquire/release | |
with cond: | |
time.sleep (1) # 生产第一条数据 | |
self.goods.append ("data1") | |
self.cond.notify (); self.cond.wait () # 唤醒消费者,自身挂起 | |
time.sleep (2) # 生产第二条数据 | |
self.goods.append ("data2") | |
self.cond.notify (); self.cond.wait () # 唤醒消费者,自身挂起 | |
class Consumer (threading.Thread): | |
def __init__(self, name: str, cond: threading.Condition, lst: list): | |
super ().__init__(name=name) | |
self.cond = cond | |
self.goods = lst | |
def run (self): | |
with cond: | |
self.cond.wait () # 挂起状态,等待生产者完成生产 | |
print (self.goods.pop ()) # 消费第一条数据 | |
self.cond.notify (); self.cond.wait () # 唤醒生产者,自身挂起 | |
print (self.goods.pop ()) # 消费第二条数据 | |
self.cond.notify () # 唤醒生产者,消费完成后无需挂起 | |
if __name__ == '__main__': | |
# 创建条件变量 | |
cond = threading.Condition () | |
# 共享资源(仓库) | |
goods = [] | |
producer = Producer (name='Producer', cond=cond, lst=goods) | |
consumer = Consumer (name='Consumer', cond=cond, lst=goods) | |
consumer.start () | |
producer.start () | |
consumer.join () | |
producer.join () |
在有多个消费者的情况,可以使用 notify_all () 唤醒所有等待的消费者
如果想实现多个线程的优先级唤醒
在 Python 中,原生 threading.Condition 不支持优先级唤醒(它只有随机唤醒 1 个 / 全唤醒,无法指定优先级)。
想实现按优先级唤醒线程,我们必须自己封装一个支持优先级的同步工具,核心思路:
- 给线程绑定优先级数值(数字越小 / 越大,优先级越高,自定义规则)
- 用线程安全的容器维护「等待线程 + 优先级」
- 唤醒时主动筛选优先级最高的线程,而不是随机唤醒
import threading | |
import time | |
from typing import List, Dict | |
# 优先级条件变量 | |
class PriorityCondition: | |
def __init__(self): | |
self.lock = threading.Lock () # 基础锁 | |
self.waiting_threads: List [Dict] = [] # 等待池:存储 {线程,优先级,唤醒事件} | |
# 线程进入等待,并传入自己的优先级 | |
def wait (self, priority: int): | |
# 创建事件:用于精准唤醒当前线程 | |
event = threading.Event () | |
with self.lock: | |
# 把当前线程、优先级、事件加入等待池 | |
self.waiting_threads.append ({ | |
"thread": threading.current_thread (), | |
"priority": priority, | |
"event": event | |
}) | |
# 释放锁,阻塞等待被唤醒 | |
event.wait () | |
# 【核心功能】唤醒:优先级最高的线程(数字越小,优先级越高) | |
def notify_priority (self): | |
with self.lock: | |
if not self.waiting_threads: | |
return | |
# 按优先级排序(升序:0>1>2),取第一个 | |
self.waiting_threads.sort (key=lambda x: x ["priority"]) | |
highest = self.waiting_threads.pop (0) | |
highest ["event"].set () # 精准唤醒优先级最高的线程 | |
# 唤醒所有线程(兼容原有功能) | |
def notify_all (self): | |
with self.lock: | |
for item in self.waiting_threads: | |
item ["event"].set () | |
self.waiting_threads.clear () | |
# 生产者 / 消费者 线程类 | |
class Producer (threading.Thread): | |
def __init__(self, name: str, cond: PriorityCondition, lst: list): | |
super ().__init__(name=name) | |
self.cond = cond | |
self.goods = lst | |
def run (self): | |
# 生产第 1 个数据 | |
time.sleep (1) | |
with self.cond.lock: | |
self.goods.append ("data1") | |
print ("生产者:生产 data1") | |
self.cond.notify_priority () # 唤醒【优先级最高】的消费者 | |
# 生产第 2 个数据 | |
time.sleep (2) | |
with self.cond.lock: | |
self.goods.append ("data2") | |
print ("生产者:生产 data2") | |
self.cond.notify_priority () # 再次唤醒优先级最高的消费者 | |
class Consumer (threading.Thread): | |
# priority 参数,用于设置优先级 | |
def __init__(self, name: str, cond: PriorityCondition, lst: list, priority: int): | |
super ().__init__(name=name) | |
self.cond = cond | |
self.goods = lst | |
self.priority = priority # 线程优先级 | |
def run (self): | |
while True: | |
# 带优先级进入等待 | |
self.cond.wait (self.priority) | |
# 被唤醒后消费数据 | |
with self.cond.lock: | |
if self.goods: | |
data = self.goods.pop () | |
print (f"{self.name}(优先级 {self.priority}):消费 {data}") | |
# 模拟消费完成 | |
time.sleep (0.1) | |
# ===================== 主程序 ===================== | |
if __name__ == '__main__': | |
cond = PriorityCondition () | |
goods = [] | |
# 创建 2 个消费者:【优先级 0 > 优先级 1】 | |
consumer_high = Consumer (name="高优先级消费者", cond=cond, lst=goods, priority=0) | |
consumer_low = Consumer (name="低优先级消费者", cond=cond, lst=goods, priority=1) | |
producer = Producer (name="生产者", cond=cond, lst=goods) | |
# 启动线程 | |
consumer_high.start () | |
consumer_low.start () | |
time.sleep (0.5) | |
producer.start () | |
# 等待线程结束 | |
producer.join () | |
# 唤醒所有等待线程,安全退出 | |
cond.notify_all () | |
consumer_high.join () | |
consumer_low.join () |