- Frontend: Next.js 14 with TypeScript - Backend: FastAPI with SQLAlchemy - Agent: Carmodoo sync agent - Deployment: Docker Compose based staging/production setup - Scripts: Automated deployment with rollback support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
357 lines
12 KiB
Python
357 lines
12 KiB
Python
"""
PDF service for capturing web pages as PDF using Playwright.

Used for capturing Korean vehicle performance check reports (성능점검기록부).
"""
|
|
import os
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, List, Tuple
|
|
from datetime import datetime
|
|
import tempfile
|
|
|
|
# Module-level logger; handlers and levels are left to the application's
# logging configuration.
logger = logging.getLogger(__name__)

# In-memory log of recent PDF generation failures (lost on process restart).
PDF_FAILURES: List[dict] = []

# Playwright is an optional dependency: remember whether the import worked so
# the capture functions can fail gracefully instead of crashing at import time.
try:
    from playwright.async_api import async_playwright, Browser, Page
    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    PLAYWRIGHT_AVAILABLE = False
    # Reported through the module logger (rather than print) so the message
    # follows the same routing as every other diagnostic in this module.
    logger.warning("Playwright not installed. PDF capture will not work.")

# img2pdf / Pillow are optional as well; they turn screenshots into a PDF.
try:
    import img2pdf
    from PIL import Image
    IMG2PDF_AVAILABLE = True
except ImportError:
    IMG2PDF_AVAILABLE = False
    logger.warning("img2pdf/pillow not installed. Image-based PDF will not work.")

# Directory where generated PDFs are stored: three levels above this file,
# under uploads/performance_checks.
PDF_STORAGE_DIR = Path(__file__).parent.parent.parent / "uploads" / "performance_checks"
|
|
|
|
|
|
def ensure_pdf_directory():
    """Create the PDF storage directory, including missing parents, if absent."""
    storage_dir = PDF_STORAGE_DIR
    # exist_ok makes repeated calls harmless.
    storage_dir.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def log_pdf_failure(car_id: int, check_num: str, error: str):
    """Record a PDF generation failure in memory and emit an error log line.

    Args:
        car_id: ID of the car whose PDF could not be generated.
        check_num: Performance check number that was being captured.
        error: Error description; stored as ``str(error)``.

    Only the 100 most recent failures are retained.
    """
    PDF_FAILURES.append({
        "car_id": car_id,
        "check_num": check_num,
        "error": str(error),
        "timestamp": datetime.now().isoformat(),
        "retried": False,
    })
    # Trim in place instead of rebinding the global: rebinding would leave any
    # reference obtained via ``from <module> import PDF_FAILURES`` pointing at
    # a stale list that never sees new entries.
    if len(PDF_FAILURES) > 100:
        del PDF_FAILURES[:-100]
    logger.error(f"PDF generation failed - car_id={car_id}, check_num={check_num}: {error}")
|
|
|
|
|
|
def get_pdf_failures() -> List[dict]:
    """Return a shallow copy of the recent PDF generation failure records."""
    snapshot = list(PDF_FAILURES)
    return snapshot
|
|
|
|
|
|
def clear_pdf_failure(car_id: int):
    """Drop every failure record for ``car_id`` (used after a successful retry).

    Args:
        car_id: ID of the car whose failure records should be removed.
    """
    # Slice-assign so the existing list object is filtered in place; rebinding
    # the global would detach references other modules hold to the list.
    PDF_FAILURES[:] = [record for record in PDF_FAILURES if record["car_id"] != car_id]
|
|
|
|
|
|
async def capture_performance_check_pdf(
    check_num: str,
    car_id: int,
    timeout: int = 60000,
    max_retries: int = 3,
    retry_delay: int = 2
) -> Optional[str]:
    """
    Capture a Korean vehicle performance check report as a PDF, with retries.

    Each attempt is delegated to ``_capture_pdf_in_new_loop`` on the default
    executor, so the capture runs on a worker thread with its own event loop.

    Args:
        check_num: Performance check number (성능점검번호)
        car_id: Car ID for naming the PDF file
        timeout: Page load timeout in milliseconds
        max_retries: Maximum number of attempts (default: 3)
        retry_delay: Delay between attempts in seconds (default: 2)

    Returns:
        PDF file path (relative) if successful, None if failed. On failure a
        record is added through ``log_pdf_failure``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        error_msg = "Playwright not available. Cannot capture PDF."
        logger.error(error_msg)
        log_pdf_failure(car_id, check_num, error_msg)
        return None

    if not IMG2PDF_AVAILABLE:
        error_msg = "img2pdf/pillow not available. Cannot create PDF from screenshots."
        logger.error(error_msg)
        log_pdf_failure(car_id, check_num, error_msg)
        return None

    ensure_pdf_directory()

    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    last_error = None

    for attempt in range(1, max_retries + 1):
        # Only the final attempt's exception is reported in the failure record.
        last_error = None
        # Run in a separate thread with a new event loop to avoid conflicts
        # with uvicorn's loop.
        try:
            result = await loop.run_in_executor(
                None,
                _capture_pdf_in_new_loop,
                check_num, car_id, timeout, attempt
            )
            if result:
                # Success - clear any previous failure record
                clear_pdf_failure(car_id)
                return result
        except Exception as e:
            last_error = e
            logger.error(f"PDF capture attempt {attempt} failed: {e}")

        if attempt < max_retries:
            logger.warning(f"PDF capture attempt {attempt}/{max_retries} failed for car_id={car_id}, retrying in {retry_delay}s...")
            await asyncio.sleep(retry_delay)

    # All retries failed; keep the underlying cause when one was raised.
    failure_msg = f"Failed after {max_retries} attempts"
    if last_error is not None:
        failure_msg = f"{failure_msg}: {last_error}"
    log_pdf_failure(car_id, check_num, failure_msg)
    return None
|
|
|
|
|
|
def _capture_pdf_in_new_loop(check_num: str, car_id: int, timeout: int, attempt: int) -> Optional[str]:
|
|
"""별도 이벤트 루프에서 PDF 캡처 실행"""
|
|
import asyncio
|
|
|
|
# 새 이벤트 루프 생성
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
result = loop.run_until_complete(_capture_pdf_single_attempt(check_num, car_id, timeout, attempt))
|
|
return result
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
def _remove_file_quietly(path: Path) -> None:
    """Delete ``path`` if it exists; log instead of raising on OS errors."""
    try:
        if path.exists():
            path.unlink()
    except OSError as cleanup_error:
        logger.warning(f"Could not remove file {path}: {cleanup_error}")


async def _capture_pdf_single_attempt(
    check_num: str,
    car_id: int,
    timeout: int,
    attempt: int
) -> Optional[str]:
    """Make a single attempt to capture the performance check page as a PDF.

    Loads the carmodoo print page in headless Chromium, takes one full-page
    screenshot, re-encodes it as JPEG and wraps it into a single-page PDF
    with margins via img2pdf.

    Args:
        check_num: Performance check number inserted into the page URL.
        car_id: Car ID used in the output and temporary file names.
        timeout: Navigation timeout in milliseconds passed to ``page.goto``.
        attempt: 1-based attempt number, used only for logging.

    Returns:
        The relative path ("/uploads/performance_checks/<file>.pdf") when a
        non-empty PDF was written, otherwise None. Exceptions are logged and
        converted to None.
    """
    print(f"[PDF] _capture_pdf_single_attempt: car_id={car_id}, check_num={check_num}, attempt={attempt}")
    ensure_pdf_directory()

    # Performance check URL from carmodoo
    url = f"https://ck.carmodoo.com/carCheck/carmodooPrint.do?print=0&checkNum={check_num}"
    print(f"[PDF] URL: {url}")

    # PDF filename: car_id_timestamp.pdf
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    pdf_filename = f"{car_id}_{timestamp}.pdf"
    pdf_path = PDF_STORAGE_DIR / pdf_filename
    relative_path = f"/uploads/performance_checks/{pdf_filename}"
    print(f"[PDF] Output path: {pdf_path}")

    # Every scratch file is registered in one of these lists *before* it is
    # written, so the finally block can remove it on any exit path.
    temp_images: List[Path] = []
    processed_images: List[Path] = []
    succeeded = False

    try:
        print(f"[PDF] Launching playwright...")
        async with async_playwright() as p:
            # Launch browser (headless mode) with extended timeout
            print(f"[PDF] Launching chromium...")
            browser: Browser = await p.chromium.launch(
                headless=True,
                timeout=30000,  # 30 second browser launch timeout
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-gpu',
                    '--disable-extensions',
                    '--disable-background-networking',
                    '--single-process'  # Use single process for stability
                ]
            )
            print(f"[PDF] Browser launched")

            # Create new page - narrower viewport for larger content
            context = await browser.new_context(
                locale='ko-KR',
                viewport={'width': 900, 'height': 800},
                device_scale_factor=2  # High DPI for better quality
            )
            page: Page = await context.new_page()
            print(f"[PDF] Page created, navigating to URL...")

            # Navigate to performance check page
            await page.goto(url, wait_until='networkidle', timeout=timeout)
            print(f"[PDF] Navigation complete")

            # Wait for content to fully load
            await page.wait_for_timeout(3000)
            print(f"[PDF] Content loaded, taking screenshot...")

            # Get full page dimensions (used for diagnostics only)
            page_height = await page.evaluate("document.documentElement.scrollHeight")
            page_width = await page.evaluate("document.documentElement.scrollWidth")

            print(f"Page size: {page_width}x{page_height}")

            # Take single full-page screenshot (no page splits)
            screenshot_path = PDF_STORAGE_DIR / f"temp_{car_id}_full.png"
            temp_images.append(screenshot_path)
            await page.screenshot(
                path=str(screenshot_path),
                full_page=True
            )
            print(f"Captured full page screenshot")

            await browser.close()

        # Convert screenshots to PDF
        if temp_images:
            print(f"Converting {len(temp_images)} images to PDF...")

            for img_path in temp_images:
                # Save as temporary JPEG for better compression
                temp_jpg = img_path.with_suffix('.jpg')
                processed_images.append(temp_jpg)
                with Image.open(img_path) as img:
                    # JPEG cannot store alpha/palette data, so flatten to RGB.
                    if img.mode in ('RGBA', 'P'):
                        img = img.convert('RGB')
                    img.save(temp_jpg, 'JPEG', quality=95)

            # Create PDF with margins (25mm left/right, 30mm top/bottom)
            margin_lr_mm = 25  # left/right margin
            margin_tb_mm = 30  # top/bottom margin

            # Get image dimensions to calculate page size
            with Image.open(processed_images[0]) as img:
                img_width_px, img_height_px = img.size

            # Convert image pixels to points (assuming 150 DPI for reasonable size)
            dpi = 150
            img_width_pt = img_width_px * 72 / dpi
            img_height_pt = img_height_px * 72 / dpi

            # Page size = image size + margins
            page_width_pt = img_width_pt + 2 * img2pdf.mm_to_pt(margin_lr_mm)
            page_height_pt = img_height_pt + 2 * img2pdf.mm_to_pt(margin_tb_mm)

            # Build the PDF fully in memory BEFORE opening the output file, so a
            # failed conversion can never leave an empty <car_id>_*.pdf behind
            # for get_pdf_path to pick up later.
            # NOTE(review): the border is passed as a 4-tuple (lr, tb, lr, tb);
            # img2pdf may expect a 2-tuple with a specific order - confirm
            # against the img2pdf documentation.
            pdf_bytes = img2pdf.convert(
                [str(img) for img in processed_images],
                layout_fun=img2pdf.get_layout_fun(
                    pagesize=(page_width_pt, page_height_pt),
                    border=(img2pdf.mm_to_pt(margin_lr_mm), img2pdf.mm_to_pt(margin_tb_mm),
                            img2pdf.mm_to_pt(margin_lr_mm), img2pdf.mm_to_pt(margin_tb_mm)),
                    fit=img2pdf.FitMode.into
                )
            )
            with open(pdf_path, 'wb') as f:
                f.write(pdf_bytes)

        # Verify PDF was created
        if pdf_path.exists() and pdf_path.stat().st_size > 0:
            logger.info(f"PDF captured successfully (attempt {attempt}): {pdf_path}")
            succeeded = True
            return relative_path

        logger.warning(f"PDF file not created or empty: {pdf_path}")
        return None

    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        logger.error(f"Error capturing PDF for check_num={check_num} (attempt {attempt}): {e}\n{error_trace}")
        print(f"[PDF] ERROR: {e}\n{error_trace}")
        return None

    finally:
        # Remove the PNG screenshot and JPEG re-encode on success and failure.
        for scratch_path in temp_images + processed_images:
            _remove_file_quietly(scratch_path)
        # A partial or empty PDF must not survive a failed attempt.
        if not succeeded:
            _remove_file_quietly(pdf_path)
|
|
|
|
|
|
def capture_performance_check_pdf_sync(check_num: str, car_id: int) -> Optional[str]:
    """
    Synchronous wrapper for capture_performance_check_pdf.

    For use in non-async contexts only: calling it from a thread that already
    has a running event loop raises RuntimeError.

    Args:
        check_num: Performance check number to capture.
        car_id: Car ID used for naming the PDF file.

    Returns:
        The relative PDF path on success, otherwise None.
    """
    # asyncio.run owns the loop's whole lifecycle (create, run, close), so no
    # loop is leaked and the deprecated implicit-loop creation of
    # asyncio.get_event_loop() is avoided.
    return asyncio.run(capture_performance_check_pdf(check_num, car_id))
|
|
|
|
|
|
def get_pdf_path(car_id: int) -> Optional[str]:
    """
    Look up the newest stored PDF for a car.

    Files named "<car_id>_*.pdf" in the storage directory are candidates; the
    one with the latest modification time wins.

    Returns:
        The relative path "/uploads/performance_checks/<name>", or None when
        the car has no stored PDF.
    """
    ensure_pdf_directory()

    def modified_at(candidate):
        return candidate.stat().st_mtime

    candidates = list(PDF_STORAGE_DIR.glob(f"{car_id}_*.pdf"))
    if candidates:
        newest = max(candidates, key=modified_at)
        return f"/uploads/performance_checks/{newest.name}"
    return None
|
|
|
|
|
|
def delete_pdf(relative_path: str) -> bool:
    """Delete a stored PDF file.

    Only the final path component of ``relative_path`` is used, so the target
    always lies directly inside the PDF storage directory.

    Args:
        relative_path: Relative path (or bare file name) of the PDF.

    Returns:
        True if a file was deleted; False if the input is empty, no such
        regular file exists, or deletion failed.
    """
    # Empty input would otherwise resolve to the storage directory itself.
    if not relative_path:
        return False
    try:
        full_path = PDF_STORAGE_DIR / Path(relative_path).name
        # is_file() (not exists()) so names such as ".." or "/" can never make
        # us attempt to unlink a directory.
        if full_path.is_file():
            full_path.unlink()
            return True
        return False
    except Exception as e:
        print(f"Error deleting PDF: {e}")
        return False
|
|
|
|
|
|
def get_pdf_full_path(relative_path: str) -> Optional[Path]:
    """Resolve a stored PDF's relative path to its full filesystem path.

    Only the final path component of ``relative_path`` is used, so the result
    always lies directly inside the PDF storage directory.

    Args:
        relative_path: Relative path (or bare file name) of the PDF.

    Returns:
        The full path when it refers to an existing regular file, else None.
    """
    if not relative_path:
        return None
    full_path = PDF_STORAGE_DIR / Path(relative_path).name
    # is_file() rather than exists(): inputs such as "/" or ".." resolve to a
    # directory, which must not be handed back as if it were a PDF.
    if full_path.is_file():
        return full_path
    return None
|