- Frontend: Next.js 14 with TypeScript - Backend: FastAPI with SQLAlchemy - Agent: Carmodoo sync agent - Deployment: Docker Compose based staging/production setup - Scripts: Automated deployment with rollback support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
365 lines
14 KiB
Python
365 lines
14 KiB
Python
"""
|
|
Specification Service for fetching vehicle specifications from AUTOBEGINS via Carmodoo
|
|
Uses Playwright to interact with the dealer portal and call AUTOBEGINS API
|
|
"""
|
|
import os
|
|
import re
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
from typing import Optional, Dict, Any
|
|
from dataclasses import dataclass, field
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Playwright imports
|
|
try:
|
|
from playwright.async_api import async_playwright, Browser, Page
|
|
PLAYWRIGHT_AVAILABLE = True
|
|
except ImportError:
|
|
PLAYWRIGHT_AVAILABLE = False
|
|
logger.warning("Playwright not installed. Specification lookup will not work.")
|
|
|
|
# Carmodoo credentials
|
|
CARMODOO_BASE_URL = "https://dealer.carmodoo.com"
|
|
CARMODOO_USER_ID = os.getenv("CARMODOO_USER_ID", "01033315258")
|
|
CARMODOO_PASSWORD = os.getenv("CARMODOO_PASSWORD", "alskfl@1122")
|
|
|
|
|
|
@dataclass
|
|
class CarSpecification:
|
|
"""Vehicle specification data from AUTOBEGINS"""
|
|
car_number: str = ""
|
|
manufacturer: str = ""
|
|
model_name: str = ""
|
|
grade: str = ""
|
|
model_year: str = ""
|
|
first_registration: str = ""
|
|
body_type: str = ""
|
|
transmission: str = ""
|
|
fuel_type: str = ""
|
|
displacement: int = 0
|
|
color: str = ""
|
|
mileage: int = 0
|
|
usage: str = ""
|
|
vin: str = ""
|
|
inspection_validity: str = ""
|
|
|
|
# Price info (in 만원)
|
|
release_price: int = 0
|
|
base_price: int = 0
|
|
option_price: int = 0
|
|
|
|
# Mortgage/Seizure
|
|
mortgage_count: int = 0
|
|
seizure_count: int = 0
|
|
|
|
# Options
|
|
standard_options: list = field(default_factory=list)
|
|
selected_options: list = field(default_factory=list)
|
|
|
|
# Raw data
|
|
raw_data: dict = field(default_factory=dict)
|
|
|
|
|
|
def _parse_spec_html(html: str, car_number: str) -> CarSpecification:
|
|
"""Parse HTML content from AUTOBEGINS search.html to extract specification data"""
|
|
spec = CarSpecification(car_number=car_number)
|
|
spec.raw_data = {"html_length": len(html)}
|
|
|
|
try:
|
|
# Manufacturer and Model (from logo and text)
|
|
model_match = re.search(r'<ul class="model">\s*<li>.*?([^>]+)<br>([^<]+)</li>', html, re.DOTALL)
|
|
if model_match:
|
|
# Extract manufacturer from text before <br>
|
|
maker_text = model_match.group(1).strip()
|
|
maker_clean = re.sub(r'<[^>]+>', '', maker_text).strip()
|
|
spec.manufacturer = maker_clean
|
|
|
|
# Model name after <br>
|
|
spec.model_name = model_match.group(2).strip()
|
|
|
|
# Alternative manufacturer detection
|
|
if not spec.manufacturer:
|
|
maker_patterns = ['기아', '현대', 'KG모빌리티', '쌍용', '르노', '쉐보레', 'BMW', '벤츠', '아우디', '볼보', '렉서스', '토요타']
|
|
for maker in maker_patterns:
|
|
if maker in html:
|
|
spec.manufacturer = maker
|
|
break
|
|
|
|
# Year (년형)
|
|
year_match = re.search(r'<th>년형</th>\s*<td>(\d{4})년</td>', html)
|
|
if year_match:
|
|
spec.model_year = year_match.group(1)
|
|
|
|
# First registration (최초등록일)
|
|
reg_match = re.search(r'<th>최초등록일</th>\s*<td>(\d{4}\.\d{2}\.\d{2})</td>', html)
|
|
if reg_match:
|
|
spec.first_registration = reg_match.group(1)
|
|
|
|
# Body type (외형)
|
|
body_match = re.search(r'<th>외형</th>\s*<td>([^<]+)</td>', html)
|
|
if body_match:
|
|
spec.body_type = body_match.group(1).strip()
|
|
|
|
# Transmission (미션)
|
|
trans_match = re.search(r'<th>미션</th>\s*<td>([^<]+)</td>', html)
|
|
if trans_match:
|
|
spec.transmission = trans_match.group(1).strip()
|
|
|
|
# Fuel type (연료)
|
|
fuel_match = re.search(r'<th>연료</th>\s*<td>([^<]+)</td>', html)
|
|
if fuel_match:
|
|
spec.fuel_type = fuel_match.group(1).strip()
|
|
|
|
# Displacement (배기량)
|
|
disp_match = re.search(r'<th>배기량</th>\s*<td>(\d+)cc</td>', html)
|
|
if disp_match:
|
|
spec.displacement = int(disp_match.group(1))
|
|
|
|
# Color (색상)
|
|
color_match = re.search(r'<th>색상</th>\s*<td>([^<]+)</td>', html)
|
|
if color_match:
|
|
spec.color = color_match.group(1).strip()
|
|
|
|
# Mileage (주행거리)
|
|
mileage_match = re.search(r'<th>주행거리</th>\s*<td>([\d,]+)km</td>', html)
|
|
if mileage_match:
|
|
spec.mileage = int(mileage_match.group(1).replace(',', ''))
|
|
|
|
# Usage (용도)
|
|
usage_match = re.search(r'<th>용도</th>\s*<td>([^<]+)</td>', html)
|
|
if usage_match:
|
|
spec.usage = usage_match.group(1).strip()
|
|
|
|
# VIN (차대번호)
|
|
vin_match = re.search(r'value="([A-Z0-9]{17})"', html)
|
|
if vin_match:
|
|
spec.vin = vin_match.group(1)
|
|
|
|
# Inspection validity (검사유효기간)
|
|
insp_match = re.search(r'<th>검사유효기간</th>\s*<td[^>]*>([^<]+)</td>', html)
|
|
if insp_match:
|
|
spec.inspection_validity = insp_match.group(1).strip()
|
|
|
|
# Price extraction - digit_area contains nested spans with hidden digits
|
|
# Format: <span class="digit_area red">...<span class="hide">1</span>...<span class="hide">4</span>...</span>
|
|
def extract_price_from_section(section_html):
|
|
"""Extract price from a section of HTML containing digit_area spans"""
|
|
digits = re.findall(r'<span class="hide">([0-9])</span>', section_html)
|
|
if digits:
|
|
try:
|
|
return int(''.join(digits))
|
|
except:
|
|
pass
|
|
return 0
|
|
|
|
# Release price (출고가) - find the whole price_table row
|
|
release_section = re.search(r'출고가.*?</td>', html, re.DOTALL)
|
|
if release_section:
|
|
spec.release_price = extract_price_from_section(release_section.group(0))
|
|
|
|
# Base price (기본가)
|
|
base_section = re.search(r'>기본가<.*?</td>', html, re.DOTALL)
|
|
if base_section:
|
|
spec.base_price = extract_price_from_section(base_section.group(0))
|
|
|
|
# Option price (출고시 옵션가)
|
|
option_section = re.search(r'출고시 옵션가.*?</td>', html, re.DOTALL)
|
|
if option_section:
|
|
spec.option_price = extract_price_from_section(option_section.group(0))
|
|
|
|
# Mortgage/Seizure (저당/압류)
|
|
mortgage_match = re.search(r'<span class="title_big">저당</span>\s*<strong[^>]*>(\d+)</strong>', html)
|
|
if mortgage_match:
|
|
spec.mortgage_count = int(mortgage_match.group(1))
|
|
|
|
seizure_match = re.search(r'<span class="title_big">압류</span>\s*<strong[^>]*>(\d+)</strong>', html)
|
|
if seizure_match:
|
|
spec.seizure_count = int(seizure_match.group(1))
|
|
|
|
# Standard options (기본품목)
|
|
std_opts = re.findall(r'<ul class="opt_base">.*?</ul>', html, re.DOTALL)
|
|
if std_opts:
|
|
spec.standard_options = re.findall(r'<span>([^<]+)</span>', std_opts[0])
|
|
|
|
# Selected options (선택품목)
|
|
sel_opts = re.findall(r'<li><span>([^<]+)</span>\s*<strong>([^<]+)</strong></li>', html)
|
|
spec.selected_options = [f"{name} ({price})" for name, price in sel_opts]
|
|
|
|
logger.info(f"Parsed spec for {car_number}: {spec.manufacturer} {spec.model_name}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing spec HTML: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
return spec
|
|
|
|
|
|
async def get_specifications_from_carmodoo(car_number: str, timeout: int = 60000) -> Optional[CarSpecification]:
|
|
"""
|
|
Fetch vehicle specifications from AUTOBEGINS via Carmodoo dealer portal
|
|
|
|
Args:
|
|
car_number: Korean license plate number (e.g., "117더3590")
|
|
timeout: Maximum wait time in milliseconds
|
|
|
|
Returns:
|
|
CarSpecification object or None if not found
|
|
"""
|
|
if not PLAYWRIGHT_AVAILABLE:
|
|
logger.error("Playwright not available for specification lookup")
|
|
return None
|
|
|
|
if not car_number or len(car_number) < 7:
|
|
logger.error(f"Invalid car number: {car_number}")
|
|
return None
|
|
|
|
try:
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(headless=True)
|
|
page = await browser.new_page()
|
|
|
|
try:
|
|
# Login to Carmodoo
|
|
logger.info("Logging in to Carmodoo...")
|
|
await page.goto(f"{CARMODOO_BASE_URL}/member/login_v2.html", timeout=timeout)
|
|
await page.fill('input[name="id"]', CARMODOO_USER_ID)
|
|
await page.fill('input[name="passwd"]', CARMODOO_PASSWORD)
|
|
await page.click('input[value="LOGIN"]')
|
|
await page.wait_for_timeout(3000)
|
|
|
|
# Navigate to spec search page
|
|
logger.info("Navigating to spec search...")
|
|
await page.goto(f"{CARMODOO_BASE_URL}/info/search_ab.html", timeout=timeout)
|
|
await page.wait_for_timeout(3000)
|
|
|
|
# Find the AUTOBEGINS iframe
|
|
target_frame = None
|
|
for frame in page.frames:
|
|
if 'autobegins.com/cp/?k=' in frame.url:
|
|
target_frame = frame
|
|
break
|
|
|
|
if not target_frame:
|
|
logger.error("Could not find AUTOBEGINS frame")
|
|
return None
|
|
|
|
# Get OTP values
|
|
otp = await target_frame.evaluate("document.getElementById('otp').value")
|
|
next_otp = await target_frame.evaluate("document.getElementById('nextOtp').value")
|
|
|
|
logger.info(f"Calling AUTOBEGINS API for: {car_number}")
|
|
|
|
# Call the API directly
|
|
api_result = await target_frame.evaluate("""
|
|
async (params) => {
|
|
const { carNum, otp, nextOtp } = params;
|
|
try {
|
|
const response = await fetch('/ext/gg1_ab.php', {
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
},
|
|
body: `mode=search&carNum=${encodeURIComponent(carNum)}&otp=${otp}&nextOtp=${nextOtp}`
|
|
});
|
|
const text = await response.text();
|
|
return { success: true, data: text };
|
|
} catch (e) {
|
|
return { success: false, error: e.message };
|
|
}
|
|
}
|
|
""", {"carNum": car_number, "otp": otp, "nextOtp": next_otp})
|
|
|
|
if not api_result.get('success'):
|
|
logger.error(f"API call failed: {api_result.get('error')}")
|
|
return None
|
|
|
|
# Parse API response
|
|
try:
|
|
data = json.loads(api_result['data'])
|
|
except json.JSONDecodeError:
|
|
logger.error(f"Failed to parse API response")
|
|
return None
|
|
|
|
rst_code = data.get('rst_code')
|
|
if rst_code != 1:
|
|
rst_msg = data.get('rst_msg', 'Unknown error')
|
|
logger.warning(f"AUTOBEGINS API returned: {rst_msg} (code: {rst_code})")
|
|
return None
|
|
|
|
# Get search result
|
|
sd_key = data.get('sdKey')
|
|
sd_type = data.get('sdType')
|
|
|
|
if sd_type not in [2, 3]:
|
|
logger.warning(f"Unexpected sdType: {sd_type}")
|
|
return None
|
|
|
|
# Navigate to result page
|
|
page_name = 'search_yet.html' if sd_type == 2 else 'search.html'
|
|
result_url = f'/cp/{page_name}?otp={otp}&nextOtp={next_otp}&S_SDDATA={sd_key}'
|
|
|
|
await target_frame.evaluate(f"""
|
|
document.getElementById('searchIFrame').src = '{result_url}';
|
|
""")
|
|
|
|
logger.info("Waiting for result page to load...")
|
|
await page.wait_for_timeout(8000)
|
|
|
|
# Find and read result frame
|
|
result_content = None
|
|
for frame in page.frames:
|
|
if page_name in frame.url and 'S_SDDATA' in frame.url:
|
|
result_content = await frame.content()
|
|
break
|
|
|
|
if not result_content:
|
|
logger.error("Could not find result frame content")
|
|
return None
|
|
|
|
# Parse the HTML
|
|
spec = _parse_spec_html(result_content, car_number)
|
|
logger.info(f"Successfully retrieved specs for {car_number}")
|
|
return spec
|
|
|
|
finally:
|
|
await browser.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching specifications for {car_number}: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
|
|
def spec_to_dict(spec: CarSpecification) -> dict:
|
|
"""Convert CarSpecification to dictionary for database storage"""
|
|
return {
|
|
"car_number": spec.car_number,
|
|
"manufacturer": spec.manufacturer,
|
|
"model_name": spec.model_name,
|
|
"grade": spec.grade,
|
|
"model_year": spec.model_year,
|
|
"first_registration": spec.first_registration,
|
|
"body_type": spec.body_type,
|
|
"transmission": spec.transmission,
|
|
"fuel_type": spec.fuel_type,
|
|
"displacement": spec.displacement,
|
|
"color": spec.color,
|
|
"mileage": spec.mileage,
|
|
"usage": spec.usage,
|
|
"vin": spec.vin,
|
|
"inspection_validity": spec.inspection_validity,
|
|
"release_price": spec.release_price,
|
|
"base_price": spec.base_price,
|
|
"option_price": spec.option_price,
|
|
"mortgage_count": spec.mortgage_count,
|
|
"seizure_count": spec.seizure_count,
|
|
"standard_options": spec.standard_options,
|
|
"selected_options": spec.selected_options,
|
|
"raw_data": spec.raw_data,
|
|
}
|