[Note] Last updated on May 1, 2020. The content may be outdated; use with care.
## What multiprocessing means

A process (Process):

- is a single run of a program, with some degree of independent functionality, over a particular data set
- is an independent unit of resource allocation and scheduling in the operating system

Multiprocessing means starting several processes and running them at the same time.
## Why multiprocessing helps in Python

Because of the GIL (Global Interpreter Lock), multithreading in Python cannot take full advantage of multiple cores: among the threads of a single process, only one can execute Python bytecode at any given moment.

With multiple processes, each process has its own interpreter and therefore its own GIL, so on a multi-core machine the processes are not constrained by each other's GIL and can genuinely run in parallel, making much better use of the cores.

Overall, multiprocessing in Python has the edge over multithreading for CPU-bound work; when conditions allow, prefer multiprocessing.

Since a process is an independent unit of resource allocation and scheduling, processes do not share data by default; sharing requires explicit mechanisms such as queues or pipes.
## Implementing multiprocessing

The standard-library `multiprocessing` module provides:

- Process
- Queue
- Semaphore
- Pipe
- Lock
- Pool (process pool)
### Using the Process class directly

In `multiprocessing`, each process is represented by a `Process` object.

API: `Process([group [, target [, name [, args [, kwargs]]]]])`

- `target`: the callable to invoke; pass the function itself
- `args`: the positional-argument tuple for the callable — if `target` is a function `func` that takes two parameters `m` and `n`, pass `args=(m, n)`
- `kwargs`: the keyword-argument dict for the callable
- `name`: an alias, i.e. a name for this process
- `group`: the process group (should be left as `None`; it exists for compatibility with `threading.Thread`)
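As a quick sketch of the `args`, `kwargs`, and `name` parameters (the `greet` function here is just an illustration, not part of the library):

```python
import multiprocessing


def greet(word, times=1):
    # print the greeting the requested number of times,
    # tagging each line with this process's name
    for _ in range(times):
        print(f'{word} from {multiprocessing.current_process().name}')


if __name__ == '__main__':
    # positional args go in a tuple, keyword args in a dict
    p = multiprocessing.Process(target=greet, name='greeter',
                                args=('hello',), kwargs={'times': 2})
    p.start()
    p.join()
```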
An example using the `Process` class:
```python
# process-01.py
import multiprocessing


def process(index):
    print(f'Process: {index}')


if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=process, args=(i,))
        p.start()
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process.py
Process: 1
Process: 2
Process: 0
Process: 3
Process: 4
```
`multiprocessing` also provides a couple of useful helpers:

- `cpu_count()` returns the number of CPU cores on the current machine
- `active_children()` returns all child processes that are still running
```python
# process-02.py
import multiprocessing
import time


def process(index):
    time.sleep(index)
    print(f'Process: {index}')


if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=process, args=(i,))
        p.start()
    print(f'CPU number: {multiprocessing.cpu_count()}')
    for p in multiprocessing.active_children():
        print(f'Child process name: {p.name} id: {p.pid}')
    print('Process Ended')
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process-02.py
Process: 0
CPU number: 4
Child process name: Process-2 id: 44432
Child process name: Process-4 id: 44434
Child process name: Process-5 id: 44435
Child process name: Process-3 id: 44433
Process Ended
Process: 1
Process: 2
Process: 3
Process: 4
```
### Subclassing Process

Creating a process isn't limited to the above. Just as with `threading.Thread`, you can also create a process class through inheritance: implement the process's work in the subclass's `run` method.
```python
# process-03.py
from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f'Pid: {self.pid} LoopCount: {count}')


if __name__ == "__main__":
    for i in range(2, 5):
        p = MyProcess(i)
        p.start()
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process-03.py
Pid: 45236 LoopCount: 0
Pid: 45237 LoopCount: 0
Pid: 45238 LoopCount: 0
Pid: 45236 LoopCount: 1
Pid: 45237 LoopCount: 1
Pid: 45238 LoopCount: 1
Pid: 45237 LoopCount: 2
Pid: 45238 LoopCount: 2
Pid: 45238 LoopCount: 3
```
## Daemon processes

If a process is marked as a daemon, it is terminated automatically when its parent process exits. Whether a process is a daemon is controlled by its `daemon` attribute, which must be set before `start()` is called.
```python
from multiprocessing import Process
import time


# daemon processes
class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f'Pid: {self.pid} LoopCount: {count}')


if __name__ == "__main__":
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
    print('Main Process ended')
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process-04.py
Main Process ended
```
## Waiting for child processes

To let all child processes finish before the main process exits, just add a `join()` call. Example:
```python
if __name__ == "__main__":
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
        # wait for this child to finish
        p.join()
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process-04.py
Pid: 46609 LoopCount: 0
Pid: 46609 LoopCount: 1
Pid: 46613 LoopCount: 0
Pid: 46613 LoopCount: 1
Pid: 46613 LoopCount: 2
Pid: 46616 LoopCount: 0
Pid: 46616 LoopCount: 1
Pid: 46616 LoopCount: 2
Pid: 46616 LoopCount: 3
Main Process ended
```
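One caveat: calling `join()` inside the same loop that calls `start()` makes each child finish before the next one starts, so the children run one at a time (visible in the output above, where each pid completes all its loops before the next pid appears). To run the children concurrently and still wait for all of them, start them all first, then join them all. A sketch (the `work` function is illustrative):

```python
from multiprocessing import Process
import time


def work(index):
    time.sleep(1)
    print(f'Process {index} done')


if __name__ == '__main__':
    processes = [Process(target=work, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        # join only after all children have started,
        # so they run concurrently
        p.join()
    print('All children finished')
```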
By default `join()` waits indefinitely. Passing a timeout (in seconds) makes the main process wait at most that long for each child:
```python
if __name__ == "__main__":
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
        # wait at most 1 second for this child
        p.join(1)
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process-04.py
Pid: 46662 LoopCount: 0
Pid: 46662 LoopCount: 1
Pid: 46663 LoopCount: 0
Pid: 46663 LoopCount: 1
Pid: 46667 LoopCount: 0
Main Process ended
```
## Terminating a process

Daemonizing is not the only way to stop a child: the `terminate()` method kills a specific child process, and the `is_alive()` method checks whether a process is still running.
```python
# process-05.py
import multiprocessing
import time


# terminating a process
def process():
    print('Starting')
    time.sleep(5)
    print('Finished')


if __name__ == '__main__':
    p = multiprocessing.Process(target=process)
    print('Before:', p, p.is_alive())
    p.start()
    print('During:', p, p.is_alive())
    p.terminate()
    print('Terminate:', p, p.is_alive())
    p.join()
    print('Joined:', p, p.is_alive())
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 process-05.py
Before: <Process(Process-1, initial)> False
During: <Process(Process-1, started)> True
Terminate: <Process(Process-1, started)> True
Joined: <Process(Process-1, stopped[SIGTERM])> False
```

Note that right after `terminate()` the process still reports `is_alive()` as `True`; its state is only updated once `join()` has been called.
## Process mutex (Lock)
Look at the following code first; the two lock lines are commented out for now:

```python
from multiprocessing import Process, Lock
import time


# process mutex
class MyProcess(Process):
    def __init__(self, loop, lock):
        Process.__init__(self)
        self.loop = loop
        self.lock = lock

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            # self.lock.acquire()
            print(f'Pid: {self.pid} LoopCount: {count}')
            # self.lock.release()


if __name__ == "__main__":
    lock = Lock()
    for i in range(2, 5):
        p = MyProcess(i, lock)
        p.start()
```
Sometimes the output looks like this:
```
Pid: 48275 LoopCount: 0Pid: 48276 LoopCount: 0
Pid: 48277 LoopCount: 0
Pid: 48275 LoopCount: 1Pid: 48276 LoopCount: 1
Pid: 48277 LoopCount: 1
Pid: 48276 LoopCount: 2
Pid: 48277 LoopCount: 2
```
If we guarantee that at any moment only one process writes its output while the others wait, and the next one writes only after the current one has finished, the missing-newline glitches disappear. Uncomment the two commented lines above to put the print under the lock; the result:
```
Pid: 48543 LoopCount: 0
Pid: 48544 LoopCount: 0
Pid: 48545 LoopCount: 0
Pid: 48543 LoopCount: 1
Pid: 48544 LoopCount: 1
Pid: 48545 LoopCount: 1
Pid: 48544 LoopCount: 2
Pid: 48545 LoopCount: 2
Pid: 48545 LoopCount: 3
```
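As a side note, `Lock` also supports the context-manager protocol, so the acquire/release pair can be written as a `with` block, which releases the lock even if the printing raises. A sketch of the same example in that style:

```python
from multiprocessing import Process, Lock
import time


class MyProcess(Process):
    def __init__(self, loop, lock):
        Process.__init__(self)
        self.loop = loop
        self.lock = lock

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            # acquired on entry, released on exit, even on error
            with self.lock:
                print(f'Pid: {self.pid} LoopCount: {count}')


if __name__ == '__main__':
    lock = Lock()
    processes = [MyProcess(i, lock) for i in range(2, 4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```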
## Semaphore

A semaphore plays an important role in process synchronization: it maintains a counter over a critical resource, allowing a bounded number of processes to access a shared resource concurrently, i.e. limiting the degree of concurrency.

`multiprocessing` provides `Semaphore` for this. The classic producer-consumer example below uses two semaphores: `empty` counts free buffer slots (it starts at 2, so at most two elements are buffered at a time) and `full` counts filled slots. Note that the example shares `buffer` and the semaphores as module-level globals, which relies on the fork start method (the default on Linux):
```python
from multiprocessing import Process, Semaphore, Lock, Queue
import time

buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()


class Consumer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            buffer.get()
            print('Consumer pop an element')
            time.sleep(1)
            lock.release()
            empty.release()


class Producer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire()
            lock.acquire()
            buffer.put(1)
            print('Producer append an element')
            time.sleep(1)
            lock.release()
            full.release()


if __name__ == '__main__':
    p = Producer()
    c = Consumer()
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print('Main Process Ended')
```
Output (the two loops run forever, so the program keeps going until interrupted):
```
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
...
...
```
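Besides the producer-consumer pattern, a semaphore can simply cap how many processes run a section at once. A minimal sketch (the limit of 2 and the `worker` function are illustrative):

```python
from multiprocessing import Process, Semaphore
import time

# at most 2 workers may be inside the guarded section at once
semaphore = Semaphore(2)


def worker(index, sem):
    # blocks while two other workers hold the semaphore
    with sem:
        print(f'Worker {index} entered')
        time.sleep(1)
        print(f'Worker {index} leaving')


if __name__ == '__main__':
    processes = [Process(target=worker, args=(i, semaphore)) for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```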
## Queue

A queue — here meaning `multiprocessing.Queue`, not `queue.Queue` — lets processes share data. In the following variant of the producer-consumer example, the producer puts random numbers into the queue and the consumer reads them back out:
```python
from multiprocessing import Process, Semaphore, Lock, Queue
import time
from random import random

# queue
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()


class Consumer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            print(f'Consumer get {buffer.get()}')
            time.sleep(1)
            lock.release()
            empty.release()


class Producer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire()
            lock.acquire()
            num = random()
            buffer.put(num)
            print(f'Producer put {num}')
            time.sleep(1)
            lock.release()
            full.release()


if __name__ == '__main__':
    p = Producer()
    c = Consumer()
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print('Main Process Ended')
```
Output alternates along these lines:

```
Producer put 0.2183099559744145
Consumer get 0.2183099559744145
Producer put 0.20124931823006031
Consumer get 0.20124931823006031
...
```
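A queue can also be passed to the child process as an argument instead of a module-level global, which works under the spawn start method as well. A minimal sketch:

```python
from multiprocessing import Process, Queue


def worker(queue):
    # send a value back to the parent through the queue
    queue.put('data from child')


if __name__ == '__main__':
    queue = Queue()
    p = Process(target=worker, args=(queue,))
    p.start()
    print(queue.get())   # prints: data from child
    p.join()
```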
## Pipe

A pipe can be thought of as a communication channel between two processes. It can be one-way (half-duplex): one process sends messages and the other receives them. It can also be two-way (duplex), with both ends sending and receiving. By default, `Pipe()` returns a duplex pipe; to create a one-way pipe, pass `duplex=False` when creating it.

In the example below, the producer sends a message through one end of the pipe, and the consumer receives it through the other end and replies:
```python
from multiprocessing import Process, Pipe


# pipe
class Producer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe

    def run(self):
        self.pipe.send('Producer Words')
        print(f'Producer Received: {self.pipe.recv()}')


class Consumer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe

    def run(self):
        print(f'Consumer Received: {self.pipe.recv()}')
        self.pipe.send('Consumer Words')


if __name__ == '__main__':
    pipe = Pipe()
    p = Producer(pipe[0])
    c = Consumer(pipe[1])
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print('Main Process Ended')
```
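For completeness, a sketch of a one-way pipe: with `duplex=False`, `Pipe()` returns a pair of connections where the first can only receive and the second can only send.

```python
from multiprocessing import Process, Pipe


def sender(conn):
    # this end of a duplex=False pipe can only send
    conn.send('one-way message')
    conn.close()


if __name__ == '__main__':
    # first connection is receive-only, second is send-only
    recv_conn, send_conn = Pipe(duplex=False)
    p = Process(target=sender, args=(send_conn,))
    p.start()
    print(f'Received: {recv_conn.recv()}')
    p.join()
```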
## Process pool

Suppose we have 10,000 tasks, each needing a process to execute it; as soon as one finishes, the next should start, and the number of concurrently running processes must be capped so the CPU isn't overwhelmed (keeping the number of running processes at a constant maximum gives the best utilization). How do we implement that?

With a process pool: `multiprocessing.Pool` maintains a specified number of worker processes for the caller to use. When a new task is submitted to the pool:

- if the pool is not yet full, a worker is used to execute the task
- if the pool is already at its maximum size, the task waits until a worker becomes free
```python
from multiprocessing import Pool
import time


def function(index):
    print(f'Start process: {index}')
    time.sleep(3)
    print(f'End process {index}')


if __name__ == '__main__':
    pool = Pool(processes=3)
    for i in range(4):
        pool.apply_async(function, args=(i,))
    print('Main Process started')
    pool.close()
    pool.join()
    print('Main Process ended')
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 pool.py
Main Process started
Start process: 0
Start process: 1
Start process: 2
End process 0
End process 1
End process 2
Start process: 3
End process 3
Main Process ended
```
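`apply_async` also returns an `AsyncResult` handle whose `get()` method blocks until the task finishes and yields the function's return value. A small sketch (the `square` function is illustrative):

```python
from multiprocessing import Pool


def square(x):
    return x * x


if __name__ == '__main__':
    # Pool works as a context manager, closing itself on exit
    with Pool(processes=3) as pool:
        results = [pool.apply_async(square, args=(i,)) for i in range(4)]
        # get() blocks until each task completes
        print([r.get() for r in results])   # prints [0, 1, 4, 9]
```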
### Using the map method

`Pool.map` distributes calls across the pool's workers:

- the first argument is the function each worker process should execute
- the second argument is an iterable; each of its elements is passed to that function
```python
from multiprocessing import Pool
import urllib.request
import urllib.error


def scrape(url):
    try:
        urllib.request.urlopen(url)
        print(f'URL {url} Scraped')
    except (urllib.error.HTTPError, urllib.error.URLError):
        print(f'URL {url} not Scraped')


if __name__ == "__main__":
    pool = Pool(processes=3)
    urls = [
        'https://www.baidu.com',
        'http://www.meituan.com',
        'http://blog.csdn.net/',
        'http://xyzxyz.com'
    ]
    pool.map(scrape, urls)
    pool.close()
```
Output:
```
(base) ➜ studay-spider git:(master) ✗ python3 map.py
URL http://blog.csdn.net/ Scraped
URL http://xyzxyz.com Scraped
URL https://www.baidu.com Scraped
URL http://www.meituan.com Scraped
```
The end