Асинхронный парсер сайта, часть 3: ограничиваем количество одновременных запросов
В прошлой статье мы написали конкурентный парсер сайта, и столкнулись с ошибками перегрузки интернет-соединения.
Решить эту проблему мы можем добавлением ограничителя. Ограничитель можно добавить по-разному.
Способ 1: семафоры
Семафор — примитив синхронизации, в основе которого лежит счётчик, над которым можно производить две атомарные операции: увеличение и уменьшение значения на единицу, при этом операция уменьшения для нулевого значения счётчика является блокирующейся.
С помощью семафора можно регулировать количество одновременно выполняющегося кода.
Приведу новые функции parse_page и process_site (остальной код остался прежним).
async def parse_page(url, session, limiter): async with limiter: text = await fetch_and_validate_page_contents(url, session) if text is None: return None, set() soup = BeautifulSoup(text, "lxml") title = soup.find("title") if title is not None: title = title.text all_internal_links = set() for a in soup.find_all("a"): link = a.get("href") if link is None: continue if link.startswith("#"): continue absolute_link = urljoin(url, link) if urlparse(absolute_link).hostname == HOSTNAME: all_internal_links.add( urldefrag(absolute_link)[0] ) return title, all_internal_links async def process_site(website, session): visited_urls = {} urls_to_visit = {website} while urls_to_visit: tasks = {} limiter = asyncio.Semaphore(2) for url in urls_to_visit: current_url_to_parse = url visited_urls[current_url_to_parse] = None tasks[url] = parse_page(current_url_to_parse, session, limiter) urls_to_visit.clear() results = await asyncio.gather( *tasks.values() ) for url, (title, internal_links) in zip(tasks.keys(), results): visited_urls[url] = title for link in internal_links: if link not in visited_urls: urls_to_visit.add(link) logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages") return visited_urls
В функции process_site появилась строка limiter = asyncio.Semaphore(2) , и экземпляр limiter (один и тот же для одного цикла парсинга) передаётся во все задачи parse_page.
А в parse_page производится вызов функции fetch_and_validate_page_contents, но под управлением семафора limiter. Таким образом, больше двух (или сколько вы укажете) одновременно выполняемых блоков
async with limiter: text = await fetch_and_validate_page_contents(url, session)
во время работы программы не будет. А все запросы к любой из страниц сайта у нас происходит только внутри функции fetch_and_validate_page_contents.
Способ 2: настройки aiohttp
Создатели aiohttp позаботились о многих вещах, в том числе и о настройке количества одновременных запросов.
Весь список настроек можно посмотреть тут, нас же интересует параметр limit_per_host.
Он отвечает за количество одновременно открытых соединений к одному сайту.
Изменениям подвергнется (по сравнению с кодом предыдущей статьи) только функция main, где мы добавим настройки объекта сессии aiohttp:
async def main(): connector = aiohttp.TCPConnector( limit_per_host=2, ) async with aiohttp.ClientSession( connector=connector, ) as session: return await process_site(WEBSITE, session)
И финальный получившийся код:
from urllib.parse import urljoin, urlparse, urldefrag import asyncio import logging import aiohttp from bs4 import BeautifulSoup logging.basicConfig(level=logging.INFO) async def fetch_and_validate_page_contents(url, session): while True: try: async with session.get(url) as response: if response.content_type != "text/html": return None if response.url.host != HOSTNAME: return None return await response.text() except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e: logging.warning(f"Connection error for URL: {url} {e}, retrying in 0.5 second...") await asyncio.sleep(0.5) async def parse_page(url, session): text = await fetch_and_validate_page_contents(url, session) if text is None: return None, set() soup = BeautifulSoup(text, "lxml") title = soup.find("title") if title is not None: title = title.text all_internal_links = set() for a in soup.find_all("a"): link = a.get("href") if link is None: continue if link.startswith("#"): continue absolute_link = urljoin(url, link) if urlparse(absolute_link).hostname == HOSTNAME: all_internal_links.add( urldefrag(absolute_link)[0] ) return title, all_internal_links async def process_site(website, session): visited_urls = {} urls_to_visit = {website} while urls_to_visit: tasks = {} for url in urls_to_visit: current_url_to_parse = url visited_urls[current_url_to_parse] = None tasks[url] = parse_page(current_url_to_parse, session) urls_to_visit.clear() results = await asyncio.gather( *tasks.values() ) for url, (title, internal_links) in zip(tasks.keys(), results): visited_urls[url] = title for link in internal_links: if link not in visited_urls: urls_to_visit.add(link) logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages") return visited_urls async def main(): connector = aiohttp.TCPConnector( limit_per_host=2, ) async with aiohttp.ClientSession( connector=connector, ) as session: return await process_site(WEBSITE, session) if __name__ == "__main__": WEBSITE = "https://www.python.org" HOSTNAME = urlparse(WEBSITE).hostname titles = asyncio.run(main()) print(len(titles)) print(titles)
Этот код уже достаточно хорош. Есть некоторые проблемы, например, адреса с http и https для него - разные адреса; картинки и архивы можно фильтровать по расширению (.svg или .tar.gz, например); а также, возможно, некоторые другие.