外观
多进程和多线程
多进程
创建进程有多种方法,主要包括使用 multiprocessing 包下的 Process 类、multiprocessing 包下的 multiprocessing 类以及使用进程池 Pool 类。让我们来简单介绍一下这三种方法:
- 使用
multiprocessing包下的Process类:这种方法是直接使用Process类来创建进程。你可以实例化Process类,并传入要执行的函数和函数所需的参数,然后调用start()方法启动进程。 - 使用
multiprocessing包下的multiprocessing类:这种方法是使用multiprocessing类的Process方法来创建进程。你可以调用multiprocessing.Process()方法,并传入要执行的函数和函数所需的参数,然后调用start()方法启动进程。 - 使用进程池
Pool类:这种方法是使用multiprocessing包下的Pool类来创建进程池,然后通过Pool类的apply_async()或map()方法来异步或同步地执行多个函数。进程池会管理一组工作进程,可以根据需要动态创建、重用和销毁进程。
使用Process类
Process 类是 multiprocessing 包中用于创建进程的类,它提供了一些常用的属性和方法:
is_alive(): 判断进程实例是否还在执行。join([timeout]): 等待进程实例执行结束,或等待多少秒(可选参数 timeout)。start(): 启动进程实例。run(): 若进程对象没有给定target参数,在调用start()方法时将执行该对象中的run()方法。terminate(): 不管任务是否完成,立即终止进程。name: 当前进程实例的别名,默认是Process-N,其中 N 是一个自动生成的数字。pid: 当前进程的进程 ID(PID)值。
可以通过实例化Process类,将target参数赋值为进程要运行的函数,然后调用start方法启动线程。
例:
# 使用Process类创建多线程
import os
import time
from multiprocessing import Process
def fun(args):
print('子进程开始,子进程的参数是:', args)
time.sleep(1)
print('子进程结束')
if __name__ == '__main__':
print('主进程开始执行')
print('主进程的pid:', os.getpid())
process1 = Process(target=fun, args=('我是子线程的参数',)) # Process类的构造方法可加target参数 意为线程执行的函数 还可以传递一个元组 意为传递到函数的参数
process1.start() # 开启子进程运行结果:
主线程开始执行
主线程的pid: 95552
子线程开始,子进程的参数是: 我是子线程的参数
子线程结束还可以继承Process类,然后实现run方法,最后调用start方法启动进程。
例:
# 继承至Process类
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name='', interval=0):
super().__init__()
self.interval = interval
if name:
self.name = name
def run(self):
print('子进程%s开始执行,它的名字为%s,它的父进程是%s' % (os.getpid(), self.name, os.getppid()))
time.sleep(self.interval)
print('子进程%s结束' % (os.getpid(),))
if __name__ == '__main__':
print('主进程开始,它的pid为%s' % os.getpid())
print('主进程的名称为%s' % os.name)
myProcess1 = MyProcess(interval=1)
myProcess2 = MyProcess('子进程2', interval=2)
myProcess1.start()
myProcess2.start()运行结果:
主进程开始,它的pid为96316
主进程的名称为nt
子进程98308开始执行,它的名字为MyProcess-1,它的父进程是96316
子进程96364开始执行,它的名字为子进程2,它的父进程是96316
子进程98308结束
子进程96364结束使用进程池Pool
若多个进程的目标函数是相同的,就可以直接使用进程池而不用实例化多个Process对象。
例:
# 使用进程池Pool创建进程
import os
import time
from multiprocessing import Pool
def task(name):
print('子进程%s开始执行任务%s' % (os.getpid(), name))
time.sleep(1)
if __name__ == '__main__':
print('父进程%s开始' % (os.getpid(),))
p = Pool(3) # 创建进程池 最多可以同时执行3个任务
for i in range(10):
p.apply_async(task,args=(i,)) # 使用非阻塞方式调用task函数
p.close() # 关闭进程池
p.join() # 主进程等待所有子进程结束再开始执行
print('所有子进程结束')
# apply() 使用阻塞方式调用函数
# terminate() 不管任务是否完成 立即终止运行结果:
父进程83920开始
子进程104952开始执行任务0
子进程106376开始执行任务1
子进程108372开始执行任务2
子进程104952开始执行任务3
子进程106376开始执行任务4
子进程108372开始执行任务5
子进程104952开始执行任务6
子进程108372开始执行任务7
子进程106376开始执行任务8
子进程104952开始执行任务9
所有子进程结束进程间通信
多线程之间的通信常常需要使用队列来进行数据的共享,Python 提供了 queue 模块来实现线程安全的队列操作。队列的特点是先进先出,即在队尾添加元素,在队头取出元素。
下面是队列的常用方法:
qsize(): 返回队列中剩余的消息数量。empty(): 若队列为空则返回True,否则返回False。full(): 若队列已满则返回True,否则返回False。put(item[, block[, timeout]]): 在队列末尾添加一个新元素。如果block参数使用默认值且没有设置等待时间timeout,若队列已满则会阻塞程序,直到队列有空间放入消息为止;若设置了timeout,则在超出等待时长后会抛出Queue.Full异常。若block参数为False且消息队列已满,则会立即抛出Queue.Full异常。put_nowait(item): 相当于put(item, False)。get([block[, timeout]]): 取出队列的第一个元素。如果block参数使用默认值且没有设置等待时间timeout,若队列为空则会阻塞程序,直到队列读取到消息为止;若设置了timeout,则在超出等待时长后会抛出Queue.Empty异常。若block参数为False且消息队列为空,则会立即抛出Queue.Empty异常。get_nowait(): 相当于get(False)。
例:
from multiprocessing import Queue
if __name__ == '__main__':
q = Queue(3) # 创建一个可以容纳三个元素的队列
print('队列是否为空:%s' % q.empty())
q.put('消息1')
q.put('消息2')
print('队列是否满了:%s' % q.full())
q.put('消息3')
print('队列是否满了:%s' % q.full())
# 下面尝试添加第四个元素
try:
q.put('消息4', True, 2) # 等待两秒
except ValueError:
print('消息已满 当前消息数量为%s' % q.qsize())
try:
q.put_nowait('消息4') # 等待两秒
except ValueError:
print('消息已满 当前消息数量为%s' % q.qsize())
if not q.empty():
print('--读取消息--')
for i in range(q.qsize()):
print(q.get())
# 此时取出全部消息 队列为空 再添加时就不会报错
if not q.empty():
q.put('消息4')
print('现在的队列长度为%s' % q.qsize())运行结果:
队列是否为空:True
队列是否满了:False
队列是否满了:True
Traceback (most recent call last):
File "A:\pythonProject\src\MultiprocessAndMultithreads\queue\useQueue.py", line 26, in <module>
q.put('消息4', True, 2) # 等待两秒
^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Qi\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 90, in put
raise Full
queue.Full多线程
创建多线程可以使用 Thread 类。下面是 Thread 类的构造函数参数:
group: 值为None,为将来版本保留。target: 线程执行时运行的目标函数。如果为空,则在线程启动时会执行run方法。name: 线程的别名,默认为Thread-N,其中N是自动生成的数字。args: 传递给target函数的参数列表。kwargs: 传递给target函数的字典列表。
例:
import threading
import time
from threading import Thread
def task():
for i in range(3):
time.sleep(1)
print('当前线程的名字:%s' % threading.current_thread().name)
if __name__ == '__main__':
print('主线程开启')
threads = [Thread(target=task) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print('主线程结束')运行结果:
主线程开启
当前线程的名字:Thread-1 (task)
当前线程的名字:Thread-3 (task)
当前线程的名字:Thread-2 (task)
当前线程的名字:Thread-4 (task)
当前线程的名字:Thread-3 (task)
当前线程的名字:Thread-2 (task)
当前线程的名字:Thread-4 (task)
当前线程的名字:Thread-1 (task)
当前线程的名字:Thread-3 (task)
当前线程的名字:Thread-4 (task)
当前线程的名字:Thread-2 (task)
当前线程的名字:Thread-1 (task)
主线程结束锁
由于进程之间数据是互通的,若在一个线程中对数据的读取或更改还未完成时另外一个线程对这个数据进行更改的话会出现错误,如车站购票,购买的过程是先判断是否票数是否大于0,然后再将票数减一。若这两个线程是同时判断票数是否大于0,而此时票数只有一,那么这两个线程都会将票数减一,结果就是票数变成了负数。因此为了在一个线程尚未完成对数据的操作之前其他线程不能操作该数据,就会将该数据锁起来,这就是锁机制。
使用锁需要导入multiprocessing模块下的Mutex类,需要上锁时使用acquire()方法锁住资源,需要解锁时使用release()方法解锁资源。
例:
import random
import threading
import time
from threading import Thread, Lock
n = 15 # 代表总的资源
def task():
global n
mutex.acquire()
if n > 0:
time.sleep(random.random() * 0.1)
n -= 1
print('子线程%s获取一次资源, 剩余资源%d' % (threading.current_thread().name, n))
mutex.release()
if __name__ == '__main__':
mutex = Lock()
print('--主线程开始--')
# 共有20个线程取总计15个资源
threadList = [Thread(target=task, name='Thread-' + str(i)) for i in range(20)]
for i in threadList:
i.start()
for i in threadList:
i.join()
print('--所有子线程结束--')运行结果:
--主线程开始--
子线程Thread-0获取一次资源, 剩余资源14
子线程Thread-1获取一次资源, 剩余资源13
子线程Thread-2获取一次资源, 剩余资源12
子线程Thread-3获取一次资源, 剩余资源11
子线程Thread-4获取一次资源, 剩余资源10
子线程Thread-5获取一次资源, 剩余资源9
子线程Thread-6获取一次资源, 剩余资源8
子线程Thread-7获取一次资源, 剩余资源7
子线程Thread-8获取一次资源, 剩余资源6
子线程Thread-9获取一次资源, 剩余资源5
子线程Thread-10获取一次资源, 剩余资源4
子线程Thread-11获取一次资源, 剩余资源3
子线程Thread-12获取一次资源, 剩余资源2
子线程Thread-13获取一次资源, 剩余资源1
子线程Thread-14获取一次资源, 剩余资源0
--所有子线程结束--而将上锁和解锁的代码注释掉之后的运行结果:
--主线程开始--
子线程Thread-19获取一次资源, 剩余资源14
子线程Thread-4获取一次资源, 剩余资源13
子线程Thread-16获取一次资源, 剩余资源12
子线程Thread-14获取一次资源, 剩余资源11
子线程Thread-3获取一次资源, 剩余资源10
子线程Thread-17获取一次资源, 剩余资源9
子线程Thread-1获取一次资源, 剩余资源8
子线程Thread-12获取一次资源, 剩余资源7
子线程Thread-8获取一次资源, 剩余资源6
子线程Thread-2获取一次资源, 剩余资源5
子线程Thread-10获取一次资源, 剩余资源4
子线程Thread-7获取一次资源, 剩余资源3
子线程Thread-11获取一次资源, 剩余资源2
子线程Thread-15获取一次资源, 剩余资源1
子线程Thread-5获取一次资源, 剩余资源0
子线程Thread-6获取一次资源, 剩余资源-1
子线程Thread-0获取一次资源, 剩余资源-2
子线程Thread-18获取一次资源, 剩余资源-3
子线程Thread-13获取一次资源, 剩余资源-4
子线程Thread-9获取一次资源, 剩余资源-5
--所有子线程结束--线程间通信
线程的数据是共享的可以直接读取,如上面的代码中使用global关键字修饰的全局变量n就是在20个子线程中共享的。除此之外,也可以使用队列通信。
例:
# 使用queue模块的Queue类实现多线程通信 模拟生产者与消费者模式
from threading import Thread
from queue import Queue
import time, random
class Producer(Thread):
def __init__(self, name, queue1):
super().__init__(name=name)
self.queue = queue1
def run(self) -> None:
for i in range(5):
time.sleep(random.random())
self.queue.put(i)
print('%s将产品%d放入队列中' % (self.name, i))
class Consumer(Thread):
def __init__(self, name, queue1):
super().__init__(name=name)
self.queue = queue1
def run(self) -> None:
for i in range(5):
time.sleep(random.random())
value = self.queue.get()
print('%s将产品%d从队列中取出' % (self.name, value))
if __name__ == '__main__':
print('--主线程开始--')
queue = Queue()
p = Producer('生产者', queue)
p.start()
time.sleep(random.random())
c = Consumer('消费者', queue)
c.start()
p.join()
c.join()
print('--主线程结束--')运行结果:
--主线程开始--
生产者将产品0放入队列中
消费者将产品0从队列中取出
生产者将产品1放入队列中
消费者将产品1从队列中取出
生产者将产品2放入队列中
消费者将产品2从队列中取出
生产者将产品3放入队列中
生产者将产品4放入队列中
消费者将产品3从队列中取出
消费者将产品4从队列中取出
--主线程结束--