# Reference listing of the multiprocessing.Process API.
process = multiprocessing.Process()

def __init__(self, group=None, target=None, name=None,
             args=(), kwargs={}, *, daemon=None):
    # NOTE: kwargs={} mirrors the real stdlib signature; shown for reference only.
    """Signature of multiprocessing.Process (for reference).

    group : process group; defaults to None and is almost never changed.
    target: the callable the child process will execute.
    name  : the child process name; usually left at the default.
    args  : positional arguments passed to ``target``.
    kwargs: keyword arguments passed to ``target``.
    daemon: whether the child runs as a daemon process.
    """

if __name__ == '__main__':
    process.start()      # spawn the child; invokes run() in the child
    process.run()        # the method start() runs; calling it directly runs target in *this* process
    process.is_alive()   # True while the child is still running
    process.join()       # block until the child terminates
    process.terminate()  # forcefully stop the child
    process.daemon       # daemon flag (must be set before start())
    process.name         # process name
    process.pid          # child's process id (None before start())
    process.exitcode     # exit code (None while still running)
    process.authkey      # authentication key of the process
    # close() releases the Process object's resources; any attribute access
    # afterwards raises ValueError, so it must come last (the original listed
    # it before the attribute reads, which would raise at runtime).
    process.close()
from multiprocessing import current_process

# pid of the process this code is running in
current_process().pid
# Different ways to create a child process: via a target function.
import random
import time
from multiprocessing import Process


def work(name):
    """Worker run in a child process: announce start, sleep 1-6 s, announce end."""
    print(f"{name} is starting \n")
    sleep_time = random.randint(1, 6)
    print(f"{name} is sleeping {sleep_time} s \n")
    time.sleep(sleep_time)
    print(f"{name} is ending \n")


def main_object():
    """Create two children: one passing args positionally, one by keyword."""
    # positional arguments via ``args`` — note the 1-tuple
    process_one = Process(target=work, args=("work_1",))
    # keyword arguments via ``kwargs``
    process_two = Process(target=work, kwargs={'name': 'work_2'})
    process_one.start()
    process_two.start()


if __name__ == '__main__':  # fixed: original used assignment `=` (SyntaxError)
    main_object()
# Different ways to create a child process: by subclassing Process.
import multiprocessing
import random
import time


class MyProcess(multiprocessing.Process):
    """Subclass Process and override run(); start() invokes run() in the child."""

    def __init__(self, name):
        super().__init__()
        self.name = name  # uses Process.name's setter; must be a str

    def run(self):
        """Body executed in the child process."""
        print(f"{self.name} is starting \n")
        sleep_time = random.randint(1, 6)
        print(f"{self.name} is sleeping {sleep_time} s \n")
        time.sleep(sleep_time)
        print(f"{self.name} is ending \n")


def main_class():
    """Start two subclass-based child processes."""
    process_one = MyProcess(name='work_1')
    process_two = MyProcess(name='work_2')
    process_one.start()
    process_two.start()


if __name__ == '__main__':  # fixed: original used assignment `=` (SyntaxError)
    main_class()
示例 1:模拟 TCP 服务 —— 模拟 TCP 服务端接收多个客户端请求,可启动多个客户端向服务端发送数据。
# file: server.py
import multiprocessing
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR


def work(conn, addr):
    """Per-client handler: echo each message back upper-cased until the peer disconnects."""
    while True:
        data_from_client = conn.recv(1024)
        if not data_from_client:  # empty bytes => peer closed the connection
            conn.close()
            break
        print(f"来自于客户端 {addr} 的数据 :>>>> {data_from_client.decode()} ")
        data_from_client_upper = data_from_client.decode().upper()
        conn.send(data_from_client_upper.encode())


def main():
    """Accept connections forever, handing each one to a fresh child process."""
    server = socket(family=AF_INET, type=SOCK_STREAM)
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # allow quick rebinding after restart
    server.bind(('127.0.0.1', 9696))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        task = multiprocessing.Process(target=work, args=(conn, addr))
        task.start()


if __name__ == '__main__':
    main()
# file: client.py
from socket import socket, AF_INET, SOCK_STREAM

client = socket(family=AF_INET, type=SOCK_STREAM)
client.connect(('127.0.0.1', 9696))

# Read lines from the user and round-trip each one through the server.
while True:
    msg = input('请输入要发送的消息:')
    if not msg:  # ignore empty input rather than sending zero bytes
        continue
    client.send(msg.encode('utf-8'))
    data_from_server = client.recv(1024)
    print(f"服务器返回的数据:{data_from_server.decode('utf-8')} ")
示例 2 模拟并发 模拟并发执行,子进程结束后,主进程才会结束
import functools
import random
import time
from multiprocessing import Process


def timer(func):
    """Decorator that prints how long ``func`` took to run."""
    @functools.wraps(func)  # added: preserve the wrapped function's metadata
    def inner(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        end_time = time.time()
        print(f"函数 {func.__name__} 运行时间:{end_time - start_time} 秒")
        return res
    return inner


def work(name):
    """Worker: sleep a random 1-4 s, printing before and after."""
    sleep_time = random.randint(1, 4)
    print(f"{name} is starting sleeping {sleep_time} ")
    time.sleep(sleep_time)
    print(f"{name} is ending sleeping {sleep_time} ")


@timer
def work_process():
    pass


@timer
def main_object():
    """Start five workers concurrently, then wait for all of them.

    start() each and join() immediately would serialize the workers;
    starting them all first and joining afterwards runs them
    concurrently / in parallel.
    """
    task_list = [Process(target=work, args=(f"work_{i} ",)) for i in range(5)]
    for task in task_list:
        task.start()
    for task in task_list:
        task.join()


if __name__ == '__main__':
    print(f"main process is starting ")
    main_object()  # fixed: original called undefined process_work_wait_main()
    print(f"main process is ending ")
# Inter-process communication via a Queue.
from multiprocessing import Process, Queue


def producer(queue):
    """Child side: read the parent's message, then answer back on the same queue."""
    print(f"这是来自主进程的数据 :>>>> {queue.get()} ")
    queue.put("son process")


def process_main_to_son():
    """Round-trip a message: parent -> child -> parent over one shared Queue."""
    queue = Queue()
    queue.put(f"main process")
    process_son = Process(target=producer, args=(queue,))
    process_son.start()
    process_son.join()  # wait until the child has replied
    print(f"这是来自子进程的数据 :>>>> {queue.get()} ")


if __name__ == '__main__':
    process_main_to_son()
示例:模拟生产者消费者通信
import random
import time
from multiprocessing import JoinableQueue, Process


def producer(name, food, queue):
    """Queue two items, then block until every queued item has been consumed."""
    for i in range(2):
        data = f"{name} 生产了第{i} 个{food} "
        time.sleep(random.randint(1, 4))
        queue.put(data)
        print(f"{name} 生产了{data} ")
    queue.join()  # returns once consumers have task_done()'d everything


def consumer(name, queue):
    """Consume forever; meant to run as a daemon so it dies with the main process."""
    while True:
        data = queue.get()
        time.sleep(random.randint(1, 4))
        print(f"{name} 消费了{data} ")
        queue.task_done()  # signal the producers that this item is finished


def process_one():
    """Two producers feed two daemon consumers through one JoinableQueue."""
    queue = JoinableQueue()
    producer_dream = Process(target=producer, args=("dream", "apple", queue))
    producer_lucy = Process(target=producer, args=("lucy", "pear", queue))
    customer_one = Process(target=consumer, args=("customer_one", queue))
    customer_two = Process(target=consumer, args=("customer_two", queue))
    # the endless consumers must not keep the program alive on their own
    customer_one.daemon = True
    customer_two.daemon = True
    process_list = [producer_dream, producer_lucy, customer_one, customer_two]
    for task in process_list:
        task.start()
    # once both producers return, all items were consumed, so main may exit
    producer_dream.join()
    producer_lucy.join()


if __name__ == '__main__':
    process_one()
# Inter-process communication via a Pipe.
import random
import time
from multiprocessing import Pipe, Process


def producer(name, conn):
    """Send two items down the pipe's left end, then close it."""
    left_conn, right_conn = conn
    right_conn.close()  # this side only writes; drop its copy of the read end
    for i in range(2):
        data = f"当前大厨 {name} 生产出了第{i} 份!"
        left_conn.send(data)
        print(f"生产者 {name} :>>>> {data} ")
        time.sleep(random.randint(1, 4))
    left_conn.close()  # gives the consumer EOF once all copies are closed


def customer(name, conn):
    """Receive from the pipe until the write end is closed."""
    left_conn, right_conn = conn
    left_conn.close()  # this side only reads; drop its copy of the write end
    while True:
        try:
            food = right_conn.recv()
            time.sleep(random.randint(1, 4))
            print(f"消费者 {name} :>>>> {food} ")
        # fixed: bare `except:` also swallowed KeyboardInterrupt/SystemExit;
        # recv() raises EOFError when the peer end is fully closed.
        except EOFError:
            right_conn.close()
            break


def process_one():
    """Parent produces, a child consumes; both ends are passed to the child."""
    left_conn, _right_conn = Pipe()
    # Pipe() returns a pair of Connection objects, e.g.
    # (<multiprocessing.connection.Connection object at 0x...>,
    #  <multiprocessing.connection.Connection object at 0x...>)
    customer_smile = Process(target=customer, args=("smile", (left_conn, _right_conn)))
    customer_smile.start()
    producer(name="dream", conn=(left_conn, _right_conn))
    # the parent's own copies must be closed too, or the child never sees EOF
    left_conn.close()
    _right_conn.close()
    customer_smile.join()


if __name__ == '__main__':
    process_one()
互斥锁 [[多进程&多线程&协程-threading#互斥锁|多线程互斥锁]] 多进程的内存地址空间是相互隔离的,通常不需要锁。但在以下情况必须使用:
共享内存 (Shared Memory) :如 Value 或 Array;
外部共享资源 :如多个进程同时往同一个日志文件里写数据,如果不加锁,行与行之间可能会交织错乱。
import multiprocessing
import time


def task(shared_val, lock):
    """Bump the shared counter 50 times, serializing each update with the lock."""
    for _ in range(50):
        time.sleep(0.01)
        with lock:
            shared_val.value += 1
    print(f"子进程{multiprocessing.current_process().name} 正在运行,当前共享变量值为{shared_val.value} ")


if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)  # shared 32-bit signed int
    lock = multiprocessing.Lock()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=task, args=(counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"主进程运行结束,共享变量值为{counter.value} ")
计算密集型 任务使用多进程 可以充分利用多核 CPU 的优势,而 IO 密集型 任务使用多线程 能够更好地处理 IO 操作,避免频繁的进程切换开销。根据任务的特性选择合适的并发方式可以有效提高任务的执行效率。
# Semaphore: limit how many processes may hold a resource at once.
import multiprocessing
import time
import os


def hardware_access_task(sem, i):
    """Wait for one of the semaphore's slots, hold it for 2 s, release it."""
    print(f"进程 {os.getpid()} (任务{i} ) 正在排队...")
    with sem:  # at most Semaphore(n) holders at any moment
        print(f"==> 进程 {os.getpid()} 成功抢占硬件访问权!")
        time.sleep(2)  # simulate using the scarce hardware
        print(f"<== 进程 {os.getpid()} 任务完成,退出。")


if __name__ == "__main__":
    sem = multiprocessing.Semaphore(2)  # two concurrent slots
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=hardware_access_task, args=(sem, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
进程池 [[多进程&多线程&协程-threading#线程池|池化技术]]
适用场景:CPU 密集型任务,如大规模数据加密、密码哈希爆破、流量包特征深度解析。
由于 Python 存在 GIL(全局解释器锁) ,在同一个进程内,同一时间只有一个线程能执行字节码。因此,对于需要消耗大量计算资源的任务,必须使用多进程 来利用多核 CPU。
import os
import time
from concurrent.futures import ProcessPoolExecutor


def heavy_computation(data):
    """CPU-bound demo task: burn ~10**7 additions and report the elapsed time."""
    pid = os.getpid()
    print(f"进程 {pid} 正在计算数据: {data} ")
    start_time = time.time()
    count = 0
    for i in range(10 ** 7):
        count += i
    # fixed: the format spec was garbled as ':.2 f', which raises
    # ValueError ("Invalid format specifier") at runtime; must be ':.2f'
    return f"PID {pid} 计算完成,耗时 {time.time() - start_time:.2f} s"


def process_pool_demo():
    """Fan four CPU-bound tasks out over one worker process per core."""
    tasks = ["Task_A", "Task_B", "Task_C", "Task_D"]
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print(f"主进程 ID: {os.getpid()} ,准备分发任务...")
        # map() is lazy and yields results in submission order
        results = executor.map(heavy_computation, tasks)
        print("\n--- 计算结果 ---")
        for res in results:
            print(res)


if __name__ == "__main__":
    process_pool_demo()
PS:
由于 Windows 没有 fork,multiprocessing 模块会启动一个新的 Python 进程并导入调用模块。如果在导入时调用 Process(),就会无限递归地创建新进程(直到机器耗尽资源)。这就是要把对 Process() 的调用隐藏在 if __name__ == "__main__" 之下的原因:该 if 语句中的语句不会在模块被导入时执行。