python使用futures进行并行处理

很多时候任务都需要并行,比如IO密集型的爬数据,读写磁盘等,CPU计算密集型的计算任务等等。而Python由于GIL的原因,默认情况下只能单线程运行,无法直接利用硬件的多核多线程,因此效率较低,python也早提供了一些列的多线程多进程的库可以用来使用,比如multiprocessing, queue 等等, 不过使用起来都相对复杂,不易控制。 经过几番尝试发现,发现futures基本是最方便将单线程/进程代码改为并发的代码的一个模块。这篇博客主要是简单记录下futures最常用的方式,以方便后续使用。

futures 是 从Python3.2 加入到python标准库,python2.x需要额外pip安装,主要是对线程池和进程池做了高度的封装,可以更加灵活的使用并行编程。

futures提供了多线程和多进程的接口:

  • futures.ThreadPoolExecutor: 多线程,适合IO密集型任务:爬虫,读写处理
  • futures.ThreadPoolExecutor: 多进程,适合CPU密集型任务,比如数学运算等。

话不多说,直接先上一个最常用的例子模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python
# encoding: utf-8
from concurrent import futures
import requests
r = requests.Session()
def func(url):
data = r.get(url).text
return data
task_list = ["https://www.163.com", "https://www.zhibo8.cc", "https://www.baidu.com"]
res = []
with futures.ThreadPoolExecutor(max_workers=2) as executor:
f_tasks = {executor.submit(func, url): url for url in task_list}
for f in futures.as_completed(f_tasks):
url = f_tasks[f]
try:
data = f.result()
res.append(data)
print(f"{url} finished")
except Exception as exc:
print(f"{url} in wrong:{exc}")

简单解释下:

  • func函数:对于每一个任务的执行函数,对应修改自己的任务即可
  • ThreadPoolExecutor(max_workers=2) 这里用的是多线程,多进程的话只需要修改为ProcessPoolExecutor, max_workers 调解并行线程或者进程数目。
  • executor.submit(func, url) 这个是定义了执行函数,一般不用特别修改,当然如果要传入多个参数的话,也是可以的,直接包装为一个list或者tuple即可。
  • f.result() 是在任务完成时候,取出结果
  • 最后 try / except 一定要保留,不然代码出错的话,也不会抛出异常。

当然futures除了上面的用法,还有很多其他的API和用法,比如executor.map等,不过上面的用法是我个人最常用的,这篇博客就是记录下,免得以后用到了还得去查。

想深入了解futures的话,可以看看下面的几个讲解

  • https://www.dongwm.com/post/78/
  • https://juejin.im/post/5b1e36476fb9a01e4a6e02e4