Files
AutonetSellCar/temp_downloads_api.py

323 lines
10 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
import uuid
import os
import csv
import io
from datetime import datetime
from ..database import get_db
from ..models.download import Download, DownloadRequest
from ..schemas.download import (
DownloadResponse, DownloadRequestCreate, DownloadRequestResponse,
DownloadAdminCreate, DownloadAdminUpdate, DownloadAdminResponse,
DownloadRequestAdminResponse
)
from ..core.security import get_current_admin
from ..core.config import settings
router = APIRouter(prefix="/downloads", tags=["downloads"])
UPLOAD_DIR = os.path.join(settings.UPLOAD_DIR, "downloads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
ALLOWED_EXTENSIONS = {".exe", ".zip", ".msi", ".dmg", ".pkg", ".tar", ".gz", ".rar", ".7z", ".pdf"}
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
def get_localized_field(obj, field: str, lang: str) -> Optional[str]:
localized = getattr(obj, f"{field}_{lang}", None)
if localized:
return localized
return getattr(obj, f"{field}_ko", None)
# Public Endpoints
@router.get("/", response_model=List[DownloadResponse])
def get_downloads(lang: str = "ko", db: Session = Depends(get_db)):
downloads = db.query(Download).filter(
Download.is_active == True
).order_by(Download.display_order).all()
result = []
for d in downloads:
result.append(DownloadResponse(
id=d.id,
title=get_localized_field(d, "title", lang) or "",
description=get_localized_field(d, "description", lang),
category=get_localized_field(d, "category", lang),
file_name=d.file_name,
file_size=d.file_size,
version=d.version,
thumbnail=d.thumbnail,
download_count=d.download_count or 0
))
return result
@router.get("/{download_id}", response_model=DownloadResponse)
def get_download(download_id: int, lang: str = "ko", db: Session = Depends(get_db)):
d = db.query(Download).filter(
Download.id == download_id,
Download.is_active == True
).first()
if not d:
raise HTTPException(status_code=404, detail="Download not found")
return DownloadResponse(
id=d.id,
title=get_localized_field(d, "title", lang) or "",
description=get_localized_field(d, "description", lang),
category=get_localized_field(d, "category", lang),
file_name=d.file_name,
file_size=d.file_size,
version=d.version,
thumbnail=d.thumbnail,
download_count=d.download_count or 0
)
@router.post("/{download_id}/request", response_model=DownloadRequestResponse)
async def request_download(
download_id: int,
request_data: DownloadRequestCreate,
request: Request,
db: Session = Depends(get_db)
):
download = db.query(Download).filter(
Download.id == download_id,
Download.is_active == True
).first()
if not download:
raise HTTPException(status_code=404, detail="Download not found")
if not request_data.newsletter_agreed:
raise HTTPException(status_code=400, detail="Newsletter consent is required")
client_ip = request.headers.get("X-Forwarded-For", request.client.host)
if client_ip and "," in client_ip:
client_ip = client_ip.split(",")[0].strip()
country = None
country_code = None
try:
import httpx
async with httpx.AsyncClient(timeout=2.0) as client:
resp = await client.get(f"http://ip-api.com/json/{client_ip}?fields=country,countryCode")
if resp.status_code == 200:
data = resp.json()
country = data.get("country")
country_code = data.get("countryCode")
except:
pass
download_request = DownloadRequest(
download_id=download_id,
email=request_data.email,
newsletter_agreed=request_data.newsletter_agreed,
ip_address=client_ip,
country=country,
country_code=country_code
)
db.add(download_request)
download.download_count = (download.download_count or 0) + 1
db.commit()
return DownloadRequestResponse(
download_url=download.file_url,
file_name=download.file_name or "download"
)
# Admin Endpoints
@router.get("/admin/list", response_model=List[DownloadAdminResponse])
def admin_get_downloads(
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
downloads = db.query(Download).order_by(Download.display_order).all()
return downloads
@router.get("/admin/{download_id}", response_model=DownloadAdminResponse)
def admin_get_download(
download_id: int,
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
download = db.query(Download).filter(Download.id == download_id).first()
if not download:
raise HTTPException(status_code=404, detail="Download not found")
return download
@router.post("/admin", response_model=DownloadAdminResponse)
def admin_create_download(
download_data: DownloadAdminCreate,
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
download = Download(**download_data.model_dump())
db.add(download)
db.commit()
db.refresh(download)
return download
@router.put("/admin/{download_id}", response_model=DownloadAdminResponse)
def admin_update_download(
download_id: int,
download_data: DownloadAdminUpdate,
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
download = db.query(Download).filter(Download.id == download_id).first()
if not download:
raise HTTPException(status_code=404, detail="Download not found")
update_data = download_data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(download, key, value)
db.commit()
db.refresh(download)
return download
@router.delete("/admin/{download_id}")
def admin_delete_download(
download_id: int,
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
download = db.query(Download).filter(Download.id == download_id).first()
if not download:
raise HTTPException(status_code=404, detail="Download not found")
db.delete(download)
db.commit()
return {"message": "Download deleted successfully"}
@router.post("/admin/upload")
async def admin_upload_file(
file: UploadFile = File(...),
file_type: str = Query("file", description="file or thumbnail"),
admin = Depends(get_current_admin)
):
ext = os.path.splitext(file.filename)[1].lower()
if file_type == "thumbnail":
if ext not in ALLOWED_IMAGE_EXTENSIONS:
raise HTTPException(status_code=400, detail=f"Allowed image formats: {ALLOWED_IMAGE_EXTENSIONS}")
else:
if ext not in ALLOWED_EXTENSIONS and ext not in ALLOWED_IMAGE_EXTENSIONS:
raise HTTPException(status_code=400, detail=f"Allowed formats: {ALLOWED_EXTENSIONS}")
unique_name = f"{uuid.uuid4()}{ext}"
file_path = os.path.join(UPLOAD_DIR, unique_name)
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
file_size = len(content)
file_url = f"uploads/downloads/{unique_name}"
return {
"file_url": file_url,
"file_name": file.filename,
"file_size": file_size
}
# Admin: Download Requests (Email List)
@router.get("/admin/requests", response_model=List[DownloadRequestAdminResponse])
def admin_get_requests(
newsletter_only: bool = False,
limit: int = 100,
offset: int = 0,
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
query = db.query(DownloadRequest).join(Download)
if newsletter_only:
query = query.filter(DownloadRequest.newsletter_agreed == True)
requests = query.order_by(desc(DownloadRequest.requested_at)).offset(offset).limit(limit).all()
result = []
for r in requests:
result.append(DownloadRequestAdminResponse(
id=r.id,
download_id=r.download_id,
download_title=r.download.title_ko if r.download else None,
email=r.email,
newsletter_agreed=r.newsletter_agreed,
ip_address=r.ip_address,
country=r.country,
country_code=r.country_code,
requested_at=r.requested_at
))
return result
@router.get("/admin/requests/export")
def admin_export_requests(
newsletter_only: bool = True,
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
query = db.query(DownloadRequest).join(Download)
if newsletter_only:
query = query.filter(DownloadRequest.newsletter_agreed == True)
requests = query.order_by(desc(DownloadRequest.requested_at)).all()
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["Email", "Download", "Newsletter", "Country", "Date"])
for r in requests:
writer.writerow([
r.email,
r.download.title_ko if r.download else "",
"Yes" if r.newsletter_agreed else "No",
r.country or "",
r.requested_at.strftime("%Y-%m-%d %H:%M") if r.requested_at else ""
])
output.seek(0)
return StreamingResponse(
io.BytesIO(output.getvalue().encode("utf-8-sig")),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=download_requests_{datetime.now().strftime('%Y%m%d')}.csv"}
)
@router.get("/admin/requests/stats")
def admin_get_request_stats(
db: Session = Depends(get_db),
admin = Depends(get_current_admin)
):
total = db.query(DownloadRequest).count()
newsletter = db.query(DownloadRequest).filter(DownloadRequest.newsletter_agreed == True).count()
unique_emails = db.query(DownloadRequest.email).distinct().count()
return {
"total_requests": total,
"newsletter_subscribers": newsletter,
"unique_emails": unique_emails
}