Source code for perekrestok_api.manager

from __future__ import annotations

import json
import os
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any

from camoufox.async_api import AsyncCamoufox
from human_requests import HumanBrowser, HumanContext, HumanPage
from human_requests.abstraction import FetchResponse, HttpMethod, Proxy
from human_requests.network_analyzer.anomaly_sniffer import (
    HeaderAnomalySniffer, WaitHeader, WaitSource)
from playwright.async_api import TimeoutError as PWTimeoutError

from .endpoints.advertising import ClassAdvertising
from .endpoints.catalog import ClassCatalog
from .endpoints.general import ClassGeneral
from .endpoints.geolocation import ClassGeolocation


def _pick_https_proxy() -> str | None:
    """Возвращает прокси из HTTPS_PROXY/https_proxy (если заданы)."""
    return os.getenv("HTTPS_PROXY") or os.getenv("https_proxy")


[docs] @dataclass class PerekrestokAPI: """ Клиент Перекрестка. """ timeout_ms: float = 10000.0 """Время ожидания ответа от сервера в миллисекундах.""" headless: bool = True """Запускать браузер в headless режиме?""" proxy: str | dict | None = field(default_factory=_pick_https_proxy) """Прокси-сервер для всех запросов (если нужен). По умолчанию берет из окружения (если есть). Принимает как формат Playwright, так и строчный формат.""" browser_opts: dict[str, Any] = field(default_factory=dict) """Дополнительные опции для браузера (см. https://camoufox.com/python/installation/)""" CATALOG_VERSION = "1.4.1.0" MAIN_SITE_URL = "https://www.perekrestok.ru" CATALOG_URL = f"{MAIN_SITE_URL}/api/customer/{CATALOG_VERSION}" # будет создана в __post_init__ session: HumanBrowser = field(init=False, repr=False) """Внутренняя сессия браузера для выполнения HTTP-запросов.""" # будет создано в warmup ctx: HumanContext = field(init=False, repr=False) """Внутренний контекст сессии браузера""" page: HumanPage = field(init=False, repr=False) """Внутренний страница сессии браузера""" unstandard_headers: dict[str, str] = field(init=False, repr=False) """Список нестандартных заголовков пойманных при инициализации""" Geolocation: ClassGeolocation = field(init=False) """API для работы с геолокацией.""" Catalog: ClassCatalog = field(init=False) """API для работы с каталогом товаров.""" Advertising: ClassAdvertising = field(init=False) """API для работы с рекламой.""" General: ClassGeneral = field(init=False) """API для работы с общими функциями.""" # ───── lifecycle ───── def __post_init__(self) -> None: self.Geolocation = ClassGeolocation(self) self.Catalog = ClassCatalog(self) self.Advertising = ClassAdvertising(self) self.General = ClassGeneral(self) async def __aenter__(self): """Вход в контекстный менеджер с автоматическим прогревом сессии.""" await self._warmup() return self # Прогрев сессии (headless ➜ cookie `session` ➜ accessToken) async def _warmup(self) -> None: """Прогрев сессии через браузер для получения человекоподобности.""" br = await AsyncCamoufox( headless=self.headless, proxy=Proxy(self.proxy).as_dict(), **self.browser_opts, ).start() self.session = HumanBrowser.replace(br) self.ctx = await self.session.new_context() self.page = await self.ctx.new_page() sniffer = HeaderAnomalySniffer( include_subresources=True, # или False, если интересны только документы url_filter=lambda u: u.startswith(self.CATALOG_URL), ) await sniffer.start(self.ctx) await self.page.goto(self.MAIN_SITE_URL, wait_until="networkidle") ok = False try_count = 3 while not ok or try_count <= 0: try_count -= 1 try: await self.page.wait_for_selector("#app", timeout=self.timeout_ms) ok = True except PWTimeoutError: await self.page.reload() if not ok: raise RuntimeError(self.page.content) if "session" not in list(map(lambda d: d["name"], await self.page.cookies())): raise RuntimeError("Cookie 'session' not found after warmup.") await sniffer.wait( tasks=[ WaitHeader( source=WaitSource.REQUEST, headers=["Auth"], ) ], timeout_ms=self.timeout_ms, ) result_sniffer = await sniffer.complete() # Результат: {заголовок: [уникальные значения]} result = defaultdict(set) # Проходим по всем URL в 'request' for _url, headers in result_sniffer["request"].items(): for header, values in headers.items(): result[header].update(values) # добавляем значения, set уберёт дубли # Преобразуем set обратно в list self.unstandard_headers = {k: list(v)[0] for k, v in result.items()} async def __aexit__(self, *exc): """Выход из контекстного менеджера с закрытием сессии.""" await self.close()
[docs] async def close(self): """Закрыть HTTP-сессию и освободить ресурсы.""" await self.session.close()
async def _request( self, method: HttpMethod, url: str, *, json_body: Any | None = None, add_unstandard_headers: bool = True, credentials: bool = True, ) -> FetchResponse: """Выполнить HTTP-запрос через внутреннюю сессию. Единая точка входа для всех HTTP-запросов библиотеки. Добавляет к ответу объект Request для совместимости. Args: method: HTTP метод (GET, POST, PUT, DELETE и т.д.) url: URL для запроса json_body: Тело запроса в формате JSON (опционально) """ # Единая точка входа в чужую библиотеку для удобства resp: FetchResponse = await self.page.fetch( url=url, method=method, body=json_body, mode="cors", credentials="include" if credentials else "omit", timeout_ms=self.timeout_ms, referrer=self.MAIN_SITE_URL, headers={"Accept": "application/json, text/plain, */*"} | (self.unstandard_headers if add_unstandard_headers else {}), ) return resp