# File: AutonetSellCar/temp_solutions_api.py
# Viewer metadata: 493 lines, 15 KiB, Python

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
from sqlalchemy.orm import Session
from typing import List, Optional
import os
import uuid
import aiofiles
from ..core.database import get_db
from ..core.security import get_current_admin
from ..core.config import settings
from ..models.admin import Admin
from ..models.solution import Solution, SolutionImage, Product, ProductImage
from ..schemas.solution import (
SolutionCreate, SolutionUpdate, SolutionResponse, SolutionLocalizedResponse,
ProductCreate, ProductUpdate, ProductResponse, ProductLocalizedResponse
)
# Router on which every endpoint in this module is registered (tagged "solutions").
router = APIRouter(tags=["solutions"])
# Lower-case file extensions (with leading dot) accepted by the image upload endpoints.
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
def get_localized_field(obj, field: str, lang: str) -> Optional[str]:
    """Return ``obj.<field>_<lang>``, falling back to ``obj.<field>_ko``.

    The Korean attribute is used whenever the requested-language attribute is
    missing or falsy (``None`` or an empty string, for instance). ``None`` is
    returned when the Korean attribute is missing as well.
    """
    value = getattr(obj, f"{field}_{lang}", None)
    return value if value else getattr(obj, f"{field}_ko", None)
# ==================== Solution Public Endpoints ====================
@router.get("/solutions/", response_model=List[SolutionLocalizedResponse])
def get_solutions(
    # `pattern` replaces the `regex` keyword, which is deprecated since
    # FastAPI 0.100 / Pydantic v2 and emits a DeprecationWarning.
    lang: str = Query("ko", pattern="^(ko|en|ja|zh)$"),
    db: Session = Depends(get_db),
):
    """Get all active solutions (public endpoint).

    Args:
        lang: Language code; must be one of ``ko``, ``en``, ``ja`` or ``zh``.
        db: Database session injected through ``get_db``.

    Returns:
        A list of ``SolutionLocalizedResponse`` objects ordered by
        ``display_order`` ascending. Localized text falls back to the Korean
        value when the requested language has no value.
    """
    solutions = (
        db.query(Solution)
        .filter(Solution.is_active == True)  # noqa: E712 - SQLAlchemy column comparison
        .order_by(Solution.display_order.asc())
        .all()
    )

    result = []
    for s in solutions:
        # The localized "features" value is split on commas; blank entries are dropped.
        features_str = get_localized_field(s, "features", lang) or ""
        features = [f.strip() for f in features_str.split(",") if f.strip()]
        images = [
            {
                "id": img.id,
                "image_url": img.image_url,
                "caption": get_localized_field(img, "caption", lang),
            }
            for img in (s.images or [])
        ]
        result.append(
            SolutionLocalizedResponse(
                id=s.id,
                title=get_localized_field(s, "title", lang),
                subtitle=get_localized_field(s, "subtitle", lang),
                description=get_localized_field(s, "description", lang),
                features=features,
                icon=s.icon,
                color=s.color,
                main_image=s.main_image,
                images=images,
            )
        )
    return result
@router.get("/solutions/{solution_id}", response_model=SolutionLocalizedResponse)
def get_solution(
    solution_id: int,
    # `pattern` replaces the `regex` keyword, which is deprecated since
    # FastAPI 0.100 / Pydantic v2 and emits a DeprecationWarning.
    lang: str = Query("ko", pattern="^(ko|en|ja|zh)$"),
    db: Session = Depends(get_db),
):
    """Get a single active solution by ID (public endpoint).

    Args:
        solution_id: Primary key of the solution.
        lang: Language code; must be one of ``ko``, ``en``, ``ja`` or ``zh``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``SolutionLocalizedResponse``; localized text falls back to the
        Korean value when the requested language has no value.

    Raises:
        HTTPException: 404 when no active solution has this ID.
    """
    solution = (
        db.query(Solution)
        .filter(
            Solution.id == solution_id,
            Solution.is_active == True,  # noqa: E712 - SQLAlchemy column comparison
        )
        .first()
    )
    if not solution:
        raise HTTPException(status_code=404, detail="Solution not found")

    # The localized "features" value is split on commas; blank entries are dropped.
    features_str = get_localized_field(solution, "features", lang) or ""
    features = [f.strip() for f in features_str.split(",") if f.strip()]
    images = [
        {
            "id": img.id,
            "image_url": img.image_url,
            "caption": get_localized_field(img, "caption", lang),
        }
        for img in (solution.images or [])
    ]
    return SolutionLocalizedResponse(
        id=solution.id,
        title=get_localized_field(solution, "title", lang),
        subtitle=get_localized_field(solution, "subtitle", lang),
        description=get_localized_field(solution, "description", lang),
        features=features,
        icon=solution.icon,
        color=solution.color,
        main_image=solution.main_image,
        images=images,
    )
# ==================== Solution Admin Endpoints ====================
@router.get("/solutions/admin/list", response_model=List[SolutionResponse])
def admin_get_solutions(
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """List every solution, active or not (admin only).

    Rows are sorted by ``display_order`` ascending, then by ``id`` descending.
    Access is gated by the ``get_current_admin`` dependency.
    """
    ordering = (Solution.display_order.asc(), Solution.id.desc())
    return db.query(Solution).order_by(*ordering).all()
@router.get("/solutions/admin/{solution_id}", response_model=SolutionResponse)
def admin_get_solution(
    solution_id: int,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Fetch one solution with all of its fields (admin only).

    No ``is_active`` filter is applied here, unlike the public endpoint.
    Responds with 404 when the ID does not exist.
    """
    record = db.query(Solution).filter(Solution.id == solution_id).first()
    if record:
        return record
    raise HTTPException(status_code=404, detail="Solution not found")
@router.post("/solutions/admin", response_model=SolutionResponse)
def create_solution(
    solution_data: SolutionCreate,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Insert a new solution built from the request payload (admin only).

    The row is committed and then refreshed from the session before being
    returned.
    """
    payload = solution_data.model_dump()
    created = Solution(**payload)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
@router.put("/solutions/admin/{solution_id}", response_model=SolutionResponse)
def update_solution(
    solution_id: int,
    solution_data: SolutionUpdate,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Apply a partial update to an existing solution (admin only).

    Only the fields explicitly set in the request body are written
    (``exclude_unset=True``). Responds with 404 when the ID does not exist.
    """
    target = db.query(Solution).filter(Solution.id == solution_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Solution not found")

    changes = solution_data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.delete("/solutions/admin/{solution_id}")
def delete_solution(
    solution_id: int,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Delete a solution and its image files (admin only).

    Args:
        solution_id: Primary key of the solution to delete.
        db: Database session injected through ``get_db``.
        current_admin: Resolved by ``get_current_admin``; gates access.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the solution does not exist.
    """
    solution = db.query(Solution).filter(Solution.id == solution_id).first()
    if not solution:
        raise HTTPException(status_code=404, detail="Solution not found")

    # Collect the file paths up front: they are read from the ORM object,
    # which should not be relied on once its deletion has been committed.
    paths = [solution.main_image] if solution.main_image else []
    paths.extend(img.image_url for img in (solution.images or []) if img.image_url)

    # Commit the row deletion before touching the disk so that a failed
    # commit cannot leave database rows pointing at already-removed files.
    db.delete(solution)
    db.commit()

    for path in paths:
        try:
            os.remove(path)
        except (OSError, ValueError):
            # Best-effort cleanup: a missing, unremovable or malformed path
            # must not fail the request. Narrower than the former bare
            # `except`, which also swallowed KeyboardInterrupt/SystemExit.
            pass
    return {"message": "Solution deleted successfully"}
# ==================== Solution Image Upload ====================
@router.post("/solutions/admin/{solution_id}/upload-image")
async def upload_solution_image(
    solution_id: int,
    file: UploadFile = File(...),
    is_main: bool = Query(False),
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Upload an image for a solution (admin only).

    Args:
        solution_id: Primary key of the solution receiving the image.
        file: Uploaded file; its extension must be in ``ALLOWED_EXTENSIONS``.
        is_main: When true the file becomes ``main_image``; otherwise a new
            ``SolutionImage`` row is appended after the existing images.
        db: Database session injected through ``get_db``.
        current_admin: Resolved by ``get_current_admin``; gates access.

    Returns:
        A dict with a confirmation ``message`` and the stored file ``path``.

    Raises:
        HTTPException: 404 when the solution does not exist; 400 when the
            extension is not allowed or the file exceeds
            ``settings.MAX_FILE_SIZE``.
    """
    solution = db.query(Solution).filter(Solution.id == solution_id).first()
    if not solution:
        raise HTTPException(status_code=404, detail="Solution not found")

    # `filename` can be None when the client omits it; treat that as
    # "no extension" (-> 400) instead of crashing in os.path.splitext.
    ext = os.path.splitext(file.filename or "")[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail="File type not allowed")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without copying an arbitrarily large body into memory.
    contents = await file.read(int(settings.MAX_FILE_SIZE) + 1)
    if len(contents) > settings.MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="File too large")

    # A random UUID keeps stored names unique and independent of client input.
    filename = f"solution_{uuid.uuid4()}{ext}"
    filepath = os.path.join(settings.UPLOAD_DIR, filename)
    async with aiofiles.open(filepath, "wb") as f:
        await f.write(contents)

    try:
        if is_main:
            solution.main_image = filepath
        else:
            db.add(
                SolutionImage(
                    solution_id=solution_id,
                    image_url=filepath,
                    display_order=len(solution.images or []),
                )
            )
        db.commit()
    except Exception:
        # The file is already on disk; remove it so a failed database
        # update does not leave an orphan, then re-raise unchanged.
        db.rollback()
        try:
            os.remove(filepath)
        except OSError:
            pass
        raise
    return {"message": "Image uploaded successfully", "path": filepath}
@router.delete("/solutions/admin/images/{image_id}")
def delete_solution_image(
    image_id: int,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Delete a single solution image row and its file (admin only).

    Args:
        image_id: Primary key of the ``SolutionImage`` to delete.
        db: Database session injected through ``get_db``.
        current_admin: Resolved by ``get_current_admin``; gates access.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the image does not exist.
    """
    image = db.query(SolutionImage).filter(SolutionImage.id == image_id).first()
    if not image:
        raise HTTPException(status_code=404, detail="Image not found")

    # Remember the path, then commit the row deletion before touching the
    # disk so a failed commit cannot leave a row pointing at a removed file.
    path = image.image_url
    db.delete(image)
    db.commit()

    if path:
        try:
            os.remove(path)
        except (OSError, ValueError):
            # Best-effort cleanup; narrower than the former bare `except`,
            # which also swallowed KeyboardInterrupt/SystemExit.
            pass
    return {"message": "Image deleted successfully"}
# ==================== Product Public Endpoints ====================
@router.get("/products/", response_model=List[ProductLocalizedResponse])
def get_products(
    # `pattern` replaces the `regex` keyword, which is deprecated since
    # FastAPI 0.100 / Pydantic v2 and emits a DeprecationWarning.
    lang: str = Query("ko", pattern="^(ko|en|ja|zh)$"),
    db: Session = Depends(get_db),
):
    """Get all active products (public endpoint).

    Args:
        lang: Language code; must be one of ``ko``, ``en``, ``ja`` or ``zh``.
        db: Database session injected through ``get_db``.

    Returns:
        A list of ``ProductLocalizedResponse`` objects ordered by
        ``display_order`` ascending. Localized text falls back to the Korean
        value when the requested language has no value; ``specifications``
        is passed through without localization.
    """
    products = (
        db.query(Product)
        .filter(Product.is_active == True)  # noqa: E712 - SQLAlchemy column comparison
        .order_by(Product.display_order.asc())
        .all()
    )

    result = []
    for p in products:
        images = [
            {
                "id": img.id,
                "image_url": img.image_url,
                "caption": get_localized_field(img, "caption", lang),
            }
            for img in (p.images or [])
        ]
        result.append(
            ProductLocalizedResponse(
                id=p.id,
                name=get_localized_field(p, "name", lang),
                category=get_localized_field(p, "category", lang),
                description=get_localized_field(p, "description", lang),
                detail=get_localized_field(p, "detail", lang),
                specifications=p.specifications,
                icon=p.icon,
                main_image=p.main_image,
                images=images,
            )
        )
    return result
@router.get("/products/{product_id}", response_model=ProductLocalizedResponse)
def get_product(
    product_id: int,
    # `pattern` replaces the `regex` keyword, which is deprecated since
    # FastAPI 0.100 / Pydantic v2 and emits a DeprecationWarning.
    lang: str = Query("ko", pattern="^(ko|en|ja|zh)$"),
    db: Session = Depends(get_db),
):
    """Get a single active product by ID (public endpoint).

    Args:
        product_id: Primary key of the product.
        lang: Language code; must be one of ``ko``, ``en``, ``ja`` or ``zh``.
        db: Database session injected through ``get_db``.

    Returns:
        A ``ProductLocalizedResponse``; localized text falls back to the
        Korean value when the requested language has no value.

    Raises:
        HTTPException: 404 when no active product has this ID.
    """
    product = (
        db.query(Product)
        .filter(
            Product.id == product_id,
            Product.is_active == True,  # noqa: E712 - SQLAlchemy column comparison
        )
        .first()
    )
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    images = [
        {
            "id": img.id,
            "image_url": img.image_url,
            "caption": get_localized_field(img, "caption", lang),
        }
        for img in (product.images or [])
    ]
    return ProductLocalizedResponse(
        id=product.id,
        name=get_localized_field(product, "name", lang),
        category=get_localized_field(product, "category", lang),
        description=get_localized_field(product, "description", lang),
        detail=get_localized_field(product, "detail", lang),
        specifications=product.specifications,
        icon=product.icon,
        main_image=product.main_image,
        images=images,
    )
# ==================== Product Admin Endpoints ====================
@router.get("/products/admin/list", response_model=List[ProductResponse])
def admin_get_products(
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """List every product, active or not (admin only).

    Rows are sorted by ``display_order`` ascending, then by ``id`` descending.
    Access is gated by the ``get_current_admin`` dependency.
    """
    ordering = (Product.display_order.asc(), Product.id.desc())
    return db.query(Product).order_by(*ordering).all()
@router.get("/products/admin/{product_id}", response_model=ProductResponse)
def admin_get_product(
    product_id: int,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Fetch one product with all of its fields (admin only).

    No ``is_active`` filter is applied here, unlike the public endpoint.
    Responds with 404 when the ID does not exist.
    """
    record = db.query(Product).filter(Product.id == product_id).first()
    if record:
        return record
    raise HTTPException(status_code=404, detail="Product not found")
@router.post("/products/admin", response_model=ProductResponse)
def create_product(
    product_data: ProductCreate,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Insert a new product built from the request payload (admin only).

    The row is committed and then refreshed from the session before being
    returned.
    """
    payload = product_data.model_dump()
    created = Product(**payload)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
@router.put("/products/admin/{product_id}", response_model=ProductResponse)
def update_product(
    product_id: int,
    product_data: ProductUpdate,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Apply a partial update to an existing product (admin only).

    Only the fields explicitly set in the request body are written
    (``exclude_unset=True``). Responds with 404 when the ID does not exist.
    """
    target = db.query(Product).filter(Product.id == product_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Product not found")

    changes = product_data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.delete("/products/admin/{product_id}")
def delete_product(
    product_id: int,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Delete a product and its image files (admin only).

    Args:
        product_id: Primary key of the product to delete.
        db: Database session injected through ``get_db``.
        current_admin: Resolved by ``get_current_admin``; gates access.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    product = db.query(Product).filter(Product.id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Collect the file paths up front: they are read from the ORM object,
    # which should not be relied on once its deletion has been committed.
    paths = [product.main_image] if product.main_image else []
    paths.extend(img.image_url for img in (product.images or []) if img.image_url)

    # Commit the row deletion before touching the disk so that a failed
    # commit cannot leave database rows pointing at already-removed files.
    db.delete(product)
    db.commit()

    for path in paths:
        try:
            os.remove(path)
        except (OSError, ValueError):
            # Best-effort cleanup: a missing, unremovable or malformed path
            # must not fail the request. Narrower than the former bare
            # `except`, which also swallowed KeyboardInterrupt/SystemExit.
            pass
    return {"message": "Product deleted successfully"}
# ==================== Product Image Upload ====================
@router.post("/products/admin/{product_id}/upload-image")
async def upload_product_image(
    product_id: int,
    file: UploadFile = File(...),
    is_main: bool = Query(False),
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Upload an image for a product (admin only).

    Args:
        product_id: Primary key of the product receiving the image.
        file: Uploaded file; its extension must be in ``ALLOWED_EXTENSIONS``.
        is_main: When true the file becomes ``main_image``; otherwise a new
            ``ProductImage`` row is appended after the existing images.
        db: Database session injected through ``get_db``.
        current_admin: Resolved by ``get_current_admin``; gates access.

    Returns:
        A dict with a confirmation ``message`` and the stored file ``path``.

    Raises:
        HTTPException: 404 when the product does not exist; 400 when the
            extension is not allowed or the file exceeds
            ``settings.MAX_FILE_SIZE``.
    """
    product = db.query(Product).filter(Product.id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # `filename` can be None when the client omits it; treat that as
    # "no extension" (-> 400) instead of crashing in os.path.splitext.
    ext = os.path.splitext(file.filename or "")[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail="File type not allowed")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without copying an arbitrarily large body into memory.
    contents = await file.read(int(settings.MAX_FILE_SIZE) + 1)
    if len(contents) > settings.MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="File too large")

    # A random UUID keeps stored names unique and independent of client input.
    filename = f"product_{uuid.uuid4()}{ext}"
    filepath = os.path.join(settings.UPLOAD_DIR, filename)
    async with aiofiles.open(filepath, "wb") as f:
        await f.write(contents)

    try:
        if is_main:
            product.main_image = filepath
        else:
            db.add(
                ProductImage(
                    product_id=product_id,
                    image_url=filepath,
                    display_order=len(product.images or []),
                )
            )
        db.commit()
    except Exception:
        # The file is already on disk; remove it so a failed database
        # update does not leave an orphan, then re-raise unchanged.
        db.rollback()
        try:
            os.remove(filepath)
        except OSError:
            pass
        raise
    return {"message": "Image uploaded successfully", "path": filepath}
@router.delete("/products/admin/images/{image_id}")
def delete_product_image(
    image_id: int,
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(get_current_admin),
):
    """Delete a single product image row and its file (admin only).

    Args:
        image_id: Primary key of the ``ProductImage`` to delete.
        db: Database session injected through ``get_db``.
        current_admin: Resolved by ``get_current_admin``; gates access.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the image does not exist.
    """
    image = db.query(ProductImage).filter(ProductImage.id == image_id).first()
    if not image:
        raise HTTPException(status_code=404, detail="Image not found")

    # Remember the path, then commit the row deletion before touching the
    # disk so a failed commit cannot leave a row pointing at a removed file.
    path = image.image_url
    db.delete(image)
    db.commit()

    if path:
        try:
            os.remove(path)
        except (OSError, ValueError):
            # Best-effort cleanup; narrower than the former bare `except`,
            # which also swallowed KeyboardInterrupt/SystemExit.
            pass
    return {"message": "Image deleted successfully"}