Files
AutonetSellCar/backend/app/api/cars.py
AutonetSellCar Deploy 3bd1e49699 fix: Reorder API routes and update main page layout
- Move /makers/ and /models/ routes before /{car_id} to fix route conflict
- Center Request Vehicle button, move PromoPreference to the right

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 23:48:06 +09:00

419 lines
15 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.orm import Session, joinedload
from typing import Optional, List
from ..database import get_db
from ..models import Car, CarMaker, CarModel, CarImage, CarOption
from ..services.soldout_service import SoldoutChecker
from ..schemas import (
CarCreate, CarUpdate, CarResponse, CarListResponse,
CarMakerCreate, CarMakerResponse,
CarModelCreate, CarModelResponse,
)
router = APIRouter(prefix="/cars", tags=["cars"])
def car_to_response(car: Car) -> dict:
"""Convert Car model to response dict with computed final prices"""
return {
"id": car.id,
"source": car.source,
"source_id": car.source_id,
"car_name": car.car_name,
"year": car.year,
"month": car.month,
"mileage": car.mileage,
"price_krw": car.price_krw,
"margin_krw": car.margin_krw or 0,
"margin_mn": car.margin_mn or 0,
"final_price_krw": (car.price_krw or 0) + (car.margin_krw or 0),
"final_price_mn": (car.price_krw or 0) + (car.margin_mn or 0),
"price_usd": car.price_usd,
"is_displayed": car.is_displayed or False,
"is_banner": car.is_banner or False,
"soldout": car.soldout or False,
"fuel": car.fuel,
"transmission": car.transmission,
"color": car.color,
"displacement": car.displacement,
"car_number": car.car_number,
"seize_count": car.seize_count or 0,
"collateral_count": car.collateral_count or 0,
"check_num": car.check_num,
"dealer_name": car.dealer_name,
"dealer_description": car.dealer_description,
"status": car.status,
"created_at": car.created_at,
"updated_at": car.updated_at,
"maker": car.maker,
"model": car.model,
"images": car.images,
"specification": car.specification,
}
@router.get("", response_model=CarListResponse)
def get_cars(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
maker_id: Optional[int] = None,
model_id: Optional[int] = None,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
price_min: Optional[int] = None,
price_max: Optional[int] = None,
mileage_max: Optional[int] = None,
fuel: Optional[str] = None,
status: Optional[str] = None,
is_displayed: Optional[bool] = None,
admin: bool = Query(False, description="Admin mode - show all cars"),
db: Session = Depends(get_db),
):
"""차량 목록 조회"""
# Base query for filtering (without eager loading for count)
base_query = db.query(Car)
# For non-admin (user-facing), only show displayed cars
if not admin:
base_query = base_query.filter(Car.is_displayed == True)
# status 필터 (None이면 전체 조회)
if status:
base_query = base_query.filter(Car.status == status)
# is_displayed 필터 (admin mode에서만 의미있음)
if is_displayed is not None and admin:
base_query = base_query.filter(Car.is_displayed == is_displayed)
if maker_id:
base_query = base_query.filter(Car.maker_id == maker_id)
if model_id:
base_query = base_query.filter(Car.model_id == model_id)
if year_min:
base_query = base_query.filter(Car.year >= year_min)
if year_max:
base_query = base_query.filter(Car.year <= year_max)
if price_min:
base_query = base_query.filter(Car.price_krw >= price_min)
if price_max:
base_query = base_query.filter(Car.price_krw <= price_max)
if mileage_max:
base_query = base_query.filter(Car.mileage <= mileage_max)
if fuel:
base_query = base_query.filter(Car.fuel == fuel)
total = base_query.count()
# Add eager loading for actual data fetch
cars = base_query.options(
joinedload(Car.maker),
joinedload(Car.model),
joinedload(Car.images)
).order_by(Car.created_at.desc()).offset((page - 1) * page_size).limit(page_size).all()
# Convert to response with computed fields
cars_response = [car_to_response(car) for car in cars]
return CarListResponse(
total=total,
page=page,
page_size=page_size,
cars=cars_response
)
# Makers - Must be defined before /{car_id} to avoid route conflict
@router.get("/makers/", response_model=List[CarMakerResponse])
def get_makers(db: Session = Depends(get_db)):
"""제조사 목록 조회"""
return db.query(CarMaker).all()
@router.post("/makers/", response_model=CarMakerResponse)
def create_maker(maker_data: CarMakerCreate, db: Session = Depends(get_db)):
"""제조사 등록"""
existing = db.query(CarMaker).filter(CarMaker.code == maker_data.code).first()
if existing:
return existing
maker = CarMaker(**maker_data.dict())
db.add(maker)
db.commit()
db.refresh(maker)
return maker
# Models - Must be defined before /{car_id} to avoid route conflict
@router.get("/models/", response_model=List[CarModelResponse])
def get_models(maker_id: Optional[int] = None, db: Session = Depends(get_db)):
"""모델 목록 조회"""
query = db.query(CarModel)
if maker_id:
query = query.filter(CarModel.maker_id == maker_id)
return query.all()
@router.post("/models/", response_model=CarModelResponse)
def create_model(model_data: CarModelCreate, db: Session = Depends(get_db)):
"""모델 등록"""
existing = db.query(CarModel).filter(
CarModel.code == model_data.code,
CarModel.maker_id == model_data.maker_id
).first()
if existing:
return existing
model = CarModel(**model_data.dict())
db.add(model)
db.commit()
db.refresh(model)
return model
@router.get("/{car_id}", response_model=CarResponse)
def get_car(car_id: int, admin: bool = Query(False), db: Session = Depends(get_db)):
"""차량 상세 조회"""
car = db.query(Car).options(
joinedload(Car.maker),
joinedload(Car.model),
joinedload(Car.images),
joinedload(Car.specification)
).filter(Car.id == car_id).first()
if not car:
raise HTTPException(status_code=404, detail="Car not found")
# Non-admin can only see displayed cars
if not admin and not car.is_displayed:
raise HTTPException(status_code=404, detail="Car not found")
return car_to_response(car)
@router.post("", response_model=CarResponse)
def create_car(car_data: CarCreate, db: Session = Depends(get_db)):
"""차량 등록 (Agent용)"""
# Check if car already exists
existing = db.query(Car).filter(
Car.source == car_data.source,
Car.source_id == car_data.source_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Car already exists")
# Get maker and model IDs
maker_id = None
model_id = None
if car_data.maker_code:
maker = db.query(CarMaker).filter(CarMaker.code == car_data.maker_code).first()
if maker:
maker_id = maker.id
if car_data.model_code and maker_id:
model = db.query(CarModel).filter(
CarModel.code == car_data.model_code,
CarModel.maker_id == maker_id
).first()
if model:
model_id = model.id
# Create car
car = Car(
source=car_data.source,
source_id=car_data.source_id,
source_key=car_data.source_key,
maker_id=maker_id,
model_id=model_id,
car_name=car_data.car_name,
year=car_data.year,
month=car_data.month,
mileage=car_data.mileage,
price_krw=car_data.price_krw,
price_usd=car_data.price_usd,
fuel=car_data.fuel,
transmission=car_data.transmission,
color=car_data.color,
displacement=car_data.displacement,
car_number=car_data.car_number,
seize_count=car_data.seize_count,
collateral_count=car_data.collateral_count,
check_num=car_data.check_num,
dealer_name=car_data.dealer_name,
dealer_phone=car_data.dealer_phone,
shop_name=car_data.shop_name,
memo=car_data.memo,
)
db.add(car)
db.flush()
# Add images
for i, img in enumerate(car_data.images):
car_image = CarImage(
car_id=car.id,
url=img.url,
local_path=img.local_path,
is_main=(i == 0),
sort_order=i
)
db.add(car_image)
# Add options
for opt in car_data.options:
car_option = CarOption(car_id=car.id, option_name=opt)
db.add(car_option)
db.commit()
db.refresh(car)
return car
@router.put("/{car_id}", response_model=CarResponse)
def update_car(car_id: int, car_data: CarUpdate, db: Session = Depends(get_db)):
"""차량 정보 수정"""
car = db.query(Car).options(
joinedload(Car.maker),
joinedload(Car.model),
joinedload(Car.images),
joinedload(Car.specification)
).filter(Car.id == car_id).first()
if not car:
raise HTTPException(status_code=404, detail="Car not found")
for key, value in car_data.dict(exclude_unset=True).items():
setattr(car, key, value)
db.commit()
db.refresh(car)
return car_to_response(car)
@router.delete("/{car_id}")
def delete_car(car_id: int, db: Session = Depends(get_db)):
"""차량 삭제 (관련 데이터 포함)"""
print(f"[DELETE] Deleting car {car_id}")
car = db.query(Car).filter(Car.id == car_id).first()
if not car:
print(f"[DELETE] Car {car_id} not found")
raise HTTPException(status_code=404, detail="Car not found")
try:
# 관련 테이블 데이터 삭제
from ..models.car import CarImage, CarOption
from ..models.performance_check import CarPerformanceCheck
from ..models.car_specification import CarSpecification
from ..models.hero_banner import HeroBanner
from ..models.user import CarView, PerformanceCheckView
from sqlalchemy import text
# 이미지 삭제
img_count = db.query(CarImage).filter(CarImage.car_id == car_id).delete(synchronize_session=False)
print(f"[DELETE] Deleted {img_count} images")
# 옵션 삭제
opt_count = db.query(CarOption).filter(CarOption.car_id == car_id).delete(synchronize_session=False)
print(f"[DELETE] Deleted {opt_count} options")
# 성능점검 삭제
pc_count = db.query(CarPerformanceCheck).filter(CarPerformanceCheck.car_id == car_id).delete(synchronize_session=False)
print(f"[DELETE] Deleted {pc_count} performance checks")
# 사양 삭제
spec_count = db.query(CarSpecification).filter(CarSpecification.car_id == car_id).delete(synchronize_session=False)
print(f"[DELETE] Deleted {spec_count} specifications")
# 차량 조회 기록 삭제
cv_count = db.query(CarView).filter(CarView.car_id == car_id).delete(synchronize_session=False)
print(f"[DELETE] Deleted {cv_count} car views")
# 성능점검 조회 기록 삭제
pcv_count = db.query(PerformanceCheckView).filter(PerformanceCheckView.car_id == car_id).delete(synchronize_session=False)
print(f"[DELETE] Deleted {pcv_count} performance check views")
# 문의 기록에서 car_id 제거 (raw SQL로 실행하여 모델 스키마 검증 방지)
result = db.execute(text("UPDATE inquiries SET car_id = NULL WHERE car_id = :car_id"), {"car_id": car_id})
inq_count = result.rowcount
print(f"[DELETE] Unlinked {inq_count} inquiries")
# 배너에서 car_id 제거 (배너는 삭제하지 않고 연결만 해제)
banner_count = db.query(HeroBanner).filter(HeroBanner.car_id == car_id).update({"car_id": None}, synchronize_session=False)
print(f"[DELETE] Unlinked {banner_count} banners")
# 차량 삭제
db.delete(car)
db.commit()
print(f"[DELETE] Car {car_id} deleted successfully")
return {"message": "Car deleted"}
except Exception as e:
db.rollback()
import traceback
error_trace = traceback.format_exc()
print(f"[DELETE] Error deleting car {car_id}: {e}\n{error_trace}")
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
# ==================== Soldout APIs ====================
@router.post("/{car_id}/soldout")
def mark_car_soldout(car_id: int, db: Session = Depends(get_db)):
"""차량을 SOLD OUT 처리 (수동)"""
car = db.query(Car).filter(Car.id == car_id).first()
if not car:
raise HTTPException(status_code=404, detail="Car not found")
car.soldout = True
db.commit()
return {"car_id": car_id, "soldout": True, "message": "Car marked as sold out"}
@router.delete("/{car_id}/soldout")
def mark_car_available(car_id: int, db: Session = Depends(get_db)):
"""차량을 다시 판매 가능 상태로 변경 (수동)"""
car = db.query(Car).filter(Car.id == car_id).first()
if not car:
raise HTTPException(status_code=404, detail="Car not found")
car.soldout = False
db.commit()
return {"car_id": car_id, "soldout": False, "message": "Car marked as available"}
async def run_soldout_check(db: Session):
"""백그라운드에서 soldout 체크 실행"""
checker = SoldoutChecker(db)
await checker.check_all_cars()
@router.post("/admin/check-soldout")
async def trigger_soldout_check(
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
"""모든 차량의 SOLD OUT 상태 확인 (Admin, 백그라운드 실행)
Carmodoo에서 더 이상 존재하지 않는 차량을 soldout=True로 표시합니다.
이 작업은 백그라운드에서 실행되며, 시간이 오래 걸릴 수 있습니다.
"""
# 현재 active이고 soldout=False인 차량 수 확인
pending_count = db.query(Car).filter(
Car.status == "active",
Car.soldout == False,
Car.source == "carmodoo"
).count()
# 백그라운드로 실행 (주의: db session 문제가 있을 수 있음)
# 실제로는 agent에서 스케줄로 실행하는 것이 좋음
# background_tasks.add_task(run_soldout_check, db)
return {
"message": "Soldout check will be performed by nightly agent job",
"pending_cars": pending_count,
"note": "Use carmodoo-agent for scheduled soldout checks"
}
@router.get("/admin/soldout-stats")
def get_soldout_stats(db: Session = Depends(get_db)):
"""SOLD OUT 통계 조회 (Admin)"""
total = db.query(Car).filter(Car.status == "active").count()
soldout = db.query(Car).filter(Car.status == "active", Car.soldout == True).count()
available = db.query(Car).filter(Car.status == "active", Car.soldout == False).count()
return {
"total_active": total,
"soldout": soldout,
"available": available,
"soldout_percentage": round(soldout / total * 100, 1) if total > 0 else 0
}