Асинхронный парсер сайта, часть 3: ограничиваем количество одновременных запросов

В прошлой статье мы написали конкурентный парсер сайта, и столкнулись с ошибками перегрузки интернет-соединения.

Решить эту проблему мы можем добавлением ограничителя. Ограничитель можно добавить по-разному.

Способ 1: семафоры

Семафор — примитив синхронизации, в основе которого лежит счётчик, над которым можно производить две атомарные операции: увеличение и уменьшение значения на единицу, при этом операция уменьшения для нулевого значения счётчика является блокирующейся.

С помощью семафора можно регулировать количество одновременно выполняющегося кода.

Приведу новые функции parse_page и process_site (остальной код остался прежним).

async def parse_page(url, session, limiter):
    async with limiter:
        text = await fetch_and_validate_page_contents(url, session)
    if text is None:
        return None, set()
    soup = BeautifulSoup(text, "lxml")
    title = soup.find("title")
    if title is not None:
        title = title.text
    all_internal_links = set()
    for a in soup.find_all("a"):
        link = a.get("href")
        if link is None:
            continue
        if link.startswith("#"):
            continue
        absolute_link = urljoin(url, link)
        if urlparse(absolute_link).hostname == HOSTNAME:
            all_internal_links.add(
                urldefrag(absolute_link)[0]
            )
    return title, all_internal_links


async def process_site(website, session):
    visited_urls = {}
    urls_to_visit = {website}
    while urls_to_visit:
        tasks = {}
        limiter = asyncio.Semaphore(2)
        for url in urls_to_visit:
            current_url_to_parse = url
            visited_urls[current_url_to_parse] = None
            tasks[url] = parse_page(current_url_to_parse, session, limiter)
        urls_to_visit.clear()

        results = await asyncio.gather(
            *tasks.values()
        )
        for url, (title, internal_links) in zip(tasks.keys(), results):
            visited_urls[url] = title
            for link in internal_links:
                if link not in visited_urls:
                    urls_to_visit.add(link)
        logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages")
    return visited_urls

В функции process_site появилась строка limiter = asyncio.Semaphore(2) , и экземпляр limiter (один и тот же для одного цикла парсинга) передаётся во все задачи parse_page.

А в parse_page производится вызов функции fetch_and_validate_page_contents, но под управлением семафора limiter. Таким образом, больше двух (или сколько вы укажете) одновременно выполняемых блоков

async with limiter:
    text = await fetch_and_validate_page_contents(url, session)

во время работы программы не будет. А все запросы к любой из страниц сайта у нас происходит только внутри функции fetch_and_validate_page_contents.

Способ 2: настройки aiohttp

Создатели aiohttp позаботились о многих вещах, в том числе и о настройке количества одновременных запросов.

Весь список настроек можно посмотреть тут, нас же интересует параметр limit_per_host.

Он отвечает за количество одновременно открытых соединений к одному сайту.

Изменениям подвергнется (по сравнению с кодом предыдущей статьи) только функция main, где мы добавим настройки объекта сессии aiohttp:

async def main():
    connector = aiohttp.TCPConnector(
        limit_per_host=2,
    )
    async with aiohttp.ClientSession(
        connector=connector,
    ) as session:
        return await process_site(WEBSITE, session)

И финальный получившийся код:

from urllib.parse import urljoin, urlparse, urldefrag
import asyncio
import logging

import aiohttp
from bs4 import BeautifulSoup

logging.basicConfig(level=logging.INFO)


async def fetch_and_validate_page_contents(url, session):
    while True:
        try:
            async with session.get(url) as response:
                if response.content_type != "text/html":
                    return None
                if response.url.host != HOSTNAME:
                    return None
                return await response.text()
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            logging.warning(f"Connection error for URL: {url} {e}, retrying in 0.5 second...")
            await asyncio.sleep(0.5)


async def parse_page(url, session):
    text = await fetch_and_validate_page_contents(url, session)
    if text is None:
        return None, set()
    soup = BeautifulSoup(text, "lxml")
    title = soup.find("title")
    if title is not None:
        title = title.text
    all_internal_links = set()
    for a in soup.find_all("a"):
        link = a.get("href")
        if link is None:
            continue
        if link.startswith("#"):
            continue
        absolute_link = urljoin(url, link)
        if urlparse(absolute_link).hostname == HOSTNAME:
            all_internal_links.add(
                urldefrag(absolute_link)[0]
            )
    return title, all_internal_links


async def process_site(website, session):
    visited_urls = {}
    urls_to_visit = {website}
    while urls_to_visit:
        tasks = {}
        for url in urls_to_visit:
            current_url_to_parse = url
            visited_urls[current_url_to_parse] = None
            tasks[url] = parse_page(current_url_to_parse, session)
        urls_to_visit.clear()

        results = await asyncio.gather(
            *tasks.values()
        )
        for url, (title, internal_links) in zip(tasks.keys(), results):
            visited_urls[url] = title
            for link in internal_links:
                if link not in visited_urls:
                    urls_to_visit.add(link)
        logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages")
    return visited_urls


async def main():
    connector = aiohttp.TCPConnector(
        limit_per_host=2,
    )
    async with aiohttp.ClientSession(
        connector=connector,
    ) as session:
        return await process_site(WEBSITE, session)


if __name__ == "__main__":
    WEBSITE = "https://www.python.org"
    HOSTNAME = urlparse(WEBSITE).hostname
    titles = asyncio.run(main())
    print(len(titles))
    print(titles)

Этот код уже достаточно хорош. Есть некоторые проблемы, например, адреса с http и https для него - разные адреса; картинки и архивы можно фильтровать по расширению (.svg или .tar.gz, например); а также, возможно, некоторые другие.