装饰器及闭包

装饰器的概念和意义

装饰器的常见类型

函数闭包

闭包与装饰器的区别

装饰器的概念和意义

用来装饰其他函数,为其他函数添加特定功能的函数

装饰器函数基本原则:

装饰器不能修改被装饰函数源码

装饰器不能修改装饰函数的调用方式

相关概念

函数即是变量

任何变量名都指向变量值的内存地址

函数也一样,函数的本质指向内存地址的引用,是function实例,函数名指向内存地址

def get_time(): print("TEST") count_time = get_time count_time()

高阶函数

符合下列条件

接受函数名作为形参

返回值包含函数名

# 导入Callable,专门用于标注函数/可调用对象 from typing import Callable def foo(): print("in foo") def boo(func: Callable) -> None: print(func) type(func) func() boo(foo)

exp:设计一个函数统计另一个函数运行时间

下示例不是装饰器,因为改变了foo函数调用方式

import time def foo(): time.sleep(3) print("In foo") def timer(func): start_time = time.time() func() end_time = time.time() print(f"timer {end_time - start_time}") timer(foo)

如下

正常情况下timer运行完了gf的内存会被回收,但是由于它被返回给了foo,所以foo就重新指向了gf,但是原foo由于在gf要被调用,所以它的内存也没有被回收

import time def foo(): time.sleep(1) print("in foo") print("原 foo 函数对象地址:", hex(id(foo))) def timer(func): print("timer 内部 func 指向的地址:", hex(id(func))) def timing(): start_time = time.time() func() end_time = time.time() print(f"耗时 {end_time - start_time} 秒") print("timer 内部 timing 函数地址:", hex(id(timing))) return timing foo = timer(foo) print("装饰后 foo 指向的地址:", hex(id(foo))) foo() # 原 foo 函数对象地址: 0x2448a258a40 # timer 内部 func 指向的地址: 0x2448a258a40 # timer 内部 timing 函数地址: 0x2448a489080 # 装饰后 foo 指向的地址: 0x2448a489080

语法糖

如上装饰器中,函数运行进行了重赋值foo = timer(foo)

为了简化这个过程,python提供了语法糖,用@符号放在需要被装饰的函数上

@timer等价于foo=timer(foo),会执行一次 timer函数,,将timer函数的返回值(timing函数)赋值给foo函数。

import time def timer(func): def timing(): start_time = time.time() func() end_time = time.time() print(f"耗时 {end_time - start_time} 秒") return timing @timer def foo(): time.sleep(1) print("in foo") foo()

装饰器编写

定义一个高阶函数,接受函数名作为形参,返回嵌套函数名

在高阶函数中定义嵌套代码,在嵌套代码中添加新的功能并调用形参函数

外层函数的作用:

接收被装饰的原函数,形成闭包,保存原函数引用,创建并返回内层包装函数

内层函数的作用:

执行新增的扩展功能

调用原来的旧函数,保留原有功能

作为新的 foo 被外界调用

@装饰器 的核心逻辑,就是把 @ 下方紧挨着定义的函数,作为参数传给 @ 后面的函数,再把返回值重新赋值给这个函数名。

装饰器的几种类型

被装饰函数带参数

可变参数自适应被装饰函数的参数

import time def timer(func): def timing(*args, **kwargs): start_time = time.time() func(*args, **kwargs) end_time = time.time() print(f"耗时 {end_time - start_time} 秒") return timing @timer def foo(n: int): time.sleep(n) print("in foo") foo(1)

装饰器本身带参数

import time def timer(timer_type='minites'): def outer_timing(func): def inner_timing(*args, **kwargs): start_time = time.time() func(*args, **kwargs) end_time = time.time() print(f"耗时 {end_time - start_time} {timer_type}") return inner_timing return outer_timing @timer(timer_type='minites') def foo(n: int): time.sleep(n) print("in foo") foo(1)

被装饰函数带返回值

多个返回值以元组的形式返回

import time def timer(timer_type='seconds'): def outer_timing(func): def inner_timing(*args, **kwargs): start_time = time.time() res = func(*args, **kwargs) end_time = time.time() print(f"耗时 {end_time - start_time} {timer_type}") return res return inner_timing return outer_timing @timer(timer_type='seconds') # foo = timer(imer_type='minites')(foo) def fibonacci(n: int): a, b = 0, 1 for _ in range(n): a, b = b, a + b return a fibonacci(500000)

闭包

下面给出两段代码示例,用于思考闭包的作用

func_list = [] for i in range(3): def myfunc(a): return i+a func_list.append(myfunc) for f in func_list: print(f(a=1))
func_list = [] for i in range(3): def deco(i): def myfunc(a): return i+a return myfunc func_list.append(deco(i)) for f in func_list: print(f(a=1))

如上,示例1返回333,示例2返回123

2中deco(i)函数让i通过参数传入变成了myfunc的自由变量

闭包的作用

用来在一个函数与一组私有变量之间创建关联关系,在给定函数被多次调用的过程中,这些私有变量能够保持其持久性(保存运行环境与变量的状态)

闭包特征:

必须要有函数嵌套,且外层函数必须返回内层函数,外层函数相当于给内层函数提供了一个包装起来的运行环境,在包装的运行环境中,内层函数可以完全自己掌握自由变量的值

内层函数一定要用到外层函数中定义的自由变量

装饰器和闭包的区别:

装饰器的外层函数主要是提供被装饰函数的引用,闭包的外层函数主要是为了一共自由变量

装饰器的外层函数不一定要提供变量,闭包的外层函数必须提供自由变量,否则闭包无意义

装饰器的目的是为被装饰函数提供额外的功能,闭包的目的是保存函数运行环境和局部变量值

从形式上看,闭包是装饰器的子集

多线程的高级应用

线程之间的通讯机制

消息队列、Event事件对象,Condition条件对象

Event事件对象

使用之前先初始化:event = threading.Event()

方法

重置代码中的event对象,使得所有该event事件都处于待命状态

event.clear()

阻塞线程,等待event指令

event.wait()

发送event指令,使得所有设置该event事件的线程执行

event.set()

如下示例,将10个线程共用一个event对象,实现集合点

import threading # 多线程核心模块 import time # 用于主线程休眠 class MyThread(threading.Thread): def __init__(self, event): super().__init__() # 调用父类构造方法 self.event = event # 保存事件对象 def run(self): # 线程启动后自动执行的核心逻辑 print(f"线程 {self.name} 已经初始化完成,等待启动...") self.event.wait() # 阻塞等待事件触发 print(f"{self.name} 开始执行") if __name__ == '__main__': event = threading.Event() # 创建线程同步事件 threads = [] # 存储所有线程 # 创建10个线程,全部共享同一个event事件 [threads.append(MyThread(event)) for i in range (1, 11)] event.clear() # 重置事件为【未触发】状态 # 启动所有10个线程 [t.start() for t in threads] time.sleep(3) # 主线程休眠3秒 event.set() # 触发事件,唤醒所有等待的线程 [t.join() for t in threads] # 主线程等待所有子线程执行完毕

Condition对象

多个线程之间由锁机制进行先后执行

Condition 必须遵循:acquire () → 操作 → release ()

主要方法

acquire() :手动获取锁

notify():唤醒其他wait状态的线程,有多个时随机唤醒一个

notify_all():唤醒所有wait状态的线程

wait():进入wait线程挂起状态,临时释放锁,等待notify通知后,自动重新获取锁

release():释放锁

更简单的方法,用with语句自动管理锁

示例1:手动管理锁的生命周期,实现生产者生产数据后,消费者消费

import threading import time class Producer(threading.Thread): def __init__(self, name: str, cond: threading.Condition, lst: list): super().__init__(name=name) self.cond = cond self.goods = lst def run(self): self.cond.acquire() # 获取锁 time.sleep(1) self.goods.append("data1") # 生产 self.cond.notify() # 唤起其余wait状态的线程 self.cond.wait() # 挂起状态,暂时释放锁,等其他线程通过notify唤醒 time.sleep(2) self.goods.append("data2") self.cond.notify() # 唤起其余wait状态的线程 self.cond.release() # 结束后释放锁 class Consumer(threading.Thread): def __init__(self, name: str, cond: threading.Condition, lst: list): super().__init__(name=name) self.cond = cond self.goods = lst def run(self): self.cond.acquire() # 获取锁 self.cond.wait() # 挂起状态,暂时释放锁,等待其他线程通过notify唤醒 print(self.goods.pop()) self.cond.notify() self.cond.wait() print(self.goods.pop()) self.cond.notify() self.cond.release() if __name__ == '__main__': # 创建条件变量 cond = threading.Condition() # 共享资源(仓库) goods = [] producer = Producer(name='Producer', cond=cond, lst=goods) consumer = Consumer(name='Consumer', cond=cond, lst=goods) consumer.start() producer.start() consumer.join() producer.join()

示例2:with语句自动管理生命周期

import threading import time class Producer(threading.Thread): def __init__(self, name: str, cond: threading.Condition, lst: list): super().__init__(name=name) self.cond = cond self.goods = lst def run(self): # with 语句自动获取/释放锁,无需手动acquire/release with cond: time.sleep(1) # 生产第一条数据 self.goods.append("data1") self.cond.notify(); self.cond.wait() # 唤醒消费者,自身挂起 time.sleep(2) # 生产第二条数据 self.goods.append("data2") self.cond.notify(); self.cond.wait() # 唤醒消费者,自身挂起 class Consumer(threading.Thread): def __init__(self, name: str, cond: threading.Condition, lst: list): super().__init__(name=name) self.cond = cond self.goods = lst def run(self): with cond: self.cond.wait() # 挂起状态,等待生产者完成生产 print(self.goods.pop()) # 消费第一条数据 self.cond.notify(); self.cond.wait() # 唤醒生产者,自身挂起 print(self.goods.pop()) # 消费第二条数据 self.cond.notify() # 唤醒生产者,消费完成后无需挂起 if __name__ == '__main__': # 创建条件变量 cond = threading.Condition() # 共享资源(仓库) goods = [] producer = Producer(name='Producer', cond=cond, lst=goods) consumer = Consumer(name='Consumer', cond=cond, lst=goods) consumer.start() producer.start() consumer.join() producer.join()

在有多个消费者的情况,可以使用notify_all()唤醒所有等待的消费者

如果想实现多个线程的优先级唤醒

在 Python 中,原生 threading.Condition 不支持优先级唤醒(它只有随机唤醒 1 个 / 全唤醒,无法指定优先级)。

想实现按优先级唤醒线程,我们必须自己封装一个支持优先级的同步工具,核心思路:

  1. 给线程绑定优先级数值(数字越小 / 越大,优先级越高,自定义规则)
  2. 用线程安全的容器维护「等待线程 + 优先级」
  3. 唤醒时主动筛选优先级最高的线程,而不是随机唤醒
import threading import time from typing import List, Dict # 优先级条件变量 class PriorityCondition: def __init__(self): self.lock = threading.Lock() # 基础锁 self.waiting_threads: List[Dict] = [] # 等待池:存储{线程, 优先级, 唤醒事件} # 线程进入等待,并传入自己的优先级 def wait(self, priority: int): # 创建事件:用于精准唤醒当前线程 event = threading.Event() with self.lock: # 把当前线程、优先级、事件加入等待池 self.waiting_threads.append({ "thread": threading.current_thread(), "priority": priority, "event": event }) # 释放锁,阻塞等待被唤醒 event.wait() # 【核心功能】唤醒:优先级最高的线程(数字越小,优先级越高) def notify_priority(self): with self.lock: if not self.waiting_threads: return # 按优先级排序(升序:0>1>2),取第一个 self.waiting_threads.sort(key=lambda x: x["priority"]) highest = self.waiting_threads.pop(0) highest["event"].set() # 精准唤醒优先级最高的线程 # 唤醒所有线程(兼容原有功能) def notify_all(self): with self.lock: for item in self.waiting_threads: item["event"].set() self.waiting_threads.clear() # 生产者/消费者 线程类 class Producer(threading.Thread): def __init__(self, name: str, cond: PriorityCondition, lst: list): super().__init__(name=name) self.cond = cond self.goods = lst def run(self): # 生产第1个数据 time.sleep(1) with self.cond.lock: self.goods.append("data1") print("生产者:生产 data1") self.cond.notify_priority() # 唤醒【优先级最高】的消费者 # 生产第2个数据 time.sleep(2) with self.cond.lock: self.goods.append("data2") print("生产者:生产 data2") self.cond.notify_priority() # 再次唤醒优先级最高的消费者 class Consumer(threading.Thread): # priority 参数,用于设置优先级 def __init__(self, name: str, cond: PriorityCondition, lst: list, priority: int): super().__init__(name=name) self.cond = cond self.goods = lst self.priority = priority # 线程优先级 def run(self): while True: # 带优先级进入等待 self.cond.wait(self.priority) # 被唤醒后消费数据 with self.cond.lock: if self.goods: data = self.goods.pop() print(f"{self.name}(优先级{self.priority}):消费 {data}") # 模拟消费完成 time.sleep(0.1) # ===================== 主程序 ===================== if __name__ == '__main__': cond = PriorityCondition() goods = [] # 创建2个消费者:【优先级0 > 优先级1】 consumer_high = Consumer(name="高优先级消费者", cond=cond, lst=goods, priority=0) consumer_low = Consumer(name="低优先级消费者", cond=cond, lst=goods, priority=1) producer = Producer(name="生产者", cond=cond, lst=goods) # 启动线程 consumer_high.start() consumer_low.start() time.sleep(0.5) producer.start() # 等待线程结束 producer.join() # 唤醒所有等待线程,安全退出 cond.notify_all() consumer_high.join() consumer_low.join()

消息隔离机制

全局变量,每一个线程都会对全局变量赋不同值,

为了满足这个需求,需要使用一个线程内部的全局变量treding.local()

如下示例中,主线程和两个子线程的name分别赋值

import threading local_data = threading.local() local_data.name = 'local_data' class treading_local_demo(threading.Thread): """设置一个共享全局变量, 2个线程分别设置全局变量的值并分别打印 """ def run(self): print("赋值前", threading.current_thread(), local_data.__dict__) local_data.name = self.name print("赋值后", threading.current_thread(), local_data.__dict__) if __name__ =='__main__': print("start", local_data.__dict__) t1 = treading_local_demo(name="t-1") t1.start() t1.join() t2 = treading_local_demo(name="t-2") t2.start() t2.join() print("end", local_data.__dict__)

线程池的基本应用

线程池:concurrent.futures中的ThreadPoolExcutor

主线程可以获取一个线程或任务的状态,以及返回值

当一个线程完成的时候,主线程可以立即感知

让多线程和多进程的编码接口一致

主线程频繁的创建和销毁线程,会造成不必要的资源损耗

线程池的创建和使用,示例如下

from concurrent.futures import ThreadPoolExecutor # 需要多线程执行的函数 def func(message: str): return message # 创建线程池 executor = ThreadPoolExecutor(max_workers=3) # 通过submit提交任务,submit会立即返回,不会阻塞主线程 future = executor.submit(func, message="xxx")

线程池常用方法

executor.submit方法会立即返回线程对象,该对象常用有如下方法

future = executor.submit(func, message="xxx")

  • done():检查任务是否完成并返回bool: True/false
future.done()
  • cancel():取消任务执行,该任务没有被放入线程池能被取消
future.cancel()
  • result():获取任务执行结果,阻塞方法, 可设置超时时间
future.result(timeout=1)
  • as_completed():生成器,任务未完成会阻塞

    as_completed()方法的return顺序取决于线程执行速度,最先执行完的线程最先返回

from concurrent.futures import ThreadPoolExecutor, as_complted urls = ["A", "B", "C"] # future_list的元素是线程对象 future_list = [executor.submit(func, url) for url in urls] # as_completed阻塞, 仅当有任务返回时才会进入循环体内部 for item in as_completed(future_list): data = item.result()
  • map():映射,任务未完成会阻塞

    map()方法的return顺序与传入list顺序一致,即无论执行快慢,最先被传入的线程最先返回

urls = ["A", "B", "C"] # map将参数自动映射,可直接传入多个线程的参数,且无需submit for data in executor.map(func, urls): print(data)
  • wait(): 阻塞主线程, 待条件成立才继续
from concurrent.futures import ThreadPoolExecutor, as_complted, wait, ALL_COMPLTED urls = ["A", "B", "C"] future_list = [executor.submit(func, url) for url in urls] for item in as_completed(future_list): data = item.result() # 需要传入多线程的指针以及等待条件,此处ALL_COMPLTED意为全部线程执行完毕 wait(future_list, ALL_COMPLTED)
  • add_done_callback(): 线程完成回调函数,给线程任务绑定一个任务执行完成后自动调用的函数, 即子线程完成后不会立即结束线程,而是调用回调函数

    add_done_callback()只能传入线程对象,如果在传入对象前需要给对象赋值,可以使用偏函数partial将赋值后的函数封装传入

from concurrent.futures import ThreadPoolExecutor from functools import partial # 需要多线程执行的函数 def func(message: str): return message # 线程执行后的回调函数 def callback(msg): print(msg) urls = ["A", "B", "C"] for url in urls: future = executor.submit(func, url) # 绑定回调函数,线程执行完毕后输出结果 future.add_done_callback(partial(callback, msg=""))

如下示例,创建一个线程池,用户能向AI发送多段对话,每个对话都在线程池中运行

"""线程池的基本应用 """ from concurrent.futures import ThreadPoolExecutor from functools import partial from openai import OpenAI from dotenv import load_dotenv import os class siliconflow_api: """硅基流动api """ def __init__(self, apikey: str): siliconflow_apikey = apikey self.client = OpenAI( api_key = siliconflow_apikey, base_url="https://api.siliconflow.cn/v1" ) def chat(self, message: str): response = self.client.chat.completions.create( model="Qwen/Qwen3-8B", messages=[ {"role": "system", "content": "你是一个有用的助手"}, {"role": "user", "content": message} ] ) return response.choices[0].message.content def handle_ai_response(future, user_msg): """ 线程回调函数: 处理AI返回结果并按格式输出 :param future: 线程执行结果对象 :param user_msg: 用户原始输入的消息 """ try: bot_reply = future.result() print(f'>>>{{"user":"{user_msg}", "bot":"{bot_reply}"}}') except Exception as e: print(f'>>>请求失败:{str(e)}') load_dotenv() chat_client = siliconflow_api(apikey=os.getenv("ApiKey")) # 创建一个线程池对象,指定线程池最大线程数量为3 executor = ThreadPoolExecutor(max_workers=3) if __name__ == '__main__': print("[ SiliconFlow Chat Demo ]") while True: user_input = input("<<<").strip() if not user_input: continue if user_input.lower() in ["exit"]: # 优雅退出: 等待提交的任务执行完毕再退出 # executor.shutdown(wait=True) break # 提交任务到线程池 future = executor.submit(chat_client.chat, user_input) # 绑定回调函数,线程执行完毕后输出结果 future.add_done_callback(partial(handle_ai_response, user_msg=user_input))

线程池

使用concurrent_futures模块的ProcessPoolExecutor来实现进程池