Initial commit: AutonetSellCar platform with deployment system

- Frontend: Next.js 14 with TypeScript
- Backend: FastAPI with SQLAlchemy
- Agent: Carmodoo sync agent
- Deployment: Docker Compose based staging/production setup
- Scripts: Automated deployment with rollback support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
AutonetSellCar Deploy
2025-12-30 13:24:39 +09:00
commit 1f0dcb1ddb
224 changed files with 55119 additions and 0 deletions

View File

@@ -0,0 +1,356 @@
"""
PDF Service for capturing web pages as PDF using Playwright
Used for capturing Korean vehicle performance check reports (성능점검기록부)
"""
import os
import asyncio
import logging
from pathlib import Path
from typing import Optional, List, Tuple
from datetime import datetime
import tempfile
# Configure logging
logger = logging.getLogger(__name__)
# PDF generation failure log
PDF_FAILURES: List[dict] = [] # In-memory log of recent failures
# Playwright imports
try:
from playwright.async_api import async_playwright, Browser, Page
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
print("Warning: Playwright not installed. PDF capture will not work.")
# Image to PDF imports
try:
import img2pdf
from PIL import Image
IMG2PDF_AVAILABLE = True
except ImportError:
IMG2PDF_AVAILABLE = False
print("Warning: img2pdf/pillow not installed. Image-based PDF will not work.")
# PDF storage directory
PDF_STORAGE_DIR = Path(__file__).parent.parent.parent / "uploads" / "performance_checks"
def ensure_pdf_directory():
"""Ensure PDF storage directory exists"""
PDF_STORAGE_DIR.mkdir(parents=True, exist_ok=True)
def log_pdf_failure(car_id: int, check_num: str, error: str):
"""Log PDF generation failure"""
global PDF_FAILURES
failure = {
"car_id": car_id,
"check_num": check_num,
"error": str(error),
"timestamp": datetime.now().isoformat(),
"retried": False
}
PDF_FAILURES.append(failure)
# Keep only last 100 failures
if len(PDF_FAILURES) > 100:
PDF_FAILURES = PDF_FAILURES[-100:]
logger.error(f"PDF generation failed - car_id={car_id}, check_num={check_num}: {error}")
def get_pdf_failures() -> List[dict]:
"""Get list of recent PDF generation failures"""
return PDF_FAILURES.copy()
def clear_pdf_failure(car_id: int):
"""Clear failure record for a car after successful retry"""
global PDF_FAILURES
PDF_FAILURES = [f for f in PDF_FAILURES if f["car_id"] != car_id]
async def capture_performance_check_pdf(
check_num: str,
car_id: int,
timeout: int = 60000,
max_retries: int = 3,
retry_delay: int = 2
) -> Optional[str]:
"""
Capture Korean vehicle performance check report as PDF
Uses screenshot-based approach for accurate rendering
Includes automatic retry on failure
Args:
check_num: Performance check number (성능점검번호)
car_id: Car ID for naming the PDF file
timeout: Page load timeout in milliseconds
max_retries: Maximum number of retry attempts (default: 3)
retry_delay: Delay between retries in seconds (default: 2)
Returns:
PDF file path (relative) if successful, None if failed
"""
if not PLAYWRIGHT_AVAILABLE:
error_msg = "Playwright not available. Cannot capture PDF."
logger.error(error_msg)
log_pdf_failure(car_id, check_num, error_msg)
return None
if not IMG2PDF_AVAILABLE:
error_msg = "img2pdf/pillow not available. Cannot create PDF from screenshots."
logger.error(error_msg)
log_pdf_failure(car_id, check_num, error_msg)
return None
ensure_pdf_directory()
last_error = None
for attempt in range(1, max_retries + 1):
# 별도 스레드에서 새 이벤트 루프로 실행하여 uvicorn과의 충돌 방지
try:
result = await asyncio.get_event_loop().run_in_executor(
None,
_capture_pdf_in_new_loop,
check_num, car_id, timeout, attempt
)
if result:
# Success - clear any previous failure record
clear_pdf_failure(car_id)
return result
except Exception as e:
logger.error(f"PDF capture attempt {attempt} failed: {e}")
if attempt < max_retries:
logger.warning(f"PDF capture attempt {attempt}/{max_retries} failed for car_id={car_id}, retrying in {retry_delay}s...")
await asyncio.sleep(retry_delay)
# All retries failed
log_pdf_failure(car_id, check_num, f"Failed after {max_retries} attempts")
return None
def _capture_pdf_in_new_loop(check_num: str, car_id: int, timeout: int, attempt: int) -> Optional[str]:
"""별도 이벤트 루프에서 PDF 캡처 실행"""
import asyncio
# 새 이벤트 루프 생성
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(_capture_pdf_single_attempt(check_num, car_id, timeout, attempt))
return result
finally:
loop.close()
async def _capture_pdf_single_attempt(
check_num: str,
car_id: int,
timeout: int,
attempt: int
) -> Optional[str]:
"""Single attempt to capture PDF"""
print(f"[PDF] _capture_pdf_single_attempt: car_id={car_id}, check_num={check_num}, attempt={attempt}")
ensure_pdf_directory()
# Performance check URL from carmodoo
url = f"https://ck.carmodoo.com/carCheck/carmodooPrint.do?print=0&checkNum={check_num}"
print(f"[PDF] URL: {url}")
# PDF filename: car_id_timestamp.pdf
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
pdf_filename = f"{car_id}_{timestamp}.pdf"
pdf_path = PDF_STORAGE_DIR / pdf_filename
relative_path = f"/uploads/performance_checks/{pdf_filename}"
print(f"[PDF] Output path: {pdf_path}")
temp_images: List[Path] = []
browser = None
try:
print(f"[PDF] Launching playwright...")
async with async_playwright() as p:
# Launch browser (headless mode) with extended timeout
print(f"[PDF] Launching chromium...")
browser: Browser = await p.chromium.launch(
headless=True,
timeout=30000, # 30 second browser launch timeout
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
'--disable-extensions',
'--disable-background-networking',
'--single-process' # Use single process for stability
]
)
print(f"[PDF] Browser launched")
# Create new page - narrower viewport for larger content
context = await browser.new_context(
locale='ko-KR',
viewport={'width': 900, 'height': 800},
device_scale_factor=2 # High DPI for better quality
)
page: Page = await context.new_page()
print(f"[PDF] Page created, navigating to URL...")
# Navigate to performance check page
await page.goto(url, wait_until='networkidle', timeout=timeout)
print(f"[PDF] Navigation complete")
# Wait for content to fully load
await page.wait_for_timeout(3000)
print(f"[PDF] Content loaded, taking screenshot...")
# Get full page dimensions
page_height = await page.evaluate("document.documentElement.scrollHeight")
page_width = await page.evaluate("document.documentElement.scrollWidth")
print(f"Page size: {page_width}x{page_height}")
# Take single full-page screenshot (no page splits)
screenshot_path = PDF_STORAGE_DIR / f"temp_{car_id}_full.png"
await page.screenshot(
path=str(screenshot_path),
full_page=True
)
temp_images.append(screenshot_path)
print(f"Captured full page screenshot")
await browser.close()
# Convert screenshots to PDF
if temp_images:
print(f"Converting {len(temp_images)} images to PDF...")
# Process images for A4 size
processed_images = []
for img_path in temp_images:
# Open and convert to RGB (required for PDF)
with Image.open(img_path) as img:
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
# Save as temporary JPEG for better compression
temp_jpg = img_path.with_suffix('.jpg')
img.save(temp_jpg, 'JPEG', quality=95)
processed_images.append(temp_jpg)
# Create PDF with margins (25mm left/right, 30mm top/bottom)
margin_lr_mm = 25 # left/right margin
margin_tb_mm = 30 # top/bottom margin
# Get image dimensions to calculate page size
with Image.open(processed_images[0]) as img:
img_width_px, img_height_px = img.size
# Convert image pixels to points (assuming 150 DPI for reasonable size)
dpi = 150
img_width_pt = img_width_px * 72 / dpi
img_height_pt = img_height_px * 72 / dpi
# Page size = image size + margins
page_width_pt = img_width_pt + 2 * img2pdf.mm_to_pt(margin_lr_mm)
page_height_pt = img_height_pt + 2 * img2pdf.mm_to_pt(margin_tb_mm)
with open(pdf_path, 'wb') as f:
pdf_bytes = img2pdf.convert(
[str(img) for img in processed_images],
layout_fun=img2pdf.get_layout_fun(
pagesize=(page_width_pt, page_height_pt),
border=(img2pdf.mm_to_pt(margin_lr_mm), img2pdf.mm_to_pt(margin_tb_mm),
img2pdf.mm_to_pt(margin_lr_mm), img2pdf.mm_to_pt(margin_tb_mm)),
fit=img2pdf.FitMode.into
)
)
f.write(pdf_bytes)
# Cleanup temporary files
for img_path in temp_images:
if img_path.exists():
img_path.unlink()
for img_path in processed_images:
if img_path.exists():
img_path.unlink()
# Verify PDF was created
if pdf_path.exists() and pdf_path.stat().st_size > 0:
logger.info(f"PDF captured successfully (attempt {attempt}): {pdf_path}")
return relative_path
else:
logger.warning(f"PDF file not created or empty: {pdf_path}")
return None
except Exception as e:
import traceback
error_trace = traceback.format_exc()
logger.error(f"Error capturing PDF for check_num={check_num} (attempt {attempt}): {e}\n{error_trace}")
print(f"[PDF] ERROR: {e}\n{error_trace}")
# Cleanup on error
for img_path in temp_images:
if img_path.exists():
img_path.unlink()
return None
def capture_performance_check_pdf_sync(check_num: str, car_id: int) -> Optional[str]:
"""
Synchronous wrapper for capture_performance_check_pdf
For use in non-async contexts
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(capture_performance_check_pdf(check_num, car_id))
def get_pdf_path(car_id: int) -> Optional[str]:
"""
Get existing PDF path for a car if it exists
Returns the most recent PDF for the car
"""
ensure_pdf_directory()
# Find all PDFs for this car
pattern = f"{car_id}_*.pdf"
pdf_files = list(PDF_STORAGE_DIR.glob(pattern))
if not pdf_files:
return None
# Return the most recent one
latest_pdf = max(pdf_files, key=lambda p: p.stat().st_mtime)
return f"/uploads/performance_checks/{latest_pdf.name}"
def delete_pdf(relative_path: str) -> bool:
"""Delete a PDF file"""
try:
filename = Path(relative_path).name
full_path = PDF_STORAGE_DIR / filename
if full_path.exists():
full_path.unlink()
return True
return False
except Exception as e:
print(f"Error deleting PDF: {e}")
return False
def get_pdf_full_path(relative_path: str) -> Optional[Path]:
"""Get full filesystem path from relative path"""
if not relative_path:
return None
filename = Path(relative_path).name
full_path = PDF_STORAGE_DIR / filename
if full_path.exists():
return full_path
return None