""" Visitor Analytics API """ from fastapi import APIRouter, Depends, Request, BackgroundTasks from sqlalchemy.orm import Session from sqlalchemy import func, and_, desc from datetime import datetime, timedelta from typing import Optional, List from pydantic import BaseModel from ..database import get_db from ..models.visitor import VisitorLog, VisitorDailyStats from ..models import User from ..services.visitor_service import log_visit, aggregate_daily_stats from .auth import get_current_admin_user, get_current_user_optional router = APIRouter(prefix="/visitor", tags=["Visitor Analytics"]) # Pydantic schemas class VisitLogRequest(BaseModel): page_path: str page_title: Optional[str] = None referrer: Optional[str] = None session_id: Optional[str] = None utm_source: Optional[str] = None utm_medium: Optional[str] = None utm_campaign: Optional[str] = None class VisitorStatsResponse(BaseModel): total_visits: int unique_visitors: int device_breakdown: dict browser_breakdown: dict country_breakdown: dict class ChartData(BaseModel): labels: List[str] values: List[int] class TopPage(BaseModel): path: str views: int title: Optional[str] = None class TopReferrer(BaseModel): domain: str visits: int # Background task wrapper for async log_visit async def _log_visit_background( db: Session, ip: str, user_agent: str, page_path: str, page_title: Optional[str], referrer: Optional[str], session_id: Optional[str], user_id: Optional[int], utm_source: Optional[str], utm_medium: Optional[str], utm_campaign: Optional[str], ): """Background wrapper for log_visit""" try: await log_visit( db, ip, user_agent, page_path, page_title, referrer, session_id, user_id, utm_source, utm_medium, utm_campaign ) except Exception as e: print(f"[Visitor] Log visit failed: {e}") # Public endpoint for logging visits @router.post("/log") async def log_page_visit( visit_data: VisitLogRequest, request: Request, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """ Log a page visit (called from frontend) """ # Get client IP (handle proxies) forwarded_for = request.headers.get("X-Forwarded-For") if forwarded_for: ip = forwarded_for.split(",")[0].strip() else: ip = request.client.host if request.client else "unknown" user_agent = request.headers.get("User-Agent", "") user_id = current_user.id if current_user else None # Log visit directly (async) try: await log_visit( db, ip, user_agent, visit_data.page_path, visit_data.page_title, visit_data.referrer, visit_data.session_id, user_id, visit_data.utm_source, visit_data.utm_medium, visit_data.utm_campaign, ) except Exception as e: print(f"[Visitor] Log visit failed: {e}") return {"status": "logged"} # Admin endpoints @router.get("/admin/overview", response_model=VisitorStatsResponse) def get_visitor_overview( days: int = 30, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Get visitor statistics overview for last N days""" start_date = datetime.utcnow() - timedelta(days=days) # Total visits total_visits = db.query(func.count(VisitorLog.id)).filter( VisitorLog.visited_at >= start_date ).scalar() or 0 # Unique visitors unique_visitors = db.query(func.count(func.distinct(VisitorLog.visitor_hash))).filter( VisitorLog.visited_at >= start_date ).scalar() or 0 # Device breakdown device_query = db.query( VisitorLog.device_type, func.count(VisitorLog.id) ).filter( VisitorLog.visited_at >= start_date ).group_by(VisitorLog.device_type).all() device_breakdown = {d[0] or "unknown": d[1] for d in device_query} # Browser breakdown browser_query = db.query( VisitorLog.browser, func.count(VisitorLog.id) ).filter( VisitorLog.visited_at >= start_date ).group_by(VisitorLog.browser).all() browser_breakdown = {b[0] or "unknown": b[1] for b in browser_query} # Country breakdown country_query = db.query( VisitorLog.country_code, func.count(VisitorLog.id) ).filter( VisitorLog.visited_at >= start_date ).group_by(VisitorLog.country_code).all() country_breakdown = {c[0] or "unknown": c[1] for c in country_query} return VisitorStatsResponse( total_visits=total_visits, unique_visitors=unique_visitors, device_breakdown=device_breakdown, browser_breakdown=browser_breakdown, country_breakdown=country_breakdown, ) @router.get("/admin/chart/visits", response_model=ChartData) def get_visits_chart( days: int = 30, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Get daily visits chart data""" today = datetime.utcnow().date() labels = [] values = [] for i in range(days - 1, -1, -1): date = today - timedelta(days=i) date_str = date.strftime("%Y-%m-%d") count = db.query(func.count(VisitorLog.id)).filter( func.date(VisitorLog.visited_at) == date_str ).scalar() or 0 labels.append(date.strftime("%m/%d")) values.append(count) return ChartData(labels=labels, values=values) @router.get("/admin/chart/unique-visitors", response_model=ChartData) def get_unique_visitors_chart( days: int = 30, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Get daily unique visitors chart data""" today = datetime.utcnow().date() labels = [] values = [] for i in range(days - 1, -1, -1): date = today - timedelta(days=i) date_str = date.strftime("%Y-%m-%d") count = db.query(func.count(func.distinct(VisitorLog.visitor_hash))).filter( func.date(VisitorLog.visited_at) == date_str ).scalar() or 0 labels.append(date.strftime("%m/%d")) values.append(count) return ChartData(labels=labels, values=values) @router.get("/admin/top-pages", response_model=List[TopPage]) def get_top_pages( days: int = 30, limit: int = 20, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Get top visited pages""" start_date = datetime.utcnow() - timedelta(days=days) pages = db.query( VisitorLog.page_path, VisitorLog.page_title, func.count(VisitorLog.id).label("views") ).filter( VisitorLog.visited_at >= start_date ).group_by( VisitorLog.page_path, VisitorLog.page_title ).order_by( desc("views") ).limit(limit).all() return [ TopPage(path=p[0], title=p[1], views=p[2]) for p in pages ] @router.get("/admin/top-referrers", response_model=List[TopReferrer]) def get_top_referrers( days: int = 30, limit: int = 10, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Get top referrer sources""" start_date = datetime.utcnow() - timedelta(days=days) referrers = db.query( VisitorLog.referrer_domain, func.count(VisitorLog.id).label("visits") ).filter( and_( VisitorLog.visited_at >= start_date, VisitorLog.referrer_domain.isnot(None), VisitorLog.referrer_domain != "" ) ).group_by( VisitorLog.referrer_domain ).order_by( desc("visits") ).limit(limit).all() return [ TopReferrer(domain=r[0], visits=r[1]) for r in referrers ] @router.get("/admin/realtime") def get_realtime_visitors( minutes: int = 5, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Get visitors in the last N minutes (real-time)""" start_time = datetime.utcnow() - timedelta(minutes=minutes) active_count = db.query(func.count(func.distinct(VisitorLog.visitor_hash))).filter( VisitorLog.visited_at >= start_time ).scalar() or 0 # Recent pages recent_pages = db.query( VisitorLog.page_path, func.count(VisitorLog.id).label("views") ).filter( VisitorLog.visited_at >= start_time ).group_by( VisitorLog.page_path ).order_by( desc("views") ).limit(5).all() return { "active_visitors": active_count, "minutes": minutes, "recent_pages": [{"path": p[0], "views": p[1]} for p in recent_pages], } @router.post("/admin/aggregate/{date_str}") def trigger_aggregation( date_str: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """Manually trigger aggregation for a specific date (YYYY-MM-DD)""" result = aggregate_daily_stats(db, date_str) if result: return {"status": "success", "date": date_str} return {"status": "no_data", "date": date_str} @router.get("/admin/map-data") def get_visitor_map_data( minutes: int = 30, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin_user), ): """ Get visitor locations for map display Returns recent visitors with lat/lng coordinates """ start_time = datetime.utcnow() - timedelta(minutes=minutes) # Get recent visitors with coordinates visitors = db.query( VisitorLog.latitude, VisitorLog.longitude, VisitorLog.country, VisitorLog.country_code, VisitorLog.city, VisitorLog.visited_at, VisitorLog.page_path, VisitorLog.device_type, ).filter( and_( VisitorLog.visited_at >= start_time, VisitorLog.latitude.isnot(None), VisitorLog.longitude.isnot(None), ) ).order_by( desc(VisitorLog.visited_at) ).limit(100).all() # Group by unique location for clustering locations = [] location_set = set() for v in visitors: # Round coordinates for grouping (roughly city-level) lat_key = round(v.latitude, 2) if v.latitude else 0 lng_key = round(v.longitude, 2) if v.longitude else 0 location_key = f"{lat_key}:{lng_key}" if location_key not in location_set: location_set.add(location_key) locations.append({ "lat": v.latitude, "lng": v.longitude, "country": v.country, "country_code": v.country_code, "city": v.city, "last_visit": v.visited_at.isoformat() if v.visited_at else None, "page": v.page_path, "device": v.device_type, "count": 1, # Will be aggregated }) else: # Increment count for existing location for loc in locations: if round(loc["lat"], 2) == lat_key and round(loc["lng"], 2) == lng_key: loc["count"] += 1 break # Get recent activity for animation (last 5 minutes) recent_start = datetime.utcnow() - timedelta(minutes=5) recent_visitors = db.query( VisitorLog.latitude, VisitorLog.longitude, VisitorLog.country, VisitorLog.city, VisitorLog.visited_at, ).filter( and_( VisitorLog.visited_at >= recent_start, VisitorLog.latitude.isnot(None), VisitorLog.longitude.isnot(None), ) ).order_by( desc(VisitorLog.visited_at) ).limit(20).all() recent = [ { "lat": v.latitude, "lng": v.longitude, "country": v.country, "city": v.city, "time": v.visited_at.isoformat() if v.visited_at else None, } for v in recent_visitors ] return { "locations": locations, "recent": recent, "total_with_coords": len(visitors), "minutes": minutes, }