python的multiprocessing包是标准库提供的多进程并行计算包,提供了和threading(多线程)相似的API函数,但是相比于threading,将任务分配到不同的CPU,避免了GIL(Global Interpreter Lock)的限制。下面我们对multiprocessing中的Pool和Process类做介绍。
采用Pool进程池对任务并行处理更加方便,我们可以指定并行的CPU个数,然后 Pool 会自动把任务放到进程池中运行。 Pool 包含了多个并行函数。
apply 要逐个执行任务,在python3中已经被弃用,而apply_async是apply的异步执行版本。并行计算一定要采用apply_async函数。
import multiprocessingimport timefrom random import randint, seeddef f(num): seed() rand_num = randint(0,10) # 每次都随机生成一个停顿时间 time.sleep(rand_num) return (num, rand_num)start_time = time.time()cores = multiprocessing.cpu_count()pool = multiprocessing.Pool(processes=cores)pool_list = []result_list = []start_time = time.time()for xx in xrange(10): pool_list.append(pool.apply_async(f, (xx, ))) # 这里不能 get, 会阻塞进程result_list = [xx.get() for xx in pool_list]#在这里不免有人要疑问,为什么不直接在 for 循环中直接 result.get()呢?这是因为pool.apply_async之后的语句都是阻塞执行的,调用 result.get() 会等待上一个任务执行完之后才会分配下一个任务。事实上,获取返回值的过程最好放在进程池回收之后进行,避免阻塞后面的语句。# 最后我们使用一下语句回收进程池: pool.close()pool.join()print result_listprint '并行花费时间 %.2f' % (time.time() - start_time)print '串行花费时间 %.2f' % (sum([xx[1] for xx in result_list]))#[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]#并行花费时间 14.11#串行花费时间 45.00
map_async 是 map的异步执行函数。
相比于 apply_async, map_async 只能接受一个参数。
import timefrom multiprocessing import Pooldef run(fn): #fn: 函数参数是数据列表的一个元素 time.sleep(1) return fn*fnif __name__ == "__main__": testFL = [1,2,3,4,5,6] print '串行:' #顺序执行(也就是串行执行,单进程) s = time.time() for fn in testFL: run(fn) e1 = time.time() print "顺序执行时间:", int(e1 - s) print '并行:' #创建多个进程,并行执行 pool = Pool(4) #创建拥有5个进程数量的进程池 #testFL:要处理的数据列表,run:处理testFL列表中数据的函数 rl, testFL) pool.close()#关闭进程池,不再接受新的进程 pool.join()#主进程阻塞等待子进程的退出 e2 = time.time() print "并行执行时间:", int(e2-e1) print rl# 串行:# 顺序执行时间: 6# 并行:# 并行执行时间: 2# [1, 4, 9, 16, 25, 36]
采用Process必须注意的是,Process对象来创建进程,每一个进程占据一个CPU,所以要建立的进程必须 小于等于 CPU的个数。如果启动进程数过多,特别是当遇到CPU密集型任务,会降低并行的效率。
# The Process classfrom multiprocessing import Process, cpu_countimport osimport timestart_time = time.time()def info(title):# print(title) if hasattr(os, 'getppid'): # only available on Unix print 'parent process:', os.getppid() print 'process id:', os.getpid() time.sleep(3)def f(name): info('function f') print 'hello', nameif __name__ == '__main__':# info('main line') p_list = [] # 保存Process新建的进程 cpu_num = cpu_count() for xx in xrange(cpu_num): p_list.append(Process(target=f, args=('xx_%s' % xx,))) for xx in p_list: xx.start() for xx in p_list: xx.join() print('spend time: %.2f' % (time.time() - start_time))parent process: 11741# parent process: 11741# parent process: 11741# process id: 12249# process id: 12250# parent process: 11741# process id: 12251# process id: 12252# hello xx_1# hello xx_0# hello xx_2# hello xx_3# spend time: 3.04
Process和Pool均支持Queues 和 Pipes 两种类型的通信。
# Exchanging objects between processes# Queuesfrom multiprocessing import Process, Queuedef f(q): q.put([42, None, 'hello'])if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()
from multiprocessing import Process, Pipedef f(conn): conn.send([42, None, 'hello']) conn.close()if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print parent_conn.recv() # prints "[42, None, 'hello']" p.join()
Queue() can have multiple producers and consumers.
When to use them
If you need more than two points to communicate, use a Queue().
If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
共享内存仅适用于 Process 类,不能用于进程池 Pool
# Sharing state between processes# Shared memoryfrom multiprocessing import Process, Value, Arraydef f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i]if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]# 3.1415927# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Manager Class 既可以用于Process 也可以用于进程池 Pool。
from multiprocessing import Manager, Processdef f(d, l, ii): d[ii] = ii l.append(ii)if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list(range(10)) p_list = [] for xx in range(4): p_list.append(Process(target=f, args=(d, l, xx))) for xx in p_list: xx.start() for xx in p_list: xx.join() print d print l# {0: 0, 1: 1, 2: 2, 3: 3}# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]