Асинхронный парсер сайта, часть 2: добавляем конкурентность

В прошлой статье мы написали асинхронный парсер сайта. Он имеет несколько проблем, главная из которых - это, собственно, отсутствие конкурентности.

Как создаётся конкурентность?

  1. Созданием нескольких задач (asyncio.create_task), без вызова await до создания всех задач
  2. Использованием asyncio.gather с несколькими корутинами

Где использовать asyncio.gather?

Там, где хотим конкурентного выполнения задач.

Если, например, мы хотели спарсить множество сайтов, то мы могли бы написать asyncio.gather в main, и конкурентность была бы только на уровне различных сайтов.

Этот подход будет иметь свои недостатки (например, объём каждого сайта разный, и один очень большой сайт будет тормозить работу всего приложения). Впрочем, ничто не мешает использовать asyncio.gather в нескольких местах.

Приведу код асинхронного парсера, использующего asyncio.gather:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from urllib.parse import urljoin, urlparse, urldefrag
import asyncio
import logging

import aiohttp
from bs4 import BeautifulSoup

logging.basicConfig(level=logging.INFO)


async def fetch_and_validate_page_contents(url, session):
    while True:
        try:
            async with session.get(url) as response:
                if response.content_type != "text/html":
                    return None
                if response.url.host != HOSTNAME:
                    return None
                return await response.text()
        except aiohttp.ClientConnectorError:
            logging.warning(f"Connection error for URL: {url}, retrying in 0.5 second...")
            await asyncio.sleep(0.5)


async def parse_page(url, session):
    text = await fetch_and_validate_page_contents(url, session)
    if text is None:
        return None, set()
    soup = BeautifulSoup(text, "lxml")
    title = soup.find("title")
    if title is not None:
        title = title.text
    all_internal_links = set()
    for a in soup.find_all("a"):
        link = a.get("href")
        if link is None:
            continue
        if link.startswith("#"):
            continue
        absolute_link = urljoin(url, link)
        if urlparse(absolute_link).hostname == HOSTNAME:
            all_internal_links.add(
                urldefrag(absolute_link)[0]
            )
    return title, all_internal_links


async def process_site(website, session):
    visited_urls = {}
    urls_to_visit = {website}
    while urls_to_visit:
        tasks = {}
        for url in urls_to_visit:
            current_url_to_parse = url
            visited_urls[current_url_to_parse] = None
            tasks[url] = parse_page(current_url_to_parse, session)
        urls_to_visit.clear()
        results = await asyncio.gather(
            *tasks.values()
        )
        for url, (title, internal_links) in zip(tasks.keys(), results):
            visited_urls[url] = title
            for link in internal_links:
                if link not in visited_urls:
                    urls_to_visit.add(link)
        logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages")
    return visited_urls


async def main():
    async with aiohttp.ClientSession() as session:
        return await process_site(WEBSITE, session)


if __name__ == "__main__":
    WEBSITE = "https://www.python.org"
    HOSTNAME = urlparse(WEBSITE).hostname
    titles = asyncio.run(main())
    print(len(titles))
    print(titles[WEBSITE])

Что изменилось по сравнению с предыдущей статьей?

Главное изменение, конечно же, в функции process_site, задачи по обработке страниц на каждой итерации создаются словарём (чтобы был доступен url-адрес для каждой задачи), а потом разом передаются в asyncio.gather. Таким образом, задачи парсинга страниц на каждой итерации выполняются конкурентно.

Этот подход, однако, также имеет недостаток: когда мы создаём очень много задач, мы все из них выполняем конкурентно. То есть, мы одновременно посылаем тысячи запросов к сайту. Это, во-первых, перегружает наш интернет-канал, а во-вторых, интернет-канал принимающего сервера. Более того, адрес, с которого мы запускаем парсер, в теории могут заблокировать.

В функциях process_site и fetch_and_validate_page_contents добавлено базовое логирование ошибок и процесса парсинга. Попробуем запустить этот код и посмотрим, что получится.

Мы видим, что уже появились ошибки (хотя сайт https://www.python.org/ нас, скорее всего, не забанит).

INFO:root:Crawled 1 pages, next iteration is 66 pages
INFO:root:Crawled 67 pages, next iteration is 3579 pages
WARNING:root:Connection error for URL: https://www.python.org/downloads/release/python-3613/, retrying in 1 second...
WARNING:root:Connection error for URL: https://www.python.org/events/python-user-group/1724/, retrying in 1 second...
WARNING:root:Connection error for URL: https://www.python.org/downloads/release/python-349rc1/, retrying in 1 second...
WARNING:root:Connection error for URL: https://www.python.org/ftp/python/3.5.0/python-3.5.0rc3-macosx10.5.pkg, retrying in 1 second...
INFO:root:Crawled 3646 pages, next iteration is 4316 pages
........

Далее мы разберём, как добавить ограничения на количество одновременных запросов.