资料推荐
这里只是学习资料的一个笔记与总结, 更详细、仔细的学习还请各位看官自行看看原始的资料。在此罗列一下参考到的有用的资料。
主要参考资料:
译者非常的用心, 原著(英文版)的代码译者应该大部分都亲自测试过。 因为原著很多疑似疏漏的地方, 译者都做了特别的标识。在此鄙视一下由张龙翻译并出版成书的版本。 翻译狗屁不通, 上面的代码应该也没跑过。
异步编程讲得还是满详细的! 学到了很多东西!
基于线程的并行
Python Thread的定义
1 2 3 4 5 6 |
class threading.Thread(group=None, # 一般设置为 None ,这是为以后的一些特性预留的 target=None, # 当线程启动的时候要执行的函数 name=None, # 线程的名字,默认会分配一个唯一名字 Thread-N args=(), # 传递给 target 的参数,要使用tuple类型 kwargs={}) # 传递给 target 的参数,要使用dict类型 |
例子: 简单的Python Thread 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import threading import time def first_function(): print(threading.currentThread().getName() + str(' is Starting ')) time.sleep(2) print (threading.currentThread().getName() + str(' is Exiting ')) return def second_function(): print(threading.currentThread().getName() + str(' is Starting ')) time.sleep(2) print (threading.currentThread().getName() + str(' is Exiting ')) return def third_function(): print(threading.currentThread().getName() + str(' is Starting ')) time.sleep(2) print(threading.currentThread().getName() + str(' is Exiting ')) return if __name__ == "__main__": t1 = threading.Thread(name='first_function', target=first_function) t2 = threading.Thread(name='second_function', target=second_function) t3 = threading.Thread(name='third_function', target=third_function) t1.start() t2.start() t3.start() print("main thread!!!") |
输出示例:
1 2 3 4 5 6 7 8 9 10 11 12 |
first_function is Starting second_function is Starting third_function is Starting main thread!!! # 问题: 请问如果多执行几次, 下面的输出结果的顺序是固定的吗? first_function is Exiting second_function is Exiting third_function is Exiting # 结论: 测试结果是固定的 |
问题2: 在最底部按如下顺序加入join()
1 2 3 4 5 6 7 8 |
t1.join() t2.join() t3.join() """ 结果:第二部分的顺序看起来是随机的 """ |
问题3: 按照以下顺序开始&join()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
t1.start() t1.join() t2.start() t2.join() t3.start() t3.join() print("main thread!!!") """ 输出顺序固定, 如下所示: first_function is Starting first_function is Exiting second_function is Starting second_function is Exiting third_function is Starting third_function is Exiting main thread!!! 我的理解: join() 是让当前线程完成了再回到主线程, 多线程就成了“单线程”在执行了 """ |
几个线程的事实
参考资料: https://blog.csdn.net/zhiyuan_2007/article/details/48807761
- python 默认参数创建线程后,不管主线程是否执行完毕,都会等待子线程执行完毕才一起退出,有无join结果一样
-
如果创建线程,并且设置了daemon为true,即thread.setDaemon(True), 则主线程执行完毕后自动退出,不会等待子线程的执行结果。而且随着主线程退出,子线程也消亡。
-
join方法的作用是阻塞,等待子线程结束,join方法有一个参数是timeout,即如果主线程等待timeout,子线程还没有结束,则主线程强制结束子线程。
-
如果线程daemon属性为False, 则join里的timeout参数无效。主线程会一直等待子线程结束。
-
如果线程daemon属性为True, 则join里的timeout参数是有效的, 主线程会等待timeout时间后,结束子线程。此处有一个坑,即如果同时有N个子线程join(timeout),那么实际上主线程会等待的超时时间最长为 N * timeout, 因为每个子线程的超时开始时刻是上一个子线程超时结束的时刻。
自定义线程类的实现
- 定义一个
Thread
的子类 - 覆盖构造函数
__init__(self [,args])
, 可以添加更多的参数 - 覆盖
run()
函数, 实现该线程要做的事情 - 启动方法: 调用
start()
而不是run()
- 同样的, 也可以调用
join()
函数
一个使用多线程来抓取豆瓣API的例子:
参考: https://zhuanlan.zhihu.com/p/34004447
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import threading import requests from bs4 import BeautifulSoup class MyThread(threading.Thread): def __init__(self, i): threading.Thread.__init__(self) self.i = i def run(self): url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25) r = requests.get(url) soup = BeautifulSoup(r.content, 'html.parser') lis = soup.find('ol', class_='grid_view').find_all('li') title_lst = [] for li in lis: title = li.find('span', class_="title").text title_lst.append(title) print( "thread-%s" % self.i, ",".join(title_lst)) for i in range(10): th = MyThread(i) th.start() |
输出示例: (宽度有限, 用省略号来替代了)
1 2 3 4 5 6 7 8 9 10 11 |
thread-6 心迷宫,纵横四海,荒... thread-7 燃情岁月,未麻的部屋... thread-9 绿里奇迹,2001太... thread-1 蝙蝠侠:黑暗骑士,乱... thread-8 谍影重重,战争之王,... thread-0 肖申克的救赎,霸王别... thread-5 蝙蝠侠:黑暗骑士崛起... thread-3 猫鼠游戏,沉默的羔羊... thread-4 消失的爱人,大鱼,一... thread-2 指环王2:双塔奇兵,... |
多线程的应用场景
补充:
一些简单的结论:
- 全称
Global Interpreter Lock
: 全局解释器锁 - 作用: CPython 引入的锁机制。GIL在解释器层面阻止了真正的并行运行。
-
因此, 如果是CPU密集型的任务, 多线程反而会拖累整体性能
-
I/O 密集型才是CPython 解释器下多线程的正确应用场景
注意: 如果读的是本地文件, 也需要读取不同的文件, 否则不一定能提高性能。
典型应用: 多线程抓取网络数据 / 调用WebAPI 。。。
-
Why IO: 因为在进行IO调用的时候, GIL会释放相应的锁!
多线程同步
书中介绍了多种同步机制: Lock / RLock / 信号量(semaphore) / 事件 / with / 队列(quene)
我没有细看, 暂时还不需要处理这种场景。 (当前处理的场景还不需要这么复杂。)
多线程运行结果汇总
举个例子: 上面我们通过多线程抓取了豆瓣Top250 的电影名称,那么我们如何直接得到汇总之后的运行结果呢?
基于上面使用线程类的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
import threading import requests from bs4 import BeautifulSoup class MyThread(threading.Thread): def __init__(self, i): threading.Thread.__init__(self) self.i = i self.result = [] # 新增一个变量用于存放结果 def run(self): url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25) r = requests.get(url) soup = BeautifulSoup(r.content, 'html.parser') lis = soup.find('ol', class_='grid_view').find_all('li') title_lst = [] for li in lis: title = li.find('span', class_="title").text title_lst.append(title) print( "thread-%s" % self.i, ",".join(title_lst)[:10] + "...") self.result = title_lst # 存储运算结果 def get_result(self): # 专门用于返回运算结果 return self.result result_list = [] thread_list = [] for i in range(10): th = MyThread(i) th.start() thread_list.append(th) # 把全部线程类都放到一个list之中 for th in thread_list : # 遍历线程类list th.join() # 让每个线程都执行完 result_list.extend(th.get_result()) # 获取线程类的执行结果 print("==== final result:", len(result_list)) print(result_list) |
运行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
thread-4 消失的爱人,大鱼,一... thread-8 谍影重重,战争之王,... thread-1 蝙蝠侠:黑暗骑士,乱... thread-3 猫鼠游戏,沉默的羔羊... thread-7 燃情岁月,未麻的部屋... thread-0 肖申克的救赎,霸王别... thread-5 蝙蝠侠:黑暗骑士崛起... thread-2 指环王2:双塔奇兵,... thread-9 绿里奇迹,2001太... thread-6 心迷宫,纵横四海,荒... ==== final result: 250 ['肖申克的救赎', '霸王别姬', 。。。。。] |
基于threading.Thread
直接调用的方式
目前没有找到直接从Thread
类返回结果的方式。
线程池ThreadPool的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
from multiprocessing.pool import ThreadPool import requests from bs4 import BeautifulSoup def run(page_no): url = 'https://movie.douban.com/top250?start={}&filter='.format(page_no * 25) r = requests.get(url) soup = BeautifulSoup(r.content, 'html.parser') lis = soup.find('ol', class_='grid_view').find_all('li') title_lst = [] for li in lis: title = li.find('span', class_="title").text title_lst.append(title) print("thread-%s" % page_no, ",".join(title_lst)[:10] + "...") return title_lst import time t0 = time.time() pool = ThreadPool(processes=4) result_list = pool.map(run, [0,1]) print(len(result_list)) print(result_list) t1 = time.time() print("cost time:", (t1 - t0)) |
输出结果:
1 2 3 4 5 6 7 |
thread-1 蝙蝠侠:黑暗骑士,乱... thread-0 肖申克的救赎,霸王别... 2 [['肖申克的救赎', '霸王别姬', ...], ['蝙蝠侠:黑暗骑士', '乱世佳人', ...]] cost time: 2.3958077430725098 |
注意: 刚才只抓了两页, 因此结果集的长度=2, 现在改成抓取4页, 即有如下改动:
1 2 3 4 5 6 |
# 原来 result_list = pool.map(run, [0,1]) # 改为 result_list = pool.map(run, [0,1,2,3]) |
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 |
thread-3 猫鼠游戏,沉默的羔羊... thread-0 肖申克的救赎,霸王别... thread-1 蝙蝠侠:黑暗骑士,乱... thread-2 指环王2:双塔奇兵,... 4 [ ['肖申克的救赎', '霸王别姬', ...], ['蝙蝠侠:黑暗骑士', '乱世佳人', ...], ['指环王2:双塔奇兵', '教父2', ...], ['猫鼠游戏', '沉默的羔羊', ...]] cost time: 2.5353810787200928 |
从运行结果上面来说, 时间并没有增加太多(网络开销无法固定)。 如果我们改成抓取10页, 执行速度就要慢得多了(毕竟我们设置线程池的数量为4
)
线程池
ThreadPool VS Pool
from multiprocessing.pool import ThreadPool
: 线程池from multiprocessing import Pool
: 进程池from multiprocessing.dummy import Pool
: 线程池
具体选择哪个线程池, 两个线程池有什么区别? 我也在好奇之中。。。
貌似推荐dummy.Pool
的人多一些。 两个线程池在Python官网的介绍都比较少。。。
之所以推荐dummy.Pool
的人多一些, 原因总结如下:
- 实现了跟Pool 完全一致的API , 切换线程池、进程池比较方便
- ThreadPool 没有文档说明。。。
几种执行方式
- apply_async
- apply
- map_async
- map
简单说说他们的区别与联系:
map
&apply
他们都是同步/阻塞的即: map/apply之后直接运行线程/进程,运行结束后再执行之后语句
-
map
对比apply
就是调用参数不太一样def apply(self, func, args=(), kwds={}):
def map(self, func, iterable, chunksize=None):
- 带
_async
就是原来同步的基础之上变成是异步执行。直到遇到wait()
之后才阻塞
几种执行方式例子:
参考: python进程池multiprocessing.Pool和线程池multiprocessing.dummy.Pool实例
正确关闭pool的姿势
需要注意的是, 正确关闭Pool的方式:
1 2 3 4 5 |
pool = ThreadPool(x) # do something ... pool.close() # 先close pool.join() # 再join |
异步编程
一个关键概念: 协程
除了并发之外, 还有一种编程模式:异步。 其中很重要的一个概念就是: 协程(Coroutine / 微线程,纤程)
引用一个文章的说法:小米安全中心: 乱谈 Python 并发
Python的线程(包括进程)其实都是对系统原生内核级线程的包装,切换时,需要在内核与用户态空间来来回回,开销明显会大很多,而且多个线程完全由系统来调度,什么时候执行哪个线程是无法预知的。相比而言,协程就轻量级很多,是对线程的一种模拟,原理与内核级线程类似,只不过切换时,上下文环境保存在用户态的堆栈里,协程“挂起”的时候入栈,“唤醒”的时候出栈,所以其调度是可以人为控制的,这也是“协程”名字的来由,大伙协作着来,别总抢来抢去的,伤感情。
简单来说:协程是一种用户态的轻量级线程。因此在任务切换的时候要更轻量得多。
futures
做爬虫的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed import time import requests def download(url): headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0', 'Connection':'keep-alive', 'Host':'example.webscraping.com'} response = requests.get(url, headers=headers) return response.status_code if __name__ == '__main__': urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1', 'http://example.webscraping.com/places/default/view/Aland-Islands-2'] start = time.time() pool = ProcessPoolExecutor(max_workers = 2) # 也可以改成 ThreadPoolExecutor futures = [pool.submit(download,url) for url in urllist] for future in futures: print('执行中:%s, 已完成:%s' % (future.running(), future.done())) print('#### 分界线 ####') for future in as_completed(futures, timeout=2): print('执行中:%s, 已完成:%s' % (future.running(), future.done())) print(future.result()) end = time.time() print('使用多线程--timestamp:{:.3f}'.format(end-start)) |
注意:
- Python的这个
concurrent.futures
库感觉跟Java 的很像, 可能相互借鉴与学习的结果。 -
不过感觉这样写还略麻烦。 可以考虑使用
aiohttp
来做网络爬虫, 用法跟直接使用requests
比较像, 但是需要接触到新的关键字:asycn wait / asycn with
-
使用了异步编程之后, 感觉整个人生观都变了, 需要注意时刻在你的代码中使用异步操作,你如果在代码中使用同步操作,爬虫并不会报错,但是速度可能会受影响。 (就像在写nodejs, 需要调整到回调思维那样)
异步编程个人经验小结
-
心智模式上, 线程池、进程池的思维模式我们还是比较熟悉的, 跟之前的同步思想差别不大
但是异步编程就差别很多了, 而且很多地方都用到了
asycn / await
的关键字。 -
可维护性上,考虑到项目交接等等,我的选择: (IO密集型的任务)
- 单进程、单线程同步
- 单进程、多线程
- 异步编程
也就是说: 除非性能扛不住, 否则就用最简单的编程模型。
-
如果是网络爬虫,
aiohttp + asycnio
搭配的执行效率是最高的性能数据参考:使用Python进行并发编程-asyncio篇(一)
-
你碰到了CPU密集型? 比如机器学习、数据分析等等~
个人不推荐使用多进程,各种开销、维护很头疼。 推荐使用消息队列、Celery 等等。
-
asyncio
还在快速发展之中, 在Python3.7 之中,新增了asyncio.run
的写法, 否则我们得这样写:1234loop = asyncio.get_event_loop()loop.run_until_complete(target_function())loop.close()
aiohttp做爬虫的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
import aiohttp import asyncio NUMBERS = range(12) # URL = 'http://httpbin.org/get?a={}' URL = "https://www.baidu.com/s?wd={}" sema = asyncio.Semaphore(10) # 通过信号量控制并发数 final_list = [] def update_dict(new_ele) : """ 更新final_list, 相当于把异步并发的结果合并到final_list之中。 并且在这里控制, 如果长度到达一定的标准, 就可以做下一步的事情。 这一步是同步的。 """ final_list.append(new_ele) print("============ list length:", len(final_list)) if len(final_list) >= 3 : print("3333 sleep 3 seconds, could insert data list into db") time.sleep(3) final_list = [] async def fetch_async(a): """ 使用aiohttp发出异步请求 """ async with aiohttp.request('GET', URL.format(a)) as r: data = await r.text() return data async def print_result(a): with (await sema): r = await fetch_async(a) update_dict(r) print('fetch({}) = {}'.format(a, r)) import time t0 = time.time() loop = asyncio.get_event_loop() f = asyncio.wait([print_result(num) for num in NUMBERS]) loop.run_until_complete(f) t1 = time.time() print("============ cost time:", t1 - t0) |
一个可以优化的地方: 官网建议重用ClientSession
, 使用方法跟requests
的session
比较类似。 在这里图省事, 暂时没有改造成这样的形式。重用Session, 根据我的理解, 抓取同一个域名会比较有效。(么有实际测试过~)
原文链接:https://www.flyml.net/2019/07/07/python-parallel-programming/

文章评论