Python3 - 进程与线程

1 多进程

multiprocessing包中的Process类提供了多进程支持。

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)。

其中,target为子进程所要执行函数的名字,name为子进程名字,args与kwargs为要传入到函数的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
import os, time

def run_proc(name):
# os.getpid()获取该进程的进程号,os.getppid()获取该进程的父进程的进程号。
print("Run child process %s (%s). Its parent process is %s" % (name, os.getpid(), os.getppid()))
time.sleep(3)


if __name__ == "__main__":
print("Parent process %s." % os.getpid())

p = Process(target = run_proc, args = ("test",))
print("Child process will start.")
p.start()
# 等待子进程结束。
p.join()

print("Child process end.")

创建子进程时,只需要传入一个执行函数和这个函数的参数构成的tuple即可,返回一个Process实例。然后用Process实例的start()方法启动。join()方法等待子进程结束后再继续往下运行(也即会产生阻塞)。

对于daemon参数,它的默认值会继承自父进程(一般情况下,父进程的daemon参数为False)。如果将子进程的daemon参数置为True,则当父进程退出时,子进程也会跟着退出(即使子进程的任务还未完成)。如果将子进程的daemon参数置为False,则当父进程退出时,子进程仍然会继续执行,直至任务结束或者shell里ctrl+c结束这个子进程。

另外,对于daemon参数为True的进程,不能再创建其子进程。

注意:daemon参数为True的进程与daemon进程是两回事。daemon参数为True的进程仍然是一个普通进程。 另外,要实现一些复杂的功能,还要用到signal,比如避免出现孤儿进程和僵尸进程等。

2 进程池

multiprocess包中的Pool类提供了进程池支持。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])。

其中,process为子进程数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Pool
import os, time

def long_time_task(name):
print("Run task %s (%s). Its parent process is %s" % (name, os.getpid(), os.getppid()))
time.sleep(3)
print("Task %s end ..." % name)



if __name__=="__main__":
print("Parent process %s." % os.getpid())
p = Pool(4)

for i in range(5):
p.apply_async(long_time_task, args = (i,))

print("Waiting for all subprocess done ...")
p.close()
p.join()
print("All subprocess done.")
  • 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

  • task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为这里将Pool的大小设置为4,因此,最多能同时执行4个进程。

  • 这里的进程池实例调用的是apply_async来启动子进程的,它是一种异步启动,非阻塞。也就是说,在启动一个子进程后,不需要等到这个子进程结束了才启动下一个子进程或者执行父进程中后面的语句,而是一种同时或者说并发进行的。如果这里用apply,则它是一种阻塞型的子进程启动,只有当前面一个子进程结束后才能启动下一个子进程,当所有的子进程结束后,才能继续执行父进程后面的语句。

3 外部子进程

Python中提供了subprocess模块,用于调用一个外部子进程。这个类似于Linux下提供的exec函数族接口。

1
2
3
4
5
6
import subprocess

# 相当于是在终端执行了 nslookup www.python.org 命令。
r = subprocess.run(["ping", "-c 3", "-t 3", "baidu.com"], stdout = subprocess.DEVNULL)

print("Exit code:", r.returncode)

4 进程间通信

Python的multiprocessing包封装了底层的机制,提供了Queue类、Pipes类等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from multiprocessing import Process, Queue
import os, time

def write(q):
print("Process to write: %s" % os.getpid())

for value in ['A', 'B', 'C']:
print("Put %s to queue..." % value)
q.put(value)
time.sleep(3)

def read(q):
print("Process to read: %s" % os.getpid())

while True:
value = q.get(True)
print("Get %s from queue..." % value)



if __name__ == "__main__":
# 父进程建立一个队列。
q = Queue()

# 分别创建读写子进程,并将队列分别传给它们。
pw = Process(target = write, args = (q,))
pr = Process(target = read, args = (q,))

# 启动写进程。
pw.start()
# 启动读进程。
pr.start()

# 等待写进程结束。
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止
pr.terminate()

5 多线程

threading模块中的Thread类,用于多线程的创建和管理。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)。

其中,target为子进程所要执行函数的名字,name为子进程名字,args与kwargs为要传入到函数的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time, threading

def loop(m):
print("thread %s is running..." % threading.current_thread().name)
n = 0

while n < m:
n = n + 1
print("thread %s >>> %s" % (threading.current_thread().name, n))

print("thread %s ended." % threading.current_thread().name)



print("thread %s is running..." % threading.current_thread().name)

# 创建线程实例。
t = threading.Thread(target = loop, name = "LoopThread", args = (5,))
t.start()
t.join()

print("thread %s ended." % threading.current_thread().name)
  • 由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程。主线程实例的名字叫MainThread,子线程的名字在创建时指定。
  • threading.current_thread()函数可以返回当前线程的名字。
  • 同多进程类似,用线程实例的start()方法来启动,用join()方法来等待结束。
  • 对于daemon参数,它的默认值会继承自主线程(一般情况下,主线程的daemon参数为False)。如果将子线程的daemon参数置为True,则当主线程退出时,子线程也会跟着退出(即使子线程的任务还未完成)。如果子线程的daemon参数为False,则当主线程退出时,子线程仍然继续执行,直至任务执行完成或者进程退出(shell里ctrl+c)。 > 注意:要实现一些复杂的功能,还要用到signal(多线程是不会出现孤儿和僵尸的)。

6 线程同步

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

Python中的threading模块中有两个类专门用于线程的同步。一个是Lock类,在访问互斥资源前上锁,访问完成后释放锁。另一个是Condition类。

6.1 Lock类

threading模块中提供了Lock类,用以对共享资源访问前加锁,访问结束后解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time, threading

balance = 0
# 创建一个Lock实例。
lock = threading.Lock()

def change_it(n):
global balance

balance = balance + n
print("plus %s" %n)
balance = balance - n
print("minus %s" %n)

def run_thread(n):
for i in range(10):
# 加锁。
lock.acquire()
change_it(n)
# 解锁。
lock.release()



t1 = threading.Thread(target = run_thread, args = (5,))
t2 = threading.Thread(target = run_thread, args = (8,))

t1.start()
t2.start()

t1.join()
t2.join()

print(balance)

6.2 Condition类

threading模块提供了Condition类,类似于Linux C中的Condition条件锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import threading
import time, random
from queue import Queue

conditionLock = threading.Condition()
q = Queue(10)

def producer():
while True:
time.sleep(1)
conditionLock.acquire()

while q.full():
conditionLock.wait()

time.sleep(1)
num = random.randint(1, 10)
print("produce a product: %d" % num)
q.put(num)

conditionLock.notify()
conditionLock.release()

def consumer():
while True:
time.sleep(3)
conditionLock.acquire()

while q.empty():
conditionLock.wait()

time.sleep(2)
print("consume a product: %d" % q.get())

conditionLock.notify()
conditionLock.release()


t1 = threading.Thread(target = producer)
t2 = threading.Thread(target = consumer)

t1.start()
t2.start()

t1.join()
t2.join()
  • 这里的queue模块提供了一个队列类Queue,专为线程同步使用。对于进程同步,可以使用multiprocess 中的 Queue类。
  • wait([timeout]):线程挂起,直到收到一个notify通知或者notify_all通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。调用wait()会释放Lock,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock.
  • notify(n=1):通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
  • notifyAll():如果wait状态线程比较多,notifyAll的作用就是通知所有线程(这个一般用得少)。

7 GIL

GIL (Global Interpreter Lock) 是 CPython 解释器用来防止多个线程同时执行的锁。之所以需要这把锁,是因为 CPython 的内存管理并不是线程安全的

任何 Python 线程执行前,必须先获得 GIL 锁以及线程内部需要的锁。然后每执行一定字节码后,解释器就自动释放 GIL 锁,让别的线程有机会执行。

如果某个线程内部获得了锁,但并没有获得 GIL 锁,这个时候就会浪费 CPU 时间,导致 CPU 利用率低下。有些情况下,CPython 中使用了多线程的程序比使用单线程的程序执行甚至还慢。

Python 虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个 Python 进程有各自独立的 GIL 锁,互不影响。

为了弥补 thread 模块因为 GIL 而低效的缺陷,因而出现了 multiprocessing 模块。

另外,如果对并行计算性能较高,可以考虑把核心部分用 C 实现,然后编译成库文件,并在 Python 里调用。

8 Thread-Local

有时候,一个线程的某个变量会在很多个函数中使用,如果将这个变量像普通函数那样传递,会显得很麻烦。

threading模块中定义了一个local类,它专门用于存储所有线程的变量,即使多个线程的变量名相同,但是它们的值却因它们属于不同的线程而不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading

# 建立一个local实例。
local_school = threading.local()

def process_student():
# 不需要像一般函数那样传递参数了,直接从local实例中取属于本线程的参数即可。
std = local_school.student
print("Hello, %s (in %s)" % (std, threading.current_thread().name))

def process_thread(name):
# 虽然这两个线程的变量名相同,但是local_school.student是不同的。
local_school.student = name
process_student()



t1 = threading.Thread(target = process_thread, args = ("Alice",), name = "Thread-A")
t2 = threading.Thread(target = process_thread, args = ("Bob",), name = "Thread-B")

t1.start()
t2.start()

t1.join()
t2.join()

9 分布式进程

multiprocessing包中的manager模块中的BaseManager类提供了分布式进程的创建功能。

在实现过程中,可以分别将master和worker放到两台通过互联网连接的主机上运行。

master:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random, time, queue
from multiprocessing.managers import BaseManager

# 定义一个类,继承自BaseManager。
class QueueManager(BaseManager):
pass


# 定义两个队列,一个用于推送任务,另一个用于存储获取的结果。
task_queue = queue.Queue()
result_queue = queue.Queue()

# 将这两个队列注册到新建的类中,从而使得这个类的实例可以调用这两个队列。
# 实质上,相当于是将两个关于队列的函数注册到新建的类中。第一个参数为函数的名字,第二个参数为函数对象。
QueueManager.register("get_task_queue", callable = lambda: task_queue)
QueueManager.register("get_result_queue", callable = lambda: result_queue)

# 创建新建类的实例。
# address为一个tuple,第一个元素为ip地址,第二个元素为端口号。authkey为验证码。
manager = QueueManager(address = ("", 5000), authkey = b"abc")

# 启动这个实例。
manager.start()

# 分别通过实例获取这两个队列。
task = manager.get_task_queue()
result = manager.get_result_queue()

# 将任务发到task队列中。
for i in range(10):
n = random.randint(0, 1000)
print("Put task %d..." % n)
task.put(n)

print("Try get results...")

# 等待从结果队列result中获取结果。
for i in range(10):
r = result.get(timeout = 10)
print("Result: %s" % r)

# 关闭这个实例。
manager.shutdown()
print("master exit.")

worker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time, queue
from multiprocessing.managers import BaseManager

# 新建一个类,继承自BaseManager。
class QueueManager(BaseManager):
pass


# 将两个函数名为get_task_queue和get_result_queue的函数对象注册到新建类中。
# 由于任务队列和结果队列以及调用它们的两个函数对象都是通过网络到服务器中获取,因此,这里只添加两个函数名即可。另外,这两个函数名可以和服务器端的两个函数名不同。
QueueManager.register("get_task_queue")
QueueManager.register("get_result_queue")

# 服务器ip地址。
server_addr = "127.0.0.1"
print("Connect to server %s..." % server_addr)

# 创建新建类的实例。
m = QueueManager(address = (server_addr, 5000), authkey = b"abc")

# 将新建类的实例连接至服务器的实例。
m.connect()

# 获取服务器中的任务队列和结果队列。
task = m.get_task_queue()
result = m.get_result_queue()

for i in range(10):
try:
# 从任务队列中获取任务。
n = task.get(timeout = 1)
print("run task %d * %d..." % (n ,n))
r = "%d * %d = %d" % (n, n, n * n)
time.sleep(1)
# 将结果存储至结果队列中。
result.put(r)
except Queue.Empty:
print("task queue is empty.")

print("worker exit.")

  • 可以看出,在worker中,并未新建队列,而是通过网络从服务器中获取。
  • master在注册队列时,不仅要指明函数名,还要指明函数对象。而worker中,仅仅需要指明函数名即可,而且这个函数名可以和master中的函数名不同。
  • master在创建新建类的实例时,不需要指明ip地址,只需要指明服务器中的这个进程的端口号即可。而worker中,要指明服务器的ip地址以及服务器中master进程的端口号。
  • authkey用于worker连接至正确的master,因此worker的authkey一定要与所想要连接的服务器端相应的authkey相同。

Reference