Files
AutonetSellCar/backend/app/services/cache_service.py
AutonetSellCar Deploy 1f0dcb1ddb Initial commit: AutonetSellCar platform with deployment system
- Frontend: Next.js 14 with TypeScript
- Backend: FastAPI with SQLAlchemy
- Agent: Carmodoo sync agent
- Deployment: Docker Compose based staging/production setup
- Scripts: Automated deployment with rollback support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 13:24:39 +09:00

311 lines
10 KiB
Python

"""
캐시 서비스 - 카모두 검색 결과 캐싱 및 필터링
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING
from sqlalchemy.orm import Session
from sqlalchemy import and_
from ..models.cache import CarCache, CarDetailCache, CacheRequestQueue
if TYPE_CHECKING:
from ..api.carmodoo import CarmodooClient
# 캐시 TTL 설정 (시간 단위)
CACHE_TTL_HOURS = 2
# 요청 큐 락
_request_lock = asyncio.Lock()
_pending_requests: Dict[str, asyncio.Event] = {}
class CacheService:
def __init__(self, db: Session, carmodoo_client: "CarmodooClient" = None):
self.db = db
self.carmodoo_client = carmodoo_client
def get_cache_key(self, maker_code: str, model_code: str) -> str:
"""캐시 키 생성"""
return f"{maker_code}_{model_code}"
def get_cache(self, cache_key: str) -> Optional[CarCache]:
"""캐시 조회 (만료 확인)"""
cache = self.db.query(CarCache).filter(
CarCache.cache_key == cache_key
).first()
if cache:
# 만료 확인
if cache.expires_at < datetime.utcnow():
# 만료된 캐시 삭제
self.db.delete(cache)
self.db.commit()
return None
return cache
return None
def save_cache(
self,
cache_key: str,
maker_code: str,
maker_name: str,
model_code: str,
model_name: str,
cars: List[Dict[str, Any]]
) -> CarCache:
"""캐시 저장"""
expires_at = datetime.utcnow() + timedelta(hours=CACHE_TTL_HOURS)
# 기존 캐시 삭제
existing = self.db.query(CarCache).filter(
CarCache.cache_key == cache_key
).first()
if existing:
self.db.delete(existing)
self.db.commit()
# 새 캐시 저장
cache = CarCache(
cache_key=cache_key,
maker_code=maker_code,
maker_name=maker_name,
model_code=model_code,
model_name=model_name,
total_count=len(cars),
cars_data=json.dumps(cars, ensure_ascii=False),
expires_at=expires_at
)
self.db.add(cache)
self.db.commit()
self.db.refresh(cache)
return cache
def get_cars_from_cache(self, cache: CarCache) -> List[Dict[str, Any]]:
"""캐시에서 차량 목록 가져오기"""
return json.loads(cache.cars_data)
def filter_cars(
self,
cars: List[Dict[str, Any]],
year_min: Optional[int] = None,
year_max: Optional[int] = None,
mileage_min: Optional[int] = None,
mileage_max: Optional[int] = None,
price_min: Optional[int] = None,
price_max: Optional[int] = None,
fuel: Optional[str] = None,
transmission: Optional[str] = None,
displacement_min: Optional[int] = None,
displacement_max: Optional[int] = None
) -> List[Dict[str, Any]]:
"""캐시된 데이터에서 필터링"""
filtered = cars
if year_min:
filtered = [c for c in filtered if c.get('year') and c['year'] >= year_min]
if year_max:
filtered = [c for c in filtered if c.get('year') and c['year'] <= year_max]
if mileage_min:
filtered = [c for c in filtered if c.get('mileage') and c['mileage'] >= mileage_min]
if mileage_max:
filtered = [c for c in filtered if c.get('mileage') and c['mileage'] <= mileage_max]
if price_min:
# 'price' 또는 'original_price' 키 둘 다 체크 (카모두 파싱 결과는 'price', 변환 후에는 'original_price')
filtered = [c for c in filtered if (c.get('price') or c.get('original_price')) and (c.get('price') or c.get('original_price', 0)) >= price_min]
if price_max:
filtered = [c for c in filtered if (c.get('price') or c.get('original_price')) and (c.get('price') or c.get('original_price', 0)) <= price_max]
if fuel:
# 연료 타입 매핑 (프론트엔드 값 -> 카모두 값)
fuel_map = {
'가솔린': ['휘발유', '가솔린'],
'디젤': ['경유', '디젤'],
'LPG': ['LPG'],
'하이브리드': ['하이브리드'],
'전기': ['전기'],
'휘발유': ['휘발유', '가솔린'],
'경유': ['경유', '디젤'],
}
allowed_fuels = fuel_map.get(fuel, [fuel])
filtered = [c for c in filtered if c.get('fuel') in allowed_fuels]
if transmission:
# 변속기 타입 매핑
trans_map = {
'자동': ['오토', '자동'],
'수동': ['수동'],
'세미오토': ['세미오토'],
'CVT': ['CVT'],
}
allowed_trans = trans_map.get(transmission, [transmission])
filtered = [c for c in filtered if c.get('transmission') in allowed_trans]
if displacement_min:
filtered = [c for c in filtered if c.get('displacement') and c['displacement'] >= displacement_min]
if displacement_max:
filtered = [c for c in filtered if c.get('displacement') and c['displacement'] <= displacement_max]
return filtered
def paginate_cars(
self,
cars: List[Dict[str, Any]],
page: int = 1,
page_size: int = 20
) -> Tuple[List[Dict[str, Any]], int]:
"""페이징 처리"""
total = len(cars)
start = (page - 1) * page_size
end = start + page_size
return cars[start:end], total
async def fetch_all_cars_for_cache(
self,
maker_code: str,
model_code: str,
maker_name: str = "",
model_name: str = ""
) -> List[Dict[str, Any]]:
"""캐시용 전체 데이터 수집 (연도별 분할 검색)
카모두 API는 페이징이 제대로 동작하지 않아 한 번에 최대 50대만 반환합니다.
연도별로 나누어 검색하여 더 많은 차량을 수집합니다.
"""
if not self.carmodoo_client:
return []
try:
# 연도별 분할 검색 사용 (최근 15년간)
all_cars = await self.carmodoo_client.search_cars_by_year_segment(
maker_code=maker_code,
model_code=model_code,
year_start=2010, # 2010년부터
year_end=None # 현재 연도까지
)
return all_cars
except Exception as e:
print(f"Error fetching cars for cache: {e}")
return []
async def get_or_fetch_cache(
self,
maker_code: str,
model_code: str,
maker_name: str = "",
model_name: str = ""
) -> Optional[CarCache]:
"""캐시 조회 또는 새로 가져오기 (요청 병합 포함)"""
cache_key = self.get_cache_key(maker_code, model_code)
# 1. 캐시 확인
cache = self.get_cache(cache_key)
if cache:
return cache
# 2. 요청 락으로 동시 요청 병합
async with _request_lock:
# 다른 요청이 이미 처리 중인지 확인
if cache_key in _pending_requests:
event = _pending_requests[cache_key]
else:
# 새 이벤트 생성
event = asyncio.Event()
_pending_requests[cache_key] = event
# 백그라운드에서 데이터 가져오기
asyncio.create_task(
self._fetch_and_cache(cache_key, maker_code, model_code, maker_name, model_name, event)
)
# 3. 완료 대기
await event.wait()
# 4. 캐시 반환
return self.get_cache(cache_key)
async def _fetch_and_cache(
self,
cache_key: str,
maker_code: str,
model_code: str,
maker_name: str,
model_name: str,
event: asyncio.Event
):
"""데이터 가져와서 캐시에 저장"""
try:
cars = await self.fetch_all_cars_for_cache(
maker_code, model_code, maker_name, model_name
)
if cars:
self.save_cache(
cache_key=cache_key,
maker_code=maker_code,
maker_name=maker_name,
model_code=model_code,
model_name=model_name,
cars=cars
)
except Exception as e:
print(f"Error caching {cache_key}: {e}")
finally:
# 완료 시그널
event.set()
# 대기열에서 제거
if cache_key in _pending_requests:
del _pending_requests[cache_key]
def cleanup_expired_cache(self):
"""만료된 캐시 정리"""
expired = self.db.query(CarCache).filter(
CarCache.expires_at < datetime.utcnow()
).all()
for cache in expired:
self.db.delete(cache)
self.db.commit()
return len(expired)
# 상세 정보 캐시 관련
def get_detail_cache(self, car_id: str) -> Optional[CarDetailCache]:
"""상세 정보 캐시 조회"""
cache = self.db.query(CarDetailCache).filter(
CarDetailCache.car_id == car_id
).first()
if cache:
if cache.expires_at < datetime.utcnow():
self.db.delete(cache)
self.db.commit()
return None
return cache
return None
def save_detail_cache(self, car_id: str, detail_data: Dict[str, Any]) -> CarDetailCache:
"""상세 정보 캐시 저장"""
expires_at = datetime.utcnow() + timedelta(hours=CACHE_TTL_HOURS)
existing = self.db.query(CarDetailCache).filter(
CarDetailCache.car_id == car_id
).first()
if existing:
self.db.delete(existing)
self.db.commit()
cache = CarDetailCache(
car_id=car_id,
detail_data=json.dumps(detail_data, ensure_ascii=False),
expires_at=expires_at
)
self.db.add(cache)
self.db.commit()
self.db.refresh(cache)
return cache
def get_detail_from_cache(self, cache: CarDetailCache) -> Dict[str, Any]:
"""상세 정보 캐시에서 데이터 가져오기"""
return json.loads(cache.detail_data)