装饰器的概念和意义

装饰器的常见类型

函数闭包

闭包与装饰器的区别

装饰器的概念和意义

用来装饰其他函数,为其他函数添加特定功能的函数

装饰器函数基本原则:

装饰器不能修改被装饰函数源码

装饰器不能修改装饰函数的调用方式

相关概念

函数即是变量

任何变量名都指向变量值的内存地址

函数也一样,函数的本质指向内存地址的引用,是 function 实例,函数名指向内存地址

def get_time ():
    print ("TEST")
count_time = get_time
count_time ()

高阶函数

符合下列条件

接受函数名作为形参

返回值包含函数名

# 导入 Callable,专门用于标注函数 / 可调用对象
from typing import Callable
def foo ():
    print ("in foo")
def boo (func: Callable) -> None:
    print (func)
    type (func)
    func ()
boo (foo)

exp:设计一个函数统计另一个函数运行时间

下示例不是装饰器,因为改变了 foo 函数调用方式

import time
def foo ():
    time.sleep (3)
    print ("In foo")
def timer (func):
    start_time = time.time ()
    func ()
    end_time = time.time ()
    print (f"timer {end_time - start_time}")
timer (foo)

如下

正常情况下 timer 运行完了 gf 的内存会被回收,但是由于它被返回给了 foo, 所以 foo 就重新指向了 gf, 但是原 foo 由于在 gf 要被调用,所以它的内存也没有被回收

import time
def foo ():
    time.sleep (1)
    print ("in foo")
print ("原 foo 函数对象地址:", hex (id (foo)))
def timer (func):
    print ("timer 内部 func 指向的地址:", hex (id (func)))
    def timing ():
        start_time = time.time ()
        func ()
        end_time = time.time ()
        print (f"耗时 {end_time - start_time} 秒")
    
    print ("timer 内部 timing 函数地址:", hex (id (timing)))
    return timing
foo = timer (foo)
print ("装饰后 foo 指向的地址:", hex (id (foo)))
foo ()
# 原 foo 函数对象地址: 0x2448a258a40
# timer 内部 func 指向的地址: 0x2448a258a40
# timer 内部 timing 函数地址: 0x2448a489080
# 装饰后 foo 指向的地址: 0x2448a489080

语法糖

如上装饰器中,函数运行进行了重赋值 foo = timer (foo)

为了简化这个过程,python 提供了语法糖,用 @ 符号放在需要被装饰的函数上

@timer 等价于 foo=timer (foo) ,会执行一次 timer 函数,,将 timer 函数的返回值(timing 函数)赋值给 foo 函数。

import time
def timer (func):
    def timing ():
        start_time = time.time ()
        func ()
        end_time = time.time ()
        print (f"耗时 {end_time - start_time} 秒")
    return timing
@timer
def foo ():
    time.sleep (1)
    print ("in foo")
foo ()

装饰器编写

定义一个高阶函数,接受函数名作为形参,返回嵌套函数名

在高阶函数中定义嵌套代码,在嵌套代码中添加新的功能并调用形参函数

外层函数的作用:

接收被装饰的原函数,形成闭包,保存原函数引用,创建并返回内层包装函数

内层函数的作用:

执行新增的扩展功能

调用原来的旧函数,保留原有功能

作为新的 foo 被外界调用

@装饰器 的核心逻辑,就是把 @ 下方紧挨着定义的函数,作为参数传给 @ 后面的函数,再把返回值重新赋值给这个函数名。

装饰器的几种类型

被装饰函数带参数

可变参数自适应被装饰函数的参数

import time
def timer (func):
    def timing (*args, **kwargs):
        start_time = time.time ()
        func (*args, **kwargs)
        end_time = time.time ()
        print (f"耗时 {end_time - start_time} 秒")
    return timing
@timer
def foo (n: int):
    time.sleep (n)
    print ("in foo")
foo (1)

装饰器本身带参数

import time
def timer (timer_type='minites'):
    def outer_timing (func):
        def inner_timing (*args, **kwargs):
            start_time = time.time ()
            func (*args, **kwargs)
            end_time = time.time ()
            print (f"耗时 {end_time - start_time} {timer_type}")    
        return inner_timing
    return outer_timing
@timer (timer_type='minites')
def foo (n: int):
    time.sleep (n)
    print ("in foo")
foo (1)

被装饰函数带返回值

多个返回值以元组的形式返回

import time
def timer (timer_type='seconds'):
    def outer_timing (func):
        def inner_timing (*args, **kwargs):
            start_time = time.time ()
            res = func (*args, **kwargs)
            end_time = time.time ()
            print (f"耗时 {end_time - start_time} {timer_type}")
            return res    
        return inner_timing
    return outer_timing
@timer (timer_type='seconds') # foo = timer (imer_type='minites')(foo)
def fibonacci (n: int):
    a, b = 0, 1
    for _ in range (n):
        a, b = b, a + b
    return a
fibonacci (500000)

闭包

下面给出两段代码示例,用于思考闭包的作用

func_list = []
for i in range (3):
    def myfunc (a):
        return i+a
    func_list.append (myfunc)
for f in func_list:
    print (f (a=1))
func_list = []
for i in range (3):
    def deco (i):
        def myfunc (a):
            return i+a
        return myfunc    
    func_list.append (deco (i))
for f in func_list:
    print (f (a=1))

如上,示例 1 返回 333,示例 2 返回 123

2 中 deco (i) 函数让 i 通过参数传入变成了 myfunc 的自由变量

闭包的作用

用来在一个函数与一组私有变量之间创建关联关系,在给定函数被多次调用的过程中,这些私有变量能够保持其持久性(保存运行环境与变量的状态)

闭包特征:

必须要有函数嵌套,且外层函数必须返回内层函数,外层函数相当于给内层函数提供了一个包装起来的运行环境,在包装的运行环境中,内层函数可以完全自己掌握自由变量的值

内层函数一定要用到外层函数中定义的自由变量

装饰器和闭包的区别:

装饰器的外层函数主要是提供被装饰函数的引用,闭包的外层函数主要是为了一共自由变量

装饰器的外层函数不一定要提供变量,闭包的外层函数必须提供自由变量,否则闭包无意义

装饰器的目的是为被装饰函数提供额外的功能,闭包的目的是保存函数运行环境和局部变量值

从形式上看,闭包是装饰器的子集

多线程的高级应用

线程之间的通讯机制

消息队列、Event 事件对象,Condition 条件对象

Event 事件对象

使用之前先初始化:event = threading.Event ()

方法

重置代码中的 event 对象,使得所有该 event 事件都处于待命状态

event.clear ()

阻塞线程,等待 event 指令

event.wait ()

发送 event 指令,使得所有设置该 event 事件的线程执行

event.set ()

如下示例,将 10 个线程共用一个 event 对象,实现集合点

import threading  # 多线程核心模块
import time       # 用于主线程休眠
class MyThread (threading.Thread):
    def __init__(self, event):
        super ().__init__()  # 调用父类构造方法
        self.event = event  # 保存事件对象
    def run (self):
        # 线程启动后自动执行的核心逻辑
        print (f"线程 {self.name} 已经初始化完成,等待启动...")
        self.event.wait ()   # 阻塞等待事件触发
        print (f"{self.name} 开始执行")
if __name__ == '__main__':
    event = threading.Event ()  # 创建线程同步事件
    threads = []               # 存储所有线程
    
    # 创建 10 个线程,全部共享同一个 event 事件
    [threads.append (MyThread (event)) for i in range (1, 11)]
    event.clear ()  # 重置事件为【未触发】状态
    # 启动所有 10 个线程
    [t.start () for t in threads]
    time.sleep (3)  # 主线程休眠 3 秒
    event.set ()    # 触发事件,唤醒所有等待的线程
    [t.join () for t in threads]  # 主线程等待所有子线程执行完毕

Condition 对象

多个线程之间由锁机制进行先后执行

Condition 必须遵循: acquire () → 操作 → release ()

主要方法

acquire () :手动获取锁

notify () :唤醒其他 wait 状态的线程,有多个时随机唤醒一个

notify_all () :唤醒所有 wait 状态的线程

wait () :进入 wait 线程挂起状态,临时释放锁,等待 notify 通知后,自动重新获取锁

release () :释放锁

更简单的方法,用 with 语句自动管理锁

示例 1:手动管理锁的生命周期,实现生产者生产数据后,消费者消费

import threading
import time
class Producer (threading.Thread):
    def __init__(self, name: str, cond: threading.Condition, lst: list):
        super ().__init__(name=name)
        self.cond = cond
        self.goods = lst
    
    def run (self):
        self.cond.acquire () # 获取锁
        time.sleep (1)
        self.goods.append ("data1")  # 生产
        self.cond.notify ()  # 唤起其余 wait 状态的线程
        self.cond.wait ()    # 挂起状态,暂时释放锁,等其他线程通过 notify 唤醒
        time.sleep (2)
        self.goods.append ("data2")
        self.cond.notify ()  # 唤起其余 wait 状态的线程
        self.cond.release ()    # 结束后释放锁
class Consumer (threading.Thread):
    def __init__(self, name: str, cond: threading.Condition, lst: list):
        super ().__init__(name=name)
        self.cond = cond
        self.goods = lst
    
    def run (self):
        self.cond.acquire () # 获取锁
        self.cond.wait ()    # 挂起状态,暂时释放锁,等待其他线程通过 notify 唤醒
        print (self.goods.pop ())
        self.cond.notify ()
        self.cond.wait ()
        print (self.goods.pop ())
        self.cond.notify ()
        self.cond.release ()
if __name__ == '__main__':
    # 创建条件变量
    cond = threading.Condition ()
    # 共享资源(仓库)
    goods = []
    producer = Producer (name='Producer', cond=cond, lst=goods)
    consumer = Consumer (name='Consumer', cond=cond, lst=goods)
    consumer.start ()
    producer.start ()
    consumer.join ()
    producer.join ()

示例 2:with 语句自动管理生命周期

import threading
import time
class Producer (threading.Thread):
    def __init__(self, name: str, cond: threading.Condition, lst: list):
        super ().__init__(name=name)
        self.cond = cond
        self.goods = lst
    
    def run (self):
        # with 语句自动获取 / 释放锁,无需手动 acquire/release
        with cond:
            time.sleep (1)   # 生产第一条数据
            self.goods.append ("data1")
            self.cond.notify (); self.cond.wait () # 唤醒消费者,自身挂起
            time.sleep (2)   # 生产第二条数据
            self.goods.append ("data2")
            self.cond.notify (); self.cond.wait () # 唤醒消费者,自身挂起
class Consumer (threading.Thread):
    def __init__(self, name: str, cond: threading.Condition, lst: list):
        super ().__init__(name=name)
        self.cond = cond
        self.goods = lst
    
    def run (self):
        with cond:
            self.cond.wait ()    # 挂起状态,等待生产者完成生产
            print (self.goods.pop ()) # 消费第一条数据
            self.cond.notify (); self.cond.wait () # 唤醒生产者,自身挂起
            print (self.goods.pop ()) # 消费第二条数据
            self.cond.notify ()  # 唤醒生产者,消费完成后无需挂起
if __name__ == '__main__':
    # 创建条件变量
    cond = threading.Condition ()
    # 共享资源(仓库)
    goods = []
    producer = Producer (name='Producer', cond=cond, lst=goods)
    consumer = Consumer (name='Consumer', cond=cond, lst=goods)
    consumer.start ()
    producer.start ()
    consumer.join ()
    producer.join ()

在有多个消费者的情况,可以使用 notify_all () 唤醒所有等待的消费者

如果想实现多个线程的优先级唤醒

在 Python 中,原生 threading.Condition 不支持优先级唤醒(它只有随机唤醒 1 个 / 全唤醒,无法指定优先级)。

想实现按优先级唤醒线程,我们必须自己封装一个支持优先级的同步工具,核心思路:

  1. 给线程绑定优先级数值(数字越小 / 越大,优先级越高,自定义规则)
  2. 用线程安全的容器维护「等待线程 + 优先级」
  3. 唤醒时主动筛选优先级最高的线程,而不是随机唤醒
import threading
import time
from typing import List, Dict
# 优先级条件变量
class PriorityCondition:
    def __init__(self):
        self.lock = threading.Lock ()          # 基础锁
        self.waiting_threads: List [Dict] = [] # 等待池:存储 {线程,优先级,唤醒事件}
    # 线程进入等待,并传入自己的优先级
    def wait (self, priority: int):
        # 创建事件:用于精准唤醒当前线程
        event = threading.Event ()
        with self.lock:
            # 把当前线程、优先级、事件加入等待池
            self.waiting_threads.append ({
                "thread": threading.current_thread (),
                "priority": priority,
                "event": event
            })
        # 释放锁,阻塞等待被唤醒
        event.wait ()
    # 【核心功能】唤醒:优先级最高的线程(数字越小,优先级越高)
    def notify_priority (self):
        with self.lock:
            if not self.waiting_threads:
                return
            # 按优先级排序(升序:0>1>2),取第一个
            self.waiting_threads.sort (key=lambda x: x ["priority"])
            highest = self.waiting_threads.pop (0)
            highest ["event"].set ()  # 精准唤醒优先级最高的线程
    # 唤醒所有线程(兼容原有功能)
    def notify_all (self):
        with self.lock:
            for item in self.waiting_threads:
                item ["event"].set ()
            self.waiting_threads.clear ()
# 生产者 / 消费者 线程类
class Producer (threading.Thread):
    def __init__(self, name: str, cond: PriorityCondition, lst: list):
        super ().__init__(name=name)
        self.cond = cond
        self.goods = lst
    def run (self):
        # 生产第 1 个数据
        time.sleep (1)
        with self.cond.lock:
            self.goods.append ("data1")
            print ("生产者:生产 data1")
            self.cond.notify_priority ()  # 唤醒【优先级最高】的消费者
        
        # 生产第 2 个数据
        time.sleep (2)
        with self.cond.lock:
            self.goods.append ("data2")
            print ("生产者:生产 data2")
            self.cond.notify_priority ()  # 再次唤醒优先级最高的消费者
class Consumer (threading.Thread):
    # priority 参数,用于设置优先级
    def __init__(self, name: str, cond: PriorityCondition, lst: list, priority: int):
        super ().__init__(name=name)
        self.cond = cond
        self.goods = lst
        self.priority = priority  # 线程优先级
    def run (self):
        while True:
            # 带优先级进入等待
            self.cond.wait (self.priority)
            
            # 被唤醒后消费数据
            with self.cond.lock:
                if self.goods:
                    data = self.goods.pop ()
                    print (f"{self.name}(优先级 {self.priority}):消费 {data}")
            
            # 模拟消费完成
            time.sleep (0.1)
# ===================== 主程序 =====================
if __name__ == '__main__':
    cond = PriorityCondition ()
    goods = []
    # 创建 2 个消费者:【优先级 0 > 优先级 1】
    consumer_high = Consumer (name="高优先级消费者", cond=cond, lst=goods, priority=0)
    consumer_low = Consumer (name="低优先级消费者", cond=cond, lst=goods, priority=1)
    
    producer = Producer (name="生产者", cond=cond, lst=goods)
    # 启动线程
    consumer_high.start ()
    consumer_low.start ()
    time.sleep (0.5)
    producer.start ()
    # 等待线程结束
    producer.join ()
    # 唤醒所有等待线程,安全退出
    cond.notify_all ()
    consumer_high.join ()
    consumer_low.join ()