Асинхронный парсер сайта, часть 3: ограничиваем количество одновременных запросов
В прошлой статье мы написали конкурентный парсер сайта, и столкнулись с ошибками перегрузки интернет-соединения.
Решить эту проблему мы можем добавлением ограничителя. Ограничитель можно добавить по-разному.
Способ 1: семафоры
Семафор — примитив синхронизации, в основе которого лежит счётчик, над которым можно производить две атомарные операции: увеличение и уменьшение значения на единицу, при этом операция уменьшения для нулевого значения счётчика является блокирующейся.
С помощью семафора можно регулировать количество одновременно выполняющегося кода.
Приведу новые функции parse_page и process_site (остальной код остался прежним).
async def parse_page(url, session, limiter):
async with limiter:
text = await fetch_and_validate_page_contents(url, session)
if text is None:
return None, set()
soup = BeautifulSoup(text, "lxml")
title = soup.find("title")
if title is not None:
title = title.text
all_internal_links = set()
for a in soup.find_all("a"):
link = a.get("href")
if link is None:
continue
if link.startswith("#"):
continue
absolute_link = urljoin(url, link)
if urlparse(absolute_link).hostname == HOSTNAME:
all_internal_links.add(
urldefrag(absolute_link)[0]
)
return title, all_internal_links
async def process_site(website, session):
visited_urls = {}
urls_to_visit = {website}
while urls_to_visit:
tasks = {}
limiter = asyncio.Semaphore(2)
for url in urls_to_visit:
current_url_to_parse = url
visited_urls[current_url_to_parse] = None
tasks[url] = parse_page(current_url_to_parse, session, limiter)
urls_to_visit.clear()
results = await asyncio.gather(
*tasks.values()
)
for url, (title, internal_links) in zip(tasks.keys(), results):
visited_urls[url] = title
for link in internal_links:
if link not in visited_urls:
urls_to_visit.add(link)
logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages")
return visited_urls
В функции process_site появилась строка limiter = asyncio.Semaphore(2) , и экземпляр limiter (один и тот же для одного цикла парсинга) передаётся во все задачи parse_page.
А в parse_page производится вызов функции fetch_and_validate_page_contents, но под управлением семафора limiter. Таким образом, больше двух (или сколько вы укажете) одновременно выполняемых блоков
async with limiter:
text = await fetch_and_validate_page_contents(url, session)
во время работы программы не будет. А все запросы к любой из страниц сайта у нас происходит только внутри функции fetch_and_validate_page_contents.
Способ 2: настройки aiohttp
Создатели aiohttp позаботились о многих вещах, в том числе и о настройке количества одновременных запросов.
Весь список настроек можно посмотреть тут, нас же интересует параметр limit_per_host.
Он отвечает за количество одновременно открытых соединений к одному сайту.
Изменениям подвергнется (по сравнению с кодом предыдущей статьи) только функция main, где мы добавим настройки объекта сессии aiohttp:
async def main():
connector = aiohttp.TCPConnector(
limit_per_host=2,
)
async with aiohttp.ClientSession(
connector=connector,
) as session:
return await process_site(WEBSITE, session)
И финальный получившийся код:
from urllib.parse import urljoin, urlparse, urldefrag
import asyncio
import logging
import aiohttp
from bs4 import BeautifulSoup
logging.basicConfig(level=logging.INFO)
async def fetch_and_validate_page_contents(url, session):
while True:
try:
async with session.get(url) as response:
if response.content_type != "text/html":
return None
if response.url.host != HOSTNAME:
return None
return await response.text()
except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
logging.warning(f"Connection error for URL: {url} {e}, retrying in 0.5 second...")
await asyncio.sleep(0.5)
async def parse_page(url, session):
text = await fetch_and_validate_page_contents(url, session)
if text is None:
return None, set()
soup = BeautifulSoup(text, "lxml")
title = soup.find("title")
if title is not None:
title = title.text
all_internal_links = set()
for a in soup.find_all("a"):
link = a.get("href")
if link is None:
continue
if link.startswith("#"):
continue
absolute_link = urljoin(url, link)
if urlparse(absolute_link).hostname == HOSTNAME:
all_internal_links.add(
urldefrag(absolute_link)[0]
)
return title, all_internal_links
async def process_site(website, session):
visited_urls = {}
urls_to_visit = {website}
while urls_to_visit:
tasks = {}
for url in urls_to_visit:
current_url_to_parse = url
visited_urls[current_url_to_parse] = None
tasks[url] = parse_page(current_url_to_parse, session)
urls_to_visit.clear()
results = await asyncio.gather(
*tasks.values()
)
for url, (title, internal_links) in zip(tasks.keys(), results):
visited_urls[url] = title
for link in internal_links:
if link not in visited_urls:
urls_to_visit.add(link)
logging.info(f"Crawled {len(visited_urls)} pages, next iteration is {len(urls_to_visit)} pages")
return visited_urls
async def main():
connector = aiohttp.TCPConnector(
limit_per_host=2,
)
async with aiohttp.ClientSession(
connector=connector,
) as session:
return await process_site(WEBSITE, session)
if __name__ == "__main__":
WEBSITE = "https://www.python.org"
HOSTNAME = urlparse(WEBSITE).hostname
titles = asyncio.run(main())
print(len(titles))
print(titles)
Этот код уже достаточно хорош. Есть некоторые проблемы, например, адреса с http и https для него - разные адреса; картинки и архивы можно фильтровать по расширению (.svg или .tar.gz, например); а также, возможно, некоторые другие.