"""
Visitor Tracking Service

- Tracks page visits with privacy-preserving IP hashing
- Parses user agent for device/browser info
- Geolocation using free ip-api.com service
"""
import hashlib
import ipaddress
import json
import logging
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Optional
from urllib.parse import urlparse

import httpx
from sqlalchemy import func
from sqlalchemy.orm import Session

from ..models.visitor import VisitorDailyStats, VisitorLog, VisitorSession

logger = logging.getLogger(__name__)

# IP Geolocation service (free, 45 req/min limit)
IP_API_URL = "http://ip-api.com/json/{ip}?fields=status,country,countryCode,regionName,city,lat,lon"

# Cache for IP geolocation results (in-memory, simple).
# Both dicts are keyed by the IP string exactly as passed to get_geo_info().
_geo_cache: Dict[str, Dict] = {}
_geo_cache_expiry: Dict[str, datetime] = {}
GEO_CACHE_TTL = timedelta(hours=24)
# Upper bound on cached IPs so a long-running process cannot grow without limit.
GEO_CACHE_MAX_ENTRIES = 10_000

# Placeholder geo record returned for addresses that are not publicly routable.
_LOCAL_GEO_INFO: Dict = {
    "country": "Local",
    "country_code": "LO",
    "region": "",
    "city": "",
    "latitude": None,
    "longitude": None,
}

# Device record returned when the optional `user_agents` package is missing.
_UNKNOWN_DEVICE_INFO: Dict = {
    "device_type": "unknown",
    "browser": "unknown",
    "browser_version": "",
    "os": "unknown",
    "os_version": "",
}


def hash_ip(ip: str) -> str:
    """Return the hex SHA-256 digest of *ip*, used instead of the raw address."""
    # NOTE(review): the digest is unsalted, so identical IPs always map to the
    # same hash. Changing that would invalidate previously stored hashes.
    return hashlib.sha256(ip.encode()).hexdigest()


def hash_visitor(ip: str, user_agent: str) -> str:
    """Return a visitor fingerprint: hex SHA-256 of ``"<ip>:<user_agent>"``."""
    combined = f"{ip}:{user_agent}"
    return hashlib.sha256(combined.encode()).hexdigest()


def parse_device_info(user_agent_string: str) -> Dict:
    """Parse a User-Agent string into device/browser/OS fields.

    Args:
        user_agent_string: Raw User-Agent header value. A falsy value
            (missing header) is parsed as an empty string.

    Returns:
        Dict with keys ``device_type`` ("mobile", "tablet" or "desktop"),
        ``browser``, ``browser_version``, ``os`` and ``os_version``. When the
        optional ``user_agents`` package is not installed, every field is
        "unknown" or empty.
    """
    # Only the import is optional; keep the try body limited to it so that
    # unrelated errors are not mistaken for a missing dependency.
    try:
        from user_agents import parse as parse_user_agent
    except ImportError:
        return dict(_UNKNOWN_DEVICE_INFO)

    ua = parse_user_agent(user_agent_string or "")

    # Mobile is checked before tablet; anything else is reported as desktop.
    if ua.is_mobile:
        device_type = "mobile"
    elif ua.is_tablet:
        device_type = "tablet"
    else:
        device_type = "desktop"

    return {
        "device_type": device_type,
        "browser": ua.browser.family,
        "browser_version": ua.browser.version_string,
        "os": ua.os.family,
        "os_version": ua.os.version_string,
    }


def _store_geo_result(ip: str, result: Dict) -> None:
    """Cache *result* for *ip*, evicting entries if the cache is full."""
    now = datetime.now()
    if ip not in _geo_cache and len(_geo_cache) >= GEO_CACHE_MAX_ENTRIES:
        # First drop everything that has already expired...
        expired = [key for key, expiry in _geo_cache_expiry.items() if expiry <= now]
        for key in expired:
            _geo_cache.pop(key, None)
            _geo_cache_expiry.pop(key, None)
        # ...then, if still full, drop the oldest insertions (dicts keep
        # insertion order).
        while len(_geo_cache) >= GEO_CACHE_MAX_ENTRIES:
            oldest = next(iter(_geo_cache))
            _geo_cache.pop(oldest, None)
            _geo_cache_expiry.pop(oldest, None)
    _geo_cache[ip] = result
    _geo_cache_expiry[ip] = now + GEO_CACHE_TTL


async def get_geo_info(ip: str) -> Optional[Dict]:
    """Get geographic info from IP address using free ip-api.com.

    Args:
        ip: Client address as a string.

    Returns:
        Dict with keys ``country``, ``country_code``, ``region``, ``city``,
        ``latitude`` and ``longitude``. Addresses that are not globally
        routable (and the literal ``localhost``) yield a fixed "Local"
        record. ``None`` is returned when *ip* is not a valid IP address or
        when the lookup fails for any reason; this function never raises for
        network or parsing errors.
    """
    # Check cache first; drop the entry if it has expired.
    cached = _geo_cache.get(ip)
    if cached is not None:
        if datetime.now() < _geo_cache_expiry.get(ip, datetime.min):
            return cached
        _geo_cache.pop(ip, None)
        _geo_cache_expiry.pop(ip, None)

    if ip.startswith("localhost"):
        return dict(_LOCAL_GEO_INFO)

    # Validate before building the URL: the value is caller-supplied and must
    # not be interpolated into the request path unless it is a real address.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        logger.debug("Skipping geo lookup for non-IP value %r", ip)
        return None

    # Judge IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) by the embedded IPv4.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped

    # Skip private/local IPs (loopback, RFC 1918, link-local, IPv6 ULA, ...)
    # and zone-scoped addresses, which only have meaning on the local host.
    if getattr(address, "scope_id", None) is not None or not address.is_global:
        return dict(_LOCAL_GEO_INFO)

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                IP_API_URL.format(ip=address),
                timeout=5.0,
            )
        if response.status_code != 200:
            return None
        data = response.json()
        if data.get("status") != "success":
            return None
        result = {
            "country": data.get("country", "Unknown"),
            "country_code": data.get("countryCode", ""),
            "region": data.get("regionName", ""),
            "city": data.get("city", ""),
            "latitude": data.get("lat"),
            "longitude": data.get("lon"),
        }
    except Exception as exc:
        # Geolocation is best-effort: a failed lookup must never prevent the
        # visit itself from being recorded, so log and carry on.
        logger.warning("Geo lookup failed for %s: %s", ip, exc)
        return None

    _store_geo_result(ip, result)
    return result


def extract_referrer_domain(referrer: Optional[str]) -> Optional[str]:
    """Extract domain from referrer URL.

    Returns the URL's network location (host, plus port if present), or
    ``None`` when *referrer* is empty, has no network location, or cannot be
    parsed.
    """
    if not referrer:
        return None
    try:
        return urlparse(referrer).netloc or None
    except ValueError:
        # urlparse raises ValueError for malformed URLs such as an
        # unterminated IPv6 literal.
        return None


async def log_visit(
    db: Session,
    ip: str,
    user_agent: str,
    page_path: str,
    page_title: Optional[str] = None,
    referrer: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[int] = None,
    utm_source: Optional[str] = None,
    utm_medium: Optional[str] = None,
    utm_campaign: Optional[str] = None,
) -> VisitorLog:
    """Log a page visit.

    Stores a ``VisitorLog`` row containing hashed identifiers (never the raw
    IP), parsed device info and best-effort geolocation. When *session_id* is
    given, the matching ``VisitorSession`` is updated, or created if none
    exists. Everything is committed in a single transaction.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        ip: Client IP address; only its hashes are stored.
        user_agent: Raw User-Agent header value.
        page_path: Path of the visited page.
        page_title: Optional page title.
        referrer: Optional referrer URL.
        session_id: Optional session identifier; enables session tracking.
        user_id: Optional authenticated user id.
        utm_source: Optional UTM source tag.
        utm_medium: Optional UTM medium tag.
        utm_campaign: Optional UTM campaign tag.

    Returns:
        The persisted ``VisitorLog``, refreshed from the database.
    """
    # Hash IP for privacy
    ip_hash = hash_ip(ip)
    visitor_hash = hash_visitor(ip, user_agent)

    device_info = parse_device_info(user_agent)

    # A failed lookup yields None; fall back to an empty dict so every geo
    # column is simply stored as NULL.
    geo_info = await get_geo_info(ip) or {}

    referrer_domain = extract_referrer_domain(referrer)

    log = VisitorLog(
        visitor_hash=visitor_hash,
        ip_hash=ip_hash,
        session_id=session_id,
        user_id=user_id,
        page_path=page_path,
        page_title=page_title,
        referrer=referrer,
        referrer_domain=referrer_domain,
        device_type=device_info["device_type"],
        browser=device_info["browser"],
        browser_version=device_info["browser_version"],
        os=device_info["os"],
        os_version=device_info["os_version"],
        country=geo_info.get("country"),
        country_code=geo_info.get("country_code"),
        city=geo_info.get("city"),
        region=geo_info.get("region"),
        latitude=geo_info.get("latitude"),
        longitude=geo_info.get("longitude"),
        utm_source=utm_source,
        utm_medium=utm_medium,
        utm_campaign=utm_campaign,
    )
    db.add(log)

    # Update or create session
    if session_id:
        session = (
            db.query(VisitorSession)
            .filter(VisitorSession.session_id == session_id)
            .first()
        )
        if session is not None:
            session.last_page = page_path
            session.page_count += 1
            session.last_activity_at = datetime.utcnow()
            # Attach the user once they are known; never overwrite an
            # already-associated user.
            if user_id and not session.user_id:
                session.user_id = user_id
        else:
            session = VisitorSession(
                session_id=session_id,
                visitor_hash=visitor_hash,
                user_id=user_id,
                first_page=page_path,
                last_page=page_path,
                device_type=device_info["device_type"],
                browser=device_info["browser"],
                country=geo_info.get("country"),
            )
            db.add(session)

    db.commit()
    db.refresh(log)
    return log


def aggregate_daily_stats(db: Session, date_str: str) -> Optional[VisitorDailyStats]:
    """Aggregate visitor stats for a given date (YYYY-MM-DD).

    Called by scheduled task. Re-running for the same date overwrites the
    existing ``VisitorDailyStats`` row instead of creating a duplicate.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        date_str: Date to aggregate, compared against ``DATE(visited_at)``.

    Returns:
        The created or updated ``VisitorDailyStats`` row, or ``None`` when no
        visits were logged on that date (nothing is written in that case).
    """
    visits = (
        db.query(VisitorLog)
        .filter(func.date(VisitorLog.visited_at) == date_str)
        .all()
    )
    if not visits:
        return None

    # Breakdowns: missing values are grouped under "unknown".
    device_counts = Counter(v.device_type or "unknown" for v in visits)
    browser_counts = Counter(v.browser or "unknown" for v in visits)
    country_counts = Counter(v.country_code or "unknown" for v in visits)

    # Top 20 pages and top 10 referrer domains; most_common() orders ties by
    # first occurrence.
    page_counts = Counter(v.page_path for v in visits)
    top_pages = [
        {"path": path, "views": views}
        for path, views in page_counts.most_common(20)
    ]
    referrer_counts = Counter(
        v.referrer_domain for v in visits if v.referrer_domain
    )
    top_referrers = [
        {"domain": domain, "visits": count}
        for domain, count in referrer_counts.most_common(10)
    ]

    # Breakdown columns are stored as JSON-encoded strings.
    values = {
        "total_visits": len(visits),
        "unique_visitors": len({v.visitor_hash for v in visits}),
        "device_breakdown": json.dumps(device_counts),
        "browser_breakdown": json.dumps(browser_counts),
        "country_breakdown": json.dumps(country_counts),
        "top_pages": json.dumps(top_pages),
        "top_referrers": json.dumps(top_referrers),
    }

    # Create or update daily stats
    stats = (
        db.query(VisitorDailyStats)
        .filter(VisitorDailyStats.stat_date == date_str)
        .first()
    )
    if stats is not None:
        for column, value in values.items():
            setattr(stats, column, value)
    else:
        stats = VisitorDailyStats(stat_date=date_str, **values)
        db.add(stats)

    db.commit()
    return stats


def cleanup_old_visitor_logs(db: Session, days: int = 90) -> int:
    """Delete visitor logs older than specified days.

    Args:
        db: SQLAlchemy session used for the delete.
        days: Retention window; rows with ``visited_at`` earlier than
            ``now - days`` are removed.

    Returns:
        Number of rows deleted.
    """
    # NOTE(review): the cutoff uses local time (datetime.now()) while
    # log_visit() stamps sessions with datetime.utcnow() -- confirm which
    # clock VisitorLog.visited_at uses.
    cutoff = datetime.now() - timedelta(days=days)
    deleted = db.query(VisitorLog).filter(VisitorLog.visited_at < cutoff).delete()
    db.commit()
    return deleted