""" Carmodoo API Client - HTTP based car data extraction """ import asyncio import re import logging from typing import Optional, Dict, List, Any from dataclasses import dataclass, field from datetime import datetime import aiohttp from lxml import etree @dataclass class CarmodooConfig: base_url: str = "https://dealer.carmodoo.com" check_url: str = "https://ck.carmodoo.com" encoding: str = "euc-kr" user_id: str = "" password: str = "" request_timeout: int = 30 request_delay: float = 0.5 max_retries: int = 3 retry_delay: int = 2 @dataclass class CarMaker: code: str name: str cho: str = "" @dataclass class CarModel: code: str name: str maker_code: str @dataclass class CarDetail: car_no: str car_name: str maker_code: str model_code: str year: int month: Optional[int] mileage: int price: int fuel: str transmission: str color: str displacement: int car_number: str seize_count: int collateral_count: int options: List[str] memo: str dealer_memo: str main_image: str images: List[str] thumbnails: List[str] check_num: str check_url: str check_gubun: str dealer_name: str dealer_phone: str shop_name: str shop_tel: str raw_data: Dict[str, Any] = field(default_factory=dict) class CarmodooClient: def __init__(self, config: CarmodooConfig): self.config = config self.logger = logging.getLogger('carmodoo') self.session: Optional[aiohttp.ClientSession] = None self.cookies: Dict[str, str] = {} self.is_logged_in = False self.last_session_refresh = None self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36', 'Accept-Language': 'ko-KR,ko;q=0.9', } async def __aenter__(self): await self.create_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def create_session(self): if self.session is None or self.session.closed: timeout = aiohttp.ClientTimeout(total=self.config.request_timeout) self.session = aiohttp.ClientSession(timeout=timeout, headers=self.headers) return self.session async def close(self): if self.session and not self.session.closed: await self.session.close() def _decode_response(self, content: bytes) -> str: try: return content.decode(self.config.encoding) except UnicodeDecodeError: try: return content.decode('utf-8') except UnicodeDecodeError: return content.decode('latin-1') def _clean_xml_bytes(self, content: bytes) -> bytes: try: text = content.decode(self.config.encoding) except UnicodeDecodeError: try: text = content.decode('utf-8') except UnicodeDecodeError: text = content.decode('latin-1') text = re.sub(r'^[0-9a-fA-F]+\r?\n', '', text, flags=re.MULTILINE) text = text.strip() if not text.startswith(' 0: text = text[xml_start:] text = re.sub(r'encoding=["\'][^"\']*["\']', 'encoding="UTF-8"', text) return text.encode('utf-8') async def _request(self, method: str, url: str, **kwargs): await self.create_session() for attempt in range(self.config.max_retries): try: if attempt > 0: await asyncio.sleep(self.config.retry_delay) async with self.session.request(method, url, **kwargs) as response: content = await response.read() return response.status, content, dict(response.cookies) except aiohttp.ClientError as e: self.logger.warning(f"Request failed (attempt {attempt + 1}): {e}") if attempt == self.config.max_retries - 1: raise raise Exception("Max retries exceeded") async def login(self, user_id: Optional[str] = None, password: Optional[str] = None) -> bool: user_id = user_id or self.config.user_id password = password or self.config.password if not user_id or not password: self.logger.error("User ID and password are required") return False self.logger.info(f"Attempting login for user: {user_id}") url = f"{self.config.base_url}/member/login_ok.html" data = { 'prevURL': '', 'id': user_id, 'passwd': password, 'idSave': 'Y', 'button': 'LOGIN' } headers = { **self.headers, 'Content-Type': 'application/x-www-form-urlencoded', 'Origin': self.config.base_url, 'Referer': f'{self.config.base_url}/member/login_v2.html', } try: status, content, cookies = await self._request( 'POST', url, data=data, headers=headers, allow_redirects=False ) text = self._decode_response(content) if 'goMain' in text or 'PHPSESSID' in str(cookies): self.cookies.update(cookies) self.is_logged_in = True self.last_session_refresh = datetime.now() self.logger.info("Login successful") return True else: self.logger.error("Login failed: unexpected response") return False except Exception as e: self.logger.error(f"Login error: {e}") return False async def get_car_makers(self) -> List[CarMaker]: url = f"{self.config.base_url}/common/ajax/AutoDBCode.html" params = {'mode': 'getCarInit', 'ctl': 'car'} headers = { **self.headers, 'Accept': 'application/xml, text/xml, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest', } try: status, content, _ = await self._request( 'GET', url, params=params, headers=headers, cookies=self.cookies ) if status != 200: return [] return self._parse_car_makers(content) except Exception as e: self.logger.error(f"Error getting car makers: {e}") return [] def _parse_car_makers(self, content: bytes) -> List[CarMaker]: makers = [] try: xml_bytes = self._clean_xml_bytes(content) root = etree.fromstring(xml_bytes) for item in root.findall('.//item'): key = item.findtext('key', '') name = item.findtext('name', '') cho = item.findtext('cho', '') if key and name: makers.append(CarMaker(code=key, name=name, cho=cho)) except Exception as e: self.logger.error(f"Error parsing car makers: {e}") return makers async def get_car_models(self, maker_code: str) -> List[CarModel]: url = f"{self.config.base_url}/common/ajax/AutoDBCode.html" params = { 'mode': 'getCarModelInit', 'ctl': 'car', 'company': maker_code, 'selected': '', } headers = { **self.headers, 'Accept': 'application/xml, text/xml, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest', } try: status, content, _ = await self._request( 'GET', url, params=params, headers=headers, cookies=self.cookies ) if status != 200: return [] return self._parse_car_models(content, maker_code) except Exception as e: self.logger.error(f"Error getting car models: {e}") return [] def _parse_car_models(self, content: bytes, maker_code: str) -> List[CarModel]: models = [] try: xml_bytes = self._clean_xml_bytes(content) root = etree.fromstring(xml_bytes) for item in root.findall('.//item'): key = item.findtext('key', '') name = item.findtext('name', '') if key and name: models.append(CarModel(code=key, name=name, maker_code=maker_code)) except Exception as e: self.logger.error(f"Error parsing car models: {e}") return models def get_image_url(self, car_no: str, index: int = 0) -> str: padded = car_no.zfill(9) folder = f"{padded[0:3]}/{padded[3:6]}/{padded[6:9]}" return f"{self.config.base_url}/data/__carPhoto/{folder}/cmcar_{index}.jpg"