fix: Remove car_id property from adminAddVehicle call to fix TypeScript error
This commit is contained in:
492
temp_solutions_api.py
Normal file
492
temp_solutions_api.py
Normal file
@@ -0,0 +1,492 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List, Optional
|
||||
import os
|
||||
import uuid
|
||||
import aiofiles
|
||||
|
||||
from ..core.database import get_db
|
||||
from ..core.security import get_current_admin
|
||||
from ..core.config import settings
|
||||
from ..models.admin import Admin
|
||||
from ..models.solution import Solution, SolutionImage, Product, ProductImage
|
||||
from ..schemas.solution import (
|
||||
SolutionCreate, SolutionUpdate, SolutionResponse, SolutionLocalizedResponse,
|
||||
ProductCreate, ProductUpdate, ProductResponse, ProductLocalizedResponse
|
||||
)
|
||||
|
||||
router = APIRouter(tags=["solutions"])
|
||||
|
||||
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
|
||||
|
||||
|
||||
def get_localized_field(obj, field: str, lang: str) -> Optional[str]:
|
||||
"""Get localized field value with fallback to Korean"""
|
||||
localized = getattr(obj, f"{field}_{lang}", None)
|
||||
if localized:
|
||||
return localized
|
||||
return getattr(obj, f"{field}_ko", None)
|
||||
|
||||
|
||||
# ==================== Solution Public Endpoints ====================
|
||||
|
||||
@router.get("/solutions/", response_model=List[SolutionLocalizedResponse])
|
||||
def get_solutions(
|
||||
lang: str = Query("ko", regex="^(ko|en|ja|zh)$"),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Get all active solutions (public endpoint)"""
|
||||
solutions = db.query(Solution).filter(
|
||||
Solution.is_active == True
|
||||
).order_by(Solution.display_order.asc()).all()
|
||||
|
||||
result = []
|
||||
for s in solutions:
|
||||
features_str = get_localized_field(s, "features", lang) or ""
|
||||
features = [f.strip() for f in features_str.split(",") if f.strip()]
|
||||
|
||||
images = [
|
||||
{
|
||||
"id": img.id,
|
||||
"image_url": img.image_url,
|
||||
"caption": get_localized_field(img, "caption", lang)
|
||||
}
|
||||
for img in s.images
|
||||
] if hasattr(s, 'images') and s.images else []
|
||||
|
||||
result.append(SolutionLocalizedResponse(
|
||||
id=s.id,
|
||||
title=get_localized_field(s, "title", lang),
|
||||
subtitle=get_localized_field(s, "subtitle", lang),
|
||||
description=get_localized_field(s, "description", lang),
|
||||
features=features,
|
||||
icon=s.icon,
|
||||
color=s.color,
|
||||
main_image=s.main_image,
|
||||
images=images
|
||||
))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.get("/solutions/{solution_id}", response_model=SolutionLocalizedResponse)
|
||||
def get_solution(
|
||||
solution_id: int,
|
||||
lang: str = Query("ko", regex="^(ko|en|ja|zh)$"),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Get single solution by ID (public endpoint)"""
|
||||
solution = db.query(Solution).filter(
|
||||
Solution.id == solution_id,
|
||||
Solution.is_active == True
|
||||
).first()
|
||||
|
||||
if not solution:
|
||||
raise HTTPException(status_code=404, detail="Solution not found")
|
||||
|
||||
features_str = get_localized_field(solution, "features", lang) or ""
|
||||
features = [f.strip() for f in features_str.split(",") if f.strip()]
|
||||
|
||||
images = [
|
||||
{
|
||||
"id": img.id,
|
||||
"image_url": img.image_url,
|
||||
"caption": get_localized_field(img, "caption", lang)
|
||||
}
|
||||
for img in solution.images
|
||||
] if hasattr(solution, 'images') and solution.images else []
|
||||
|
||||
return SolutionLocalizedResponse(
|
||||
id=solution.id,
|
||||
title=get_localized_field(solution, "title", lang),
|
||||
subtitle=get_localized_field(solution, "subtitle", lang),
|
||||
description=get_localized_field(solution, "description", lang),
|
||||
features=features,
|
||||
icon=solution.icon,
|
||||
color=solution.color,
|
||||
main_image=solution.main_image,
|
||||
images=images
|
||||
)
|
||||
|
||||
|
||||
# ==================== Solution Admin Endpoints ====================
|
||||
|
||||
@router.get("/solutions/admin/list", response_model=List[SolutionResponse])
|
||||
def admin_get_solutions(
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Get all solutions (admin only)"""
|
||||
solutions = db.query(Solution).order_by(Solution.display_order.asc(), Solution.id.desc()).all()
|
||||
return solutions
|
||||
|
||||
|
||||
@router.get("/solutions/admin/{solution_id}", response_model=SolutionResponse)
|
||||
def admin_get_solution(
|
||||
solution_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Get solution with all fields (admin only)"""
|
||||
solution = db.query(Solution).filter(Solution.id == solution_id).first()
|
||||
if not solution:
|
||||
raise HTTPException(status_code=404, detail="Solution not found")
|
||||
return solution
|
||||
|
||||
|
||||
@router.post("/solutions/admin", response_model=SolutionResponse)
|
||||
def create_solution(
|
||||
solution_data: SolutionCreate,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Create new solution (admin only)"""
|
||||
solution = Solution(**solution_data.model_dump())
|
||||
db.add(solution)
|
||||
db.commit()
|
||||
db.refresh(solution)
|
||||
return solution
|
||||
|
||||
|
||||
@router.put("/solutions/admin/{solution_id}", response_model=SolutionResponse)
|
||||
def update_solution(
|
||||
solution_id: int,
|
||||
solution_data: SolutionUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Update solution (admin only)"""
|
||||
solution = db.query(Solution).filter(Solution.id == solution_id).first()
|
||||
if not solution:
|
||||
raise HTTPException(status_code=404, detail="Solution not found")
|
||||
|
||||
update_data = solution_data.model_dump(exclude_unset=True)
|
||||
for field, value in update_data.items():
|
||||
setattr(solution, field, value)
|
||||
|
||||
db.commit()
|
||||
db.refresh(solution)
|
||||
return solution
|
||||
|
||||
|
||||
@router.delete("/solutions/admin/{solution_id}")
|
||||
def delete_solution(
|
||||
solution_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Delete solution (admin only)"""
|
||||
solution = db.query(Solution).filter(Solution.id == solution_id).first()
|
||||
if not solution:
|
||||
raise HTTPException(status_code=404, detail="Solution not found")
|
||||
|
||||
# Delete associated images from filesystem
|
||||
if solution.main_image:
|
||||
try:
|
||||
os.remove(solution.main_image)
|
||||
except:
|
||||
pass
|
||||
|
||||
for img in solution.images:
|
||||
try:
|
||||
os.remove(img.image_url)
|
||||
except:
|
||||
pass
|
||||
|
||||
db.delete(solution)
|
||||
db.commit()
|
||||
return {"message": "Solution deleted successfully"}
|
||||
|
||||
|
||||
# ==================== Solution Image Upload ====================
|
||||
|
||||
@router.post("/solutions/admin/{solution_id}/upload-image")
|
||||
async def upload_solution_image(
|
||||
solution_id: int,
|
||||
file: UploadFile = File(...),
|
||||
is_main: bool = Query(False),
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Upload image for solution (admin only)"""
|
||||
solution = db.query(Solution).filter(Solution.id == solution_id).first()
|
||||
if not solution:
|
||||
raise HTTPException(status_code=404, detail="Solution not found")
|
||||
|
||||
ext = os.path.splitext(file.filename)[1].lower()
|
||||
if ext not in ALLOWED_EXTENSIONS:
|
||||
raise HTTPException(status_code=400, detail="File type not allowed")
|
||||
|
||||
contents = await file.read()
|
||||
if len(contents) > settings.MAX_FILE_SIZE:
|
||||
raise HTTPException(status_code=400, detail="File too large")
|
||||
|
||||
filename = f"solution_{uuid.uuid4()}{ext}"
|
||||
filepath = os.path.join(settings.UPLOAD_DIR, filename)
|
||||
|
||||
async with aiofiles.open(filepath, 'wb') as f:
|
||||
await f.write(contents)
|
||||
|
||||
if is_main:
|
||||
solution.main_image = filepath
|
||||
db.commit()
|
||||
else:
|
||||
new_image = SolutionImage(
|
||||
solution_id=solution_id,
|
||||
image_url=filepath,
|
||||
display_order=len(solution.images) if solution.images else 0
|
||||
)
|
||||
db.add(new_image)
|
||||
db.commit()
|
||||
|
||||
return {"message": "Image uploaded successfully", "path": filepath}
|
||||
|
||||
|
||||
@router.delete("/solutions/admin/images/{image_id}")
|
||||
def delete_solution_image(
|
||||
image_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Delete solution image (admin only)"""
|
||||
image = db.query(SolutionImage).filter(SolutionImage.id == image_id).first()
|
||||
if not image:
|
||||
raise HTTPException(status_code=404, detail="Image not found")
|
||||
|
||||
try:
|
||||
os.remove(image.image_url)
|
||||
except:
|
||||
pass
|
||||
|
||||
db.delete(image)
|
||||
db.commit()
|
||||
return {"message": "Image deleted successfully"}
|
||||
|
||||
|
||||
# ==================== Product Public Endpoints ====================
|
||||
|
||||
@router.get("/products/", response_model=List[ProductLocalizedResponse])
|
||||
def get_products(
|
||||
lang: str = Query("ko", regex="^(ko|en|ja|zh)$"),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Get all active products (public endpoint)"""
|
||||
products = db.query(Product).filter(
|
||||
Product.is_active == True
|
||||
).order_by(Product.display_order.asc()).all()
|
||||
|
||||
result = []
|
||||
for p in products:
|
||||
images = [
|
||||
{
|
||||
"id": img.id,
|
||||
"image_url": img.image_url,
|
||||
"caption": get_localized_field(img, "caption", lang)
|
||||
}
|
||||
for img in p.images
|
||||
] if hasattr(p, 'images') and p.images else []
|
||||
|
||||
result.append(ProductLocalizedResponse(
|
||||
id=p.id,
|
||||
name=get_localized_field(p, "name", lang),
|
||||
category=get_localized_field(p, "category", lang),
|
||||
description=get_localized_field(p, "description", lang),
|
||||
detail=get_localized_field(p, "detail", lang),
|
||||
specifications=p.specifications,
|
||||
icon=p.icon,
|
||||
main_image=p.main_image,
|
||||
images=images
|
||||
))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.get("/products/{product_id}", response_model=ProductLocalizedResponse)
|
||||
def get_product(
|
||||
product_id: int,
|
||||
lang: str = Query("ko", regex="^(ko|en|ja|zh)$"),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Get single product by ID (public endpoint)"""
|
||||
product = db.query(Product).filter(
|
||||
Product.id == product_id,
|
||||
Product.is_active == True
|
||||
).first()
|
||||
|
||||
if not product:
|
||||
raise HTTPException(status_code=404, detail="Product not found")
|
||||
|
||||
images = [
|
||||
{
|
||||
"id": img.id,
|
||||
"image_url": img.image_url,
|
||||
"caption": get_localized_field(img, "caption", lang)
|
||||
}
|
||||
for img in product.images
|
||||
] if hasattr(product, 'images') and product.images else []
|
||||
|
||||
return ProductLocalizedResponse(
|
||||
id=product.id,
|
||||
name=get_localized_field(product, "name", lang),
|
||||
category=get_localized_field(product, "category", lang),
|
||||
description=get_localized_field(product, "description", lang),
|
||||
detail=get_localized_field(product, "detail", lang),
|
||||
specifications=product.specifications,
|
||||
icon=product.icon,
|
||||
main_image=product.main_image,
|
||||
images=images
|
||||
)
|
||||
|
||||
|
||||
# ==================== Product Admin Endpoints ====================
|
||||
|
||||
@router.get("/products/admin/list", response_model=List[ProductResponse])
|
||||
def admin_get_products(
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Get all products (admin only)"""
|
||||
products = db.query(Product).order_by(Product.display_order.asc(), Product.id.desc()).all()
|
||||
return products
|
||||
|
||||
|
||||
@router.get("/products/admin/{product_id}", response_model=ProductResponse)
|
||||
def admin_get_product(
|
||||
product_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Get product with all fields (admin only)"""
|
||||
product = db.query(Product).filter(Product.id == product_id).first()
|
||||
if not product:
|
||||
raise HTTPException(status_code=404, detail="Product not found")
|
||||
return product
|
||||
|
||||
|
||||
@router.post("/products/admin", response_model=ProductResponse)
|
||||
def create_product(
|
||||
product_data: ProductCreate,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Create new product (admin only)"""
|
||||
product = Product(**product_data.model_dump())
|
||||
db.add(product)
|
||||
db.commit()
|
||||
db.refresh(product)
|
||||
return product
|
||||
|
||||
|
||||
@router.put("/products/admin/{product_id}", response_model=ProductResponse)
|
||||
def update_product(
|
||||
product_id: int,
|
||||
product_data: ProductUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Update product (admin only)"""
|
||||
product = db.query(Product).filter(Product.id == product_id).first()
|
||||
if not product:
|
||||
raise HTTPException(status_code=404, detail="Product not found")
|
||||
|
||||
update_data = product_data.model_dump(exclude_unset=True)
|
||||
for field, value in update_data.items():
|
||||
setattr(product, field, value)
|
||||
|
||||
db.commit()
|
||||
db.refresh(product)
|
||||
return product
|
||||
|
||||
|
||||
@router.delete("/products/admin/{product_id}")
|
||||
def delete_product(
|
||||
product_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Delete product (admin only)"""
|
||||
product = db.query(Product).filter(Product.id == product_id).first()
|
||||
if not product:
|
||||
raise HTTPException(status_code=404, detail="Product not found")
|
||||
|
||||
# Delete associated images from filesystem
|
||||
if product.main_image:
|
||||
try:
|
||||
os.remove(product.main_image)
|
||||
except:
|
||||
pass
|
||||
|
||||
for img in product.images:
|
||||
try:
|
||||
os.remove(img.image_url)
|
||||
except:
|
||||
pass
|
||||
|
||||
db.delete(product)
|
||||
db.commit()
|
||||
return {"message": "Product deleted successfully"}
|
||||
|
||||
|
||||
# ==================== Product Image Upload ====================
|
||||
|
||||
@router.post("/products/admin/{product_id}/upload-image")
|
||||
async def upload_product_image(
|
||||
product_id: int,
|
||||
file: UploadFile = File(...),
|
||||
is_main: bool = Query(False),
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Upload image for product (admin only)"""
|
||||
product = db.query(Product).filter(Product.id == product_id).first()
|
||||
if not product:
|
||||
raise HTTPException(status_code=404, detail="Product not found")
|
||||
|
||||
ext = os.path.splitext(file.filename)[1].lower()
|
||||
if ext not in ALLOWED_EXTENSIONS:
|
||||
raise HTTPException(status_code=400, detail="File type not allowed")
|
||||
|
||||
contents = await file.read()
|
||||
if len(contents) > settings.MAX_FILE_SIZE:
|
||||
raise HTTPException(status_code=400, detail="File too large")
|
||||
|
||||
filename = f"product_{uuid.uuid4()}{ext}"
|
||||
filepath = os.path.join(settings.UPLOAD_DIR, filename)
|
||||
|
||||
async with aiofiles.open(filepath, 'wb') as f:
|
||||
await f.write(contents)
|
||||
|
||||
if is_main:
|
||||
product.main_image = filepath
|
||||
db.commit()
|
||||
else:
|
||||
new_image = ProductImage(
|
||||
product_id=product_id,
|
||||
image_url=filepath,
|
||||
display_order=len(product.images) if product.images else 0
|
||||
)
|
||||
db.add(new_image)
|
||||
db.commit()
|
||||
|
||||
return {"message": "Image uploaded successfully", "path": filepath}
|
||||
|
||||
|
||||
@router.delete("/products/admin/images/{image_id}")
|
||||
def delete_product_image(
|
||||
image_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: Admin = Depends(get_current_admin)
|
||||
):
|
||||
"""Delete product image (admin only)"""
|
||||
image = db.query(ProductImage).filter(ProductImage.id == image_id).first()
|
||||
if not image:
|
||||
raise HTTPException(status_code=404, detail="Image not found")
|
||||
|
||||
try:
|
||||
os.remove(image.image_url)
|
||||
except:
|
||||
pass
|
||||
|
||||
db.delete(image)
|
||||
db.commit()
|
||||
return {"message": "Image deleted successfully"}
|
||||
Reference in New Issue
Block a user