关于ThreadPoolExecutor

来源:8-10 ThreadPoolExecutor线程池重构爬虫

ErogenousMonstar

2019-11-15

我设置的最大线程数为10.
如果不用as_completed方法,则运行结果如下,只运行了10个线程就退出程序了:
图片描述

如果使用as_completed方法,则会跑完所有的线程,结果如下:
图片描述

老师的代码中并没有加as_completed方法,以上代码是我从老师的git仓库下载的。我也不懂到底需不需要加as_completed方法。对ThreadPoolExecutor还是不是很理解,希望老师可以解答下。

写回答

4回答

bobby

2019-11-18

课程中已经讲解过as_completed的源码了,其实这个就是一个生成器,目的就是等待所有的任务执行完成,但是因为使用了生成器的方式,所以这个函数可以做到完成一个任务就返回一个结果,而不需要等到所有的任务都完成后执行,所以这个就相当于一个监控了。 上面会退出的代码你发一下代码 我本地运行试试

1
5
仙女座舜
回复
bobby
谢谢老师。
2020-03-13
共5条回复

夜愿小夜

2020-12-17

验证as_completed,确实会等待执行,哪怕被等待的任务内部有新的任务加入。

代码如下:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_task(sleep_time, task_name):
    print("{} sleep {} s".format(task_name, sleep_time))
    time.sleep(3)
    print("{} weekea {} s".format(task_name, sleep_time))
    if task_name == "task3":  # as_completed 会等待中途上车的任务
        executor.submit(sleep_task, 6, "task4")
        executor.submit(sleep_task, 20, "task5")
    return task_name


executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(sleep_task, 2, "task1")
task2 = executor.submit(sleep_task, 3, "task2")
task3 = executor.submit(sleep_task, 3, "task3")

all_task = [task1, task2, task3]
for task in as_completed(all_task):
    print(task.result(), "  执行完成")
print("main end")

结果如下:

task1 sleep 2 s
task2 sleep 3 s
task2 weekea 3 s
task3 sleep 3 s
task1 weekea 2 s
task2   执行完成
task1   执行完成
task3 weekea 3 s
task4 sleep 6 stask5 sleep 20 s

task3   执行完成
main end
task5 weekea 20 s
task4 weekea 6 s

注意 main end  输出的位置,虽然在task1,task2,task3执行完毕后就输出了,但整个程序仍旧等待了task4和task5的执行。

0
0

bobby

2020-03-13

import re
import ast
from urllib import parse
from datetime import datetime

import requests
from scrapy import Selector


domain = "https://bbs.csdn.net"


def parse_list(url):
    print('*' * 200)
    print("解析列表页: {}".format(url))

    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    all_trs = sel.xpath("//table[@class='forums_tab_table']/tbody/tr")
    for tr in all_trs:
        topic_title = tr.xpath(".//td[3]/a/text()").extract()[0]
        # print(topic_title)

    next_page = sel.xpath("//a[@class='pageliststy next_page']/@href").extract()
    if next_page:
        next_url = parse.urljoin(domain, next_page[1])
        print(next_url)
        task2 = executor.submit(parse_list, next_url)
        thread_list.append(task2)


if __name__ == "__main__":
    stop = False
    from concurrent.futures import ThreadPoolExecutor, as_completed

    executor = ThreadPoolExecutor(max_workers=5)
    last_urls = ['https://bbs.csdn.net/forums/ios']
    # 爬取这个url下的所有标题 和下一页的标题
    thread_list = []
    print(len(last_urls))
    for url in last_urls:
        task1 = executor.submit(parse_list, url)
        thread_list.append(task1)
        parse_list(url)

    # https://bbs.csdn.net/forums/ios' 这个url有一百个下一页

    # 主线程退出以后会导致运行中的线程运行完成以后后续排队的任务不执行
    # as_completed只会将参数中的task完成以后就不在执行新的任务,所以采用一个全局变量stop来决定是否退出主线程并关闭线程池
    import time
    while not stop:
        time.sleep(1)


0
0

bobby

2019-11-20

如果不使用as_completed的话,那么主线程会等待线程池中每个线程结束退出,这个时候所有线程都会关闭,也就是当线程池中能创建的最大线程数量满了以后就会退出

0
3
小陈Cc
回复
bobby
✧*。٩(ˊωˋ*)و✧*。
2020-04-14
共3条回复

Python爬虫工程师实战 大数据时代必备

慕课网严选精品教程,高质量内容+服务!

2377 学习 · 1158 问题

查看课程