import ipaddress
import logging
import re
from datetime import date, datetime, timedelta
from typing import List, Optional
from urllib.parse import urlsplit

import httpx
from fastapi import APIRouter, Depends, Query, Request
from sqlalchemy import desc, func
from sqlalchemy.orm import Session

from app.api.auth import get_current_admin
from app.core.database import get_db
from app.models.visitor import DailyVisitor, VisitorDailyStats, VisitorLog
from app.schemas.visitor import (
    BreakdownItem,
    BreakdownResponse,
    ChartDataResponse,
    CountryStatsResponse,
    OverviewStatsResponse,
    RealtimeStatsResponse,
    TopPageItem,
    TopPagesResponse,
    TopReferrerItem,
    TopReferrersResponse,
    VisitorStatsResponse,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/visitors", tags=["visitors"])

# Canned location payloads; always handed out as copies so callers may mutate.
_LOCAL_LOCATION = {"country": "Local", "country_code": "LO", "city": "Local", "region": "Local"}
_UNKNOWN_LOCATION = {"country": "Unknown", "country_code": "??", "city": "Unknown", "region": "Unknown"}


def get_ip_location(ip_address: str) -> dict:
    """Get location info from IP address using ip-api.com (free, no API key).

    Args:
        ip_address: Textual IPv4/IPv6 address, or one of the placeholders
            "localhost" / "unknown".

    Returns:
        Dict with the keys "country", "country_code", "city" and "region".
        Private/loopback/link-local addresses and the placeholders yield the
        "Local" payload; unparsable input and any lookup failure yield the
        "Unknown" payload. This function never raises for lookup problems.
    """
    if ip_address in ("127.0.0.1", "localhost", "unknown"):
        return dict(_LOCAL_LOCATION)

    # Parse instead of prefix-matching: only 172.16.0.0/12 is private (not all
    # of 172.*), and the value may come from a client-controlled header, so it
    # must be a real address before it is interpolated into the lookup URL.
    try:
        parsed = ipaddress.ip_address(ip_address)
    except ValueError:
        return dict(_UNKNOWN_LOCATION)

    if parsed.is_private or parsed.is_loopback or parsed.is_link_local:
        return dict(_LOCAL_LOCATION)

    try:
        with httpx.Client(timeout=3.0) as client:
            response = client.get(
                f"http://ip-api.com/json/{parsed}?fields=status,country,countryCode,city,regionName"
            )
            if response.status_code == 200:
                data = response.json()
                if data.get("status") == "success":
                    return {
                        "country": data.get("country", "Unknown"),
                        "country_code": data.get("countryCode", "??"),
                        "city": data.get("city", "Unknown"),
                        "region": data.get("regionName", "Unknown"),
                    }
    except (httpx.HTTPError, httpx.InvalidURL, ValueError) as exc:
        # Geolocation is best-effort: a network/JSON failure must not break tracking.
        logger.debug("IP geolocation lookup failed for %s: %s", parsed, exc)

    return dict(_UNKNOWN_LOCATION)


def get_client_ip(request: Request) -> str:
    """Get client IP address from request.

    Prefers the first entry of the X-Forwarded-For header and falls back to
    the socket peer, or "unknown" when the request carries no client info.

    NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy
    overwrites it; confirm the deployment sits behind such a proxy.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        first_hop = forwarded.split(",")[0].strip()
        # A header such as ", 1.2.3.4" has an empty first entry; ignore it.
        if first_hop:
            return first_hop
    return request.client.host if request.client else "unknown"


def parse_user_agent(user_agent: str) -> dict:
    """Parse User-Agent string to extract device, browser, and OS info.

    Args:
        user_agent: Raw User-Agent header value (may be empty).

    Returns:
        Dict with "device_type" ("desktop" | "mobile" | "tablet"), "browser"
        and "os". Unrecognised parts stay at "desktop" / "Unknown".
    """
    result = {"device_type": "desktop", "browser": "Unknown", "os": "Unknown"}
    if not user_agent:
        return result

    ua = user_agent.lower()

    # Device type. Tablets are tested first because iPad user agents also
    # carry the "Mobile" token and would otherwise be classified as phones.
    if "ipad" in ua or "tablet" in ua:
        result["device_type"] = "tablet"
    elif any(token in ua for token in ("mobile", "android", "iphone", "ipod")):
        result["device_type"] = "mobile"

    # Browser. Order matters: Edge and Opera embed "Chrome", and the iOS
    # builds of Chrome (CriOS) and Firefox (FxiOS) embed "Safari", so the more
    # specific tokens must be tested before the generic ones.
    if "edg" in ua:
        result["browser"] = "Edge"
    elif "opr/" in ua or "opera" in ua:
        result["browser"] = "Opera"
    elif "firefox" in ua or "fxios" in ua:
        result["browser"] = "Firefox"
    elif ("chrome" in ua or "crios" in ua) and "chromium" not in ua:
        result["browser"] = "Chrome"
    elif "safari" in ua and "chrome" not in ua:
        result["browser"] = "Safari"
    elif "msie" in ua or "trident" in ua:
        result["browser"] = "Internet Explorer"

    # Operating system. iOS user agents contain "like Mac OS X", hence the
    # nested iPhone/iPad test inside the macOS branch.
    is_ios_device = "iphone" in ua or "ipad" in ua
    if "windows" in ua:
        result["os"] = "Windows"
    elif "mac os" in ua or "macintosh" in ua:
        result["os"] = "iOS" if is_ios_device else "macOS"
    elif "android" in ua:
        result["os"] = "Android"
    elif "linux" in ua:
        result["os"] = "Linux"
    elif is_ios_device:
        result["os"] = "iOS"

    return result


def extract_referrer_domain(referrer: str) -> str:
    """Extract domain from referrer URL.

    Args:
        referrer: Full referrer URL, or an empty/None value.

    Returns:
        The lower-cased host name without a leading "www.", without port,
        credentials, path or query; "(direct)" when there is no usable
        http(s) URL.
    """
    if not referrer:
        return "(direct)"

    try:
        parts = urlsplit(referrer.strip())
        host = parts.hostname
    except ValueError:
        # urlsplit rejects malformed bracketed (IPv6-style) hosts.
        return "(direct)"

    if parts.scheme not in ("http", "https") or not host:
        return "(direct)"

    if host.startswith("www."):
        host = host[4:]
    return host or "(direct)"


@router.post("/track")
def track_visitor(
    request: Request,
    page_path: Optional[str] = None,
    referrer: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Track a visitor with extended info.

    A visitor is identified by IP address and counted at most once per
    calendar day: the first hit stores a VisitorLog row and bumps the day's
    DailyVisitor counter, later hits from the same IP are acknowledged only.

    Returns:
        {"message": ..., "new_visitor": bool}
    """
    today = date.today()
    ip_address = get_client_ip(request)
    # Truncated to 500 characters before it is stored.
    user_agent = request.headers.get("User-Agent", "")[:500]

    existing_log = db.query(VisitorLog).filter(
        VisitorLog.ip_address == ip_address,
        VisitorLog.visit_date == today,
    ).first()
    if existing_log:
        return {"message": "Already tracked", "new_visitor": False}

    # Only first-time visitors of the day pay for the (remote) geo lookup.
    location = get_ip_location(ip_address)
    ua_info = parse_user_agent(user_agent)
    referrer_domain = extract_referrer_domain(referrer)

    visitor_log = VisitorLog(
        ip_address=ip_address,
        user_agent=user_agent,
        visit_date=today,
        country=location["country"],
        country_code=location["country_code"],
        city=location["city"],
        region=location["region"],
        page_path=page_path,
        referrer=referrer,
        referrer_domain=referrer_domain,
        device_type=ua_info["device_type"],
        browser=ua_info["browser"],
        os=ua_info["os"],
    )
    db.add(visitor_log)

    daily_record = db.query(DailyVisitor).filter(
        DailyVisitor.visit_date == today
    ).first()
    if daily_record:
        daily_record.visitor_count += 1
    else:
        daily_record = DailyVisitor(visit_date=today, visitor_count=1)
        db.add(daily_record)

    db.commit()
    return {"message": "Tracked", "new_visitor": True}


def _build_visitor_stats(db: Session) -> VisitorStatsResponse:
    """Assemble today's and the all-time visitor totals from DailyVisitor."""
    today_record = db.query(DailyVisitor).filter(
        DailyVisitor.visit_date == date.today()
    ).first()
    today_visitors = today_record.visitor_count if today_record else 0
    # SUM over an empty table yields NULL/None, hence the "or 0".
    total_result = db.query(func.sum(DailyVisitor.visitor_count)).scalar()
    return VisitorStatsResponse(today_visitors=today_visitors, total_visitors=total_result or 0)


@router.get("/stats", response_model=VisitorStatsResponse)
def get_visitor_stats(db: Session = Depends(get_db), _: str = Depends(get_current_admin)):
    """Return today's and total visitor counts (admin only)."""
    return _build_visitor_stats(db)


@router.get("/stats/public", response_model=VisitorStatsResponse)
def get_visitor_stats_public(db: Session = Depends(get_db)):
    """Return today's and total visitor counts (no authentication)."""
    return _build_visitor_stats(db)


@router.get("/stats/countries", response_model=List[CountryStatsResponse])
def get_country_stats(db: Session = Depends(get_db), _: str = Depends(get_current_admin)):
    """Return the ten countries with the most logged visitors, busiest first."""
    country_stats = db.query(
        VisitorLog.country,
        VisitorLog.country_code,
        func.count(VisitorLog.id).label("visitor_count"),
    ).group_by(VisitorLog.country, VisitorLog.country_code).order_by(
        func.count(VisitorLog.id).desc()
    ).limit(10).all()

    return [
        CountryStatsResponse(
            country=stat.country or "Unknown",
            country_code=stat.country_code or "??",
            visitor_count=stat.visitor_count,
        )
        for stat in country_stats
    ]


# ============ Extended Statistics Endpoints ============


def _window_start(days: int, today: date) -> date:
    """Return the first date of a window of exactly ``days`` dates ending today."""
    return today - timedelta(days=days - 1)


def _daily_series(start_date: date, end_date: date, counts_by_date: dict) -> ChartDataResponse:
    """Expand sparse per-date counts into a gap-free, zero-filled chart series."""
    labels = []
    data = []
    current = start_date
    while current <= end_date:
        labels.append(current.strftime("%m/%d"))
        data.append(counts_by_date.get(current, 0))
        current += timedelta(days=1)
    return ChartDataResponse(labels=labels, data=data)


def _count_by_column(db: Session, column, days: int, limit: Optional[int] = None) -> list:
    """Count VisitorLog rows per non-NULL value of ``column`` in the window.

    Returns a list of ``(value, hits)`` tuples ordered by hits descending.
    """
    start_date = _window_start(days, date.today())
    query = db.query(
        column,
        # Deliberately not labelled "count": on SQLAlchemy 1.4+ result rows
        # expose the sequence method Row.count, which shadows such a label.
        func.count(VisitorLog.id).label("hits"),
    ).filter(
        VisitorLog.visit_date >= start_date,
        column.isnot(None),
    ).group_by(column).order_by(desc("hits"))
    if limit is not None:
        query = query.limit(limit)
    # Positional unpacking works for every SQLAlchemy row type.
    return [(value, hits) for value, hits in query.all()]


def _build_breakdown(rows: list) -> BreakdownResponse:
    """Turn ``(value, hits)`` rows into a BreakdownResponse with percentages."""
    total = sum(hits for _, hits in rows)
    # Guard the division only; the reported total stays 0 for an empty result.
    denominator = total or 1
    items = [
        BreakdownItem(
            name=value or "Unknown",
            count=hits,
            percentage=round((hits / denominator) * 100, 1),
        )
        for value, hits in rows
    ]
    return BreakdownResponse(items=items, total=total)


@router.get("/admin/overview", response_model=OverviewStatsResponse)
def get_overview_stats(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get comprehensive overview statistics.

    Args:
        days: Size of the reporting window in days (7-90), ending today.
    """
    today = date.today()
    # Same window as the chart endpoints: exactly ``days`` dates incl. today.
    start_date = _window_start(days, today)
    yesterday = today - timedelta(days=1)

    total_visits = db.query(func.sum(DailyVisitor.visitor_count)).filter(
        DailyVisitor.visit_date >= start_date
    ).scalar() or 0

    unique_visitors = db.query(func.count(func.distinct(VisitorLog.ip_address))).filter(
        VisitorLog.visit_date >= start_date
    ).scalar() or 0

    # max(..., 1) avoids a division by zero when nothing was logged.
    pages_per_visit = round(total_visits / max(unique_visitors, 1), 1)

    total_with_device = db.query(func.count(VisitorLog.id)).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.device_type.isnot(None),
    ).scalar() or 0
    mobile_count = db.query(func.count(VisitorLog.id)).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.device_type == "mobile",
    ).scalar() or 0
    mobile_percentage = round((mobile_count / max(total_with_device, 1)) * 100, 1)

    today_record = db.query(DailyVisitor).filter(DailyVisitor.visit_date == today).first()
    today_visitors = today_record.visitor_count if today_record else 0

    yesterday_record = db.query(DailyVisitor).filter(DailyVisitor.visit_date == yesterday).first()
    yesterday_visitors = yesterday_record.visitor_count if yesterday_record else 0

    if yesterday_visitors > 0:
        growth_rate = round(((today_visitors - yesterday_visitors) / yesterday_visitors) * 100, 1)
    else:
        # No baseline: report 0% for "still nothing", 100% for "first traffic".
        growth_rate = 0.0 if today_visitors == 0 else 100.0

    return OverviewStatsResponse(
        period=f"{days}d",
        total_visits=total_visits,
        unique_visitors=unique_visitors,
        pages_per_visit=pages_per_visit,
        mobile_percentage=mobile_percentage,
        today_visitors=today_visitors,
        yesterday_visitors=yesterday_visitors,
        growth_rate=growth_rate,
    )


@router.get("/admin/chart/visits", response_model=ChartDataResponse)
def get_visits_chart(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get daily visits chart data (one zero-filled point per day)."""
    today = date.today()
    start_date = _window_start(days, today)

    records = db.query(DailyVisitor).filter(
        DailyVisitor.visit_date >= start_date,
        DailyVisitor.visit_date <= today,
    ).order_by(DailyVisitor.visit_date).all()

    counts_by_date = {record.visit_date: record.visitor_count for record in records}
    return _daily_series(start_date, today, counts_by_date)


@router.get("/admin/chart/unique", response_model=ChartDataResponse)
def get_unique_visitors_chart(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get daily unique visitors chart data (distinct IPs per day)."""
    today = date.today()
    start_date = _window_start(days, today)

    daily_unique = db.query(
        VisitorLog.visit_date,
        func.count(func.distinct(VisitorLog.ip_address)).label("unique_count"),
    ).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.visit_date <= today,
    ).group_by(VisitorLog.visit_date).all()

    counts_by_date = {visit_date: unique_count for visit_date, unique_count in daily_unique}
    return _daily_series(start_date, today, counts_by_date)


@router.get("/admin/breakdown/device", response_model=BreakdownResponse)
def get_device_breakdown(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get the share of logged visitors per device type within the window."""
    return _build_breakdown(_count_by_column(db, VisitorLog.device_type, days))


@router.get("/admin/breakdown/browser", response_model=BreakdownResponse)
def get_browser_breakdown(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get the ten most common browsers; percentages are relative to those ten."""
    return _build_breakdown(_count_by_column(db, VisitorLog.browser, days, limit=10))


@router.get("/admin/breakdown/os", response_model=BreakdownResponse)
def get_os_breakdown(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get the ten most common operating systems; percentages are relative to those ten."""
    return _build_breakdown(_count_by_column(db, VisitorLog.os, days, limit=10))
def _ranked_counts(db: Session, column, days: int, limit: int = 10) -> list:
    """Return the top ``limit`` non-NULL values of ``column`` with their row counts.

    The window covers exactly ``days`` dates ending today. The result is a
    list of ``(value, hits)`` tuples ordered by hits descending.
    """
    start_date = date.today() - timedelta(days=days - 1)
    rows = db.query(
        column,
        # Deliberately not labelled "count": on SQLAlchemy 1.4+ result rows
        # expose the sequence method Row.count, which shadows such a label.
        func.count(VisitorLog.id).label("hits"),
    ).filter(
        VisitorLog.visit_date >= start_date,
        column.isnot(None),
    ).group_by(column).order_by(desc("hits")).limit(limit).all()
    # Positional unpacking works for every SQLAlchemy row type.
    return [(value, hits) for value, hits in rows]


@router.get("/admin/top-pages", response_model=TopPagesResponse)
def get_top_pages(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get the ten most visited page paths within the window.

    Percentages are relative to the sum of the returned rows, not to all
    traffic. ``total`` is 0 when no page was recorded.
    """
    rows = _ranked_counts(db, VisitorLog.page_path, days)
    total = sum(hits for _, hits in rows)
    # Guard the division only; the reported total stays 0 for an empty result.
    denominator = total or 1
    pages = [
        TopPageItem(
            path=path or "/",
            count=hits,
            percentage=round((hits / denominator) * 100, 1),
        )
        for path, hits in rows
    ]
    return TopPagesResponse(pages=pages, total=total)


@router.get("/admin/top-referrers", response_model=TopReferrersResponse)
def get_top_referrers(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get the ten most common referrer domains within the window.

    Percentages are relative to the sum of the returned rows, not to all
    traffic. ``total`` is 0 when no referrer was recorded.
    """
    rows = _ranked_counts(db, VisitorLog.referrer_domain, days)
    total = sum(hits for _, hits in rows)
    # Guard the division only; the reported total stays 0 for an empty result.
    denominator = total or 1
    referrers = [
        TopReferrerItem(
            domain=domain or "(direct)",
            count=hits,
            percentage=round((hits / denominator) * 100, 1),
        )
        for domain, hits in rows
    ]
    return TopReferrersResponse(referrers=referrers, total=total)


@router.get("/admin/realtime", response_model=RealtimeStatsResponse)
def get_realtime_stats(
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin),
):
    """Get the number of visitor log rows created during the last five minutes.

    Both response fields carry the same value.
    """
    # NOTE(review): naive local time; assumes VisitorLog.visited_at is stored
    # in the server's local timezone as well - confirm against the model.
    now = datetime.now()
    five_minutes_ago = now - timedelta(minutes=5)

    active_count = db.query(func.count(VisitorLog.id)).filter(
        VisitorLog.visited_at >= five_minutes_ago
    ).scalar() or 0

    return RealtimeStatsResponse(active_visitors=active_count, last_5_minutes=active_count)