和多进程的操作基本相同[[多进程&多线程&协程-multiprocessing]]
1 2 3
| from threading import current_thread, active_count current_process().name active_count()
|
不同方式创建子线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import random import time from threading import Thread def timer(func): def inner(*args, **kwargs): start_time = time.time() func(*args, **kwargs) end_time = time.time() print(f"{func.__name__} 运行时间:{end_time - start_time}") return inner
def work_thread(name): sleep_time = random.randint(1, 6) print(f"{name} is start sleeping {sleep_time}") time.sleep(sleep_time) print(f"{name} is end sleeping ") @timer def process_thread_work(): task_list = [Thread(target=work_thread, args=(i,)) for i in range(4)] [task.start() for task in task_list] [task.join() for task in task_list] if __name__ == '__main__': print(f"main process start .... ") process_thread_work() print(f"main process end .... ")
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import random import time from threading import Thread def timer(func): def inner(*args, **kwargs): start_time = time.time() func(*args, **kwargs) end_time = time.time() print(f"{func.__name__} 运行时间:{end_time - start_time}") return inner class MyThread(Thread): def __init__(self, input_name): super().__init__() self.name = input_name def run(self): sleep_time = random.randint(1, 6) print(f"{self.name} is start sleeping {sleep_time}") time.sleep(sleep_time) print(f"{self.name} is end sleeping ") @timer def my_process_thread(): task_list = [MyThread(input_name=i) for i in range(5)] [task.start() for task in task_list] [task.join() for task in task_list] if __name__ == '__main__': print(f"main process start .... ") my_process_thread() print(f"main process end .... ")
|
示例 TCP 多线程并发
TCP 多线程并发,只需要将 multiprocessing/Process 替换为 threading/Thread 即可。
[[多进程&多线程&协程-multiprocessing#示例1 模拟 TCP 服务|TCP 多进程并发]]
互斥锁
解决子线程之间对共享资源的访问冲突问题,保证在同一时刻只有一个线程在访问共享资源,但多进程之间也存在这种问题(多个进程共享同一块内存)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| import threading import time import random
counter = 0
lock = threading.Lock() def task(thread_name): global counter for i in range(3): time.sleep(random.uniform(0.1, 0.3)) print(f"--- [{thread_name}] 尝试获取锁...") lock.acquire() try: print(f"[{thread_name}] 成功进入临界区!当前值: {counter}") temp = counter time.sleep(0.2) counter = temp + 1 print(f"[{thread_name}] 修改完毕,新值: {counter}") except Exception as e: print(f"[{thread_name}] 运行出错: {e}") finally: lock.release() print(f"[{thread_name}] 已手动释放锁。\n") """ # 使用 with lock 替代 acquire/release,能自动处理异常释放锁 with lock: print(f"[{thread_name}] 已获得锁。当前 counter 值: {counter}") # 模拟临界区内的操作耗时 temp = counter time.sleep(0.1) counter = temp + 1 print(f"[{thread_name}] 修改完成。新 counter 值: {counter}") # 退出 with 代码块后,锁会自动释放 print(f"[{thread_name}] 释放了锁。") """ if __name__ == "__main__": threads = [] for i in range(3): t = threading.Thread(target=task, args=(f"Thread-{i + 1}",)) threads.append(t) t.start() for t in threads: t.join() print(f"所有任务完成,最终计数: {counter}")
|
递归锁
在处理复杂的系统架构(尤其是包含大量函数嵌套调用)时,你可能会遇到“同一个线程多次请求同一把锁”的情况。这时,普通的 Lock 会导致程序卡死(死锁),而 递归锁(RLock, Reentrant Lock) 就是专门为此设计的。
一种特殊的互斥锁,它允许同一个线程多次获得(acquire)同一把锁,而不会产生死锁。
- 计数器机制:在递归锁内部维护着一个
owner(当前持有锁的线程 ID)和一个 counter(嵌套层数)。
- 规则:
- 如果一个线程已经持有了锁,它可以再次调用
acquire(),计数器随之加 1。
- 每次
acquire() 必须对应一个 release()。
- 只有当计数器归零时,锁才会被真正释放,其他线程才能获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import threading
lock = threading.Lock()
def function_A(): lock.acquire() print("Function A 拿到锁了") function_B() lock.release()
def function_B(): lock.acquire() print("Function B 拿到锁了") lock.release()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import threading import time
rlock = threading.RLock() class SecurityScanner: def __init__(self): self.vuln_count = 0 def scan_port(self): """子模块:扫描端口""" rlock.acquire() try: print(f"[{threading.current_thread().name}] 正在扫描端口... (递归计数: 2)") self.vuln_count += 1 time.sleep(0.1) finally: rlock.release() print(f"[{threading.current_thread().name}] 子模块释放锁") def full_scan(self): """主模块:执行全项扫描,内部调用子模块""" print(f"\n--- [{threading.current_thread().name}] 启动全项扫描 ---") rlock.acquire() try: print(f"[{threading.current_thread().name}] 进入主扫描逻辑 (递归计数: 1)") self.scan_port() print(f"[{threading.current_thread().name}] 完成子项扫描,准备退出主逻辑") finally: rlock.release() print(f"[{threading.current_thread().name}] 主逻辑释放锁,其他线程现在可以进场了") if __name__ == "__main__": scanner = SecurityScanner() t1 = threading.Thread(target=scanner.full_scan, name="Scanner-1") t2 = threading.Thread(target=scanner.full_scan, name="Scanner-2") t1.start() t2.start() t1.join() t2.join() print(f"最终扫描发现漏洞总数: {scanner.vuln_count}")
|
信号量
与递归锁不同的是递归锁允许同时访问的任务数为 1 (但允许同线程重入),信号量可以自定义允许同时访问的任务数。[[多进程&多线程&协程-multiprocessing#信号量|多进程也有信号量]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import threading import time import random
semaphore = threading.Semaphore(3)
def scan_vulnerability(thread_name): print(f"[*] [{thread_name}] 正在排队等待进入扫描通道...") semaphore.acquire() try: print(f"[{thread_name}] 获得许可,开始扫描目标任务!") duration = random.uniform(1, 3) time.sleep(duration) print(f"[{thread_name}] 扫描完成,用时 {duration:.2f}s") finally: print(f"[{thread_name}] 离开通道,释放许可。") semaphore.release()
if __name__ == "__main__": threads = [] for i in range(8): t = threading.Thread(target=scan_vulnerability, args=(f"Thread-{i+1}",)) threads.append(t) t.start()
for t in threads: t.join()
print("\n--- 所有扫描任务已结束 ---")
|
事件
“锁”是为了争夺资源,那么“事件”就是为了同步节奏。像一个“发令枪”:多个线程可以同时停下来等待一个信号,一旦信号发出,所有等待的线程都会同时开始运行。
Event 拥有一个全局标志位(Flag),初始值为 False。
四个核心方法:
wait(): 线程进入阻塞状态,等待标志位变为 True。如果标志位已经是 True,则直接通过。
set(): 将标志位设置为 True,并唤醒所有正在等待的线程。
clear(): 将标志位重置为 False。
is_set(): 查询当前标志位的状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import threading import time
loading_event = threading.Event()
def scan_task(node_id): print(f"[*] 扫描节点-{node_id} 已就绪,等待特征库加载...") loading_event.wait() print(f"[Event Triggered] 节点-{node_id} 收到信号,开始扫描目标!")
def prepare_data(): print("\n[Admin] 正在从服务器下载最新的漏洞特征库...") time.sleep(3) print("[Admin] 特征库加载完毕!") loading_event.set()
if __name__ == "__main__": for i in range(3): threading.Thread(target=scan_task, args=(i,)).start()
prepare_data()
|
线程池
频繁地创建和销毁进程/线程会带来巨大的系统开销(上下文切换、内存分配等)。为此引入了池化技术(Pooling)。
池化技术的核心思想是:空间换时间,循环利用。
- 预创建:程序启动时先创建好固定数量的进程/线程。
- 任务队列:将任务丢进池子的队列里。
- 自动调度:池子里的“工人”谁空闲了,谁就去领任务做,做完后不销毁,直接等下一个任务。
适用场景:IO 密集型任务,如爬虫、端口扫描、API 请求、数据库读写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import time from concurrent.futures import ThreadPoolExecutor, as_completed
def scan_port(ip, port): print(f"[+] 正在扫描 {ip}:{port}...") time.sleep(1) return f"结果: {ip}:{port} 开放"
def thread_pool_demo(): hosts = ["192.168.1.1", "192.168.1.2", "192.168.1.3"] ports = [80, 443, 8080]
with ThreadPoolExecutor(max_workers=4) as executor:
print("--- 使用 map 批量提交 ---") results = executor.map(lambda p: scan_port("127.0.0.1", p), ports) for res in results: print(res)
print("\n--- 使用 submit 提交 ---") future_tasks = [executor.submit(scan_port, host, 80) for host in hosts]
for future in as_completed(future_tasks): try: data = future.result() print(f"任务完成: {data}") except Exception as e: print(f"异常: {e}")
if __name__ == "__main__": thread_pool_demo()
|
混合模式:在某些复杂的安全系统中,可以结合进程池和线程池使用。例如:使用进程池分配不同的流量包文件,在每个进程内部使用线程池并行处理该文件中的不同流。
Future 对象的作用:future 允许查询任务状态(running(), done())以及取消尚未开始的任务(cancel())。在编写具有交互功能的扫描器时,对于随时停止扫描非常有用。
内存安全性:在进程池中,由于数据是深拷贝到子进程的,修改全局变量不会影响主进程,这在处理敏感数据时是一种天然的隔离保护。