- Frontend: Next.js 14 with TypeScript - Backend: FastAPI with SQLAlchemy - Agent: Carmodoo sync agent - Deployment: Docker Compose based staging/production setup - Scripts: Automated deployment with rollback support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
300 lines
9.2 KiB
Python
300 lines
9.2 KiB
Python
"""
|
|
Visitor Tracking Service
|
|
- Tracks page visits with privacy-preserving IP hashing
|
|
- Parses user agent for device/browser info
|
|
- Geolocation using free ip-api.com service
|
|
"""
|
|
import hashlib
|
|
import httpx
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
|
|
from ..models.visitor import VisitorLog, VisitorDailyStats, VisitorSession
|
|
|
|
# IP geolocation service (free, 45 req/min limit).
# The {ip} placeholder is filled in with the address being looked up.
# NOTE(review): the URL uses plain HTTP, so the looked-up address travels
# unencrypted to a third party - confirm this is acceptable.
IP_API_URL = "http://ip-api.com/json/{ip}?fields=status,country,countryCode,regionName,city"

# Simple in-memory cache for geolocation results.
# _geo_cache maps an IP string to its result dict; _geo_cache_expiry maps the
# same key to the moment after which that result is considered stale.
_geo_cache: Dict[str, Dict] = {}
_geo_cache_expiry: Dict[str, datetime] = {}
# How long a cached geolocation result stays valid.
GEO_CACHE_TTL = timedelta(hours=24)
|
|
|
|
|
|
def hash_ip(ip: str) -> str:
    """Return the hex-encoded SHA-256 digest of an IP address string.

    The address is encoded with ``str.encode()`` (UTF-8) before hashing, so
    the raw address itself never needs to be stored.
    """
    digest = hashlib.sha256()
    digest.update(ip.encode())
    return digest.hexdigest()
|
|
|
|
|
|
def hash_visitor(ip: str, user_agent: str) -> str:
    """Derive a visitor fingerprint from the IP address and User-Agent.

    Both values are joined as ``"<ip>:<user_agent>"`` and the SHA-256 hex
    digest of that string is returned.
    """
    fingerprint_source = "{}:{}".format(ip, user_agent)
    encoded = fingerprint_source.encode()
    return hashlib.sha256(encoded).hexdigest()
|
|
|
|
|
|
def parse_device_info(user_agent_string: str) -> Dict:
    """Parse a User-Agent string into device/browser/OS information.

    Args:
        user_agent_string: Raw ``User-Agent`` header value. May be missing
            (``None``) or blank, since the header is optional.

    Returns:
        A dict with the keys ``device_type`` ("mobile", "tablet", "desktop"
        or "unknown"), ``browser``, ``browser_version``, ``os`` and
        ``os_version``. The "unknown" placeholder values are returned when
        the header is missing/blank or when the optional ``user_agents``
        package is not installed.
    """
    unknown_info = {
        "device_type": "unknown",
        "browser": "unknown",
        "browser_version": "",
        "os": "unknown",
        "os_version": "",
    }

    # A missing or blank header carries no information; do not hand it to the
    # parser (which expects a string) and do not report it as a desktop visit.
    if not isinstance(user_agent_string, str) or not user_agent_string.strip():
        return unknown_info

    try:
        from user_agents import parse as parse_user_agent
    except ImportError:
        # Fallback if user-agents is not installed
        return unknown_info

    ua = parse_user_agent(user_agent_string)

    # Determine device type; anything that is neither mobile nor tablet is
    # reported as desktop.
    if ua.is_mobile:
        device_type = "mobile"
    elif ua.is_tablet:
        device_type = "tablet"
    else:
        device_type = "desktop"

    return {
        "device_type": device_type,
        "browser": ua.browser.family,
        "browser_version": ua.browser.version_string,
        "os": ua.os.family,
        "os_version": ua.os.version_string,
    }
|
|
|
|
|
|
async def get_geo_info(ip: str) -> Optional[Dict]:
    """Get geographic info for an IP address using the free ip-api.com service.

    The input is validated with the standard ``ipaddress`` module before it is
    used anywhere, so only a well-formed address is ever placed into the
    request URL. Loopback, private and link-local addresses (IPv4 and IPv6,
    including IPv4-mapped IPv6) and the literal ``"localhost"`` are answered
    locally without a network call.

    Args:
        ip: Client IP address as a string.

    Returns:
        A dict with ``country``, ``country_code``, ``region`` and ``city``;
        the fixed "Local" record for non-routable addresses; or ``None`` when
        the input is not a valid IP address or the lookup fails.
    """
    import ipaddress

    if not isinstance(ip, str):
        return None
    candidate = ip.strip()

    if candidate == "localhost":
        return {"country": "Local", "country_code": "LO", "region": "", "city": ""}

    try:
        address = ipaddress.ip_address(candidate)
    except ValueError:
        # Empty or malformed input (e.g. a raw forwarded-header list) must not
        # be interpolated into the lookup URL.
        return None

    # "::ffff:a.b.c.d" is an IPv4 address in IPv6 notation; classify and look
    # up the embedded IPv4 address instead.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped

    # Skip private/local IPs
    if (
        address.is_private
        or address.is_loopback
        or address.is_link_local
        or address.is_unspecified
    ):
        return {"country": "Local", "country_code": "LO", "region": "", "city": ""}

    lookup_ip = str(address)

    # Check cache; drop the entry once it has expired so stale results do not
    # linger in memory.
    cached = _geo_cache.get(lookup_ip)
    if cached is not None:
        if datetime.now() < _geo_cache_expiry.get(lookup_ip, datetime.min):
            return cached
        _geo_cache.pop(lookup_ip, None)
        _geo_cache_expiry.pop(lookup_ip, None)

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                IP_API_URL.format(ip=lookup_ip),
                timeout=5.0,
            )

        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                result = {
                    "country": data.get("country", "Unknown"),
                    "country_code": data.get("countryCode", ""),
                    "region": data.get("regionName", ""),
                    "city": data.get("city", ""),
                }
                # Cache the result
                _geo_cache[lookup_ip] = result
                _geo_cache_expiry[lookup_ip] = datetime.now() + GEO_CACHE_TTL
                return result
    except Exception as e:
        # Geolocation is best-effort: a failed lookup must never break
        # visit logging, so report it and fall through to None.
        print(f"Geo lookup failed for {ip}: {e}")

    return None
|
|
|
|
|
|
def extract_referrer_domain(referrer: str) -> Optional[str]:
    """Extract the network location (host, plus port if present) from a referrer URL.

    Args:
        referrer: Referrer URL, or ``None``/empty when the request had none.

    Returns:
        The URL's ``netloc`` component, or ``None`` when the referrer is
        missing, has no network location (e.g. no scheme), or cannot be parsed.
    """
    if not referrer:
        return None

    from urllib.parse import urlparse

    try:
        return urlparse(referrer).netloc or None
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed URLs (e.g. an unbalanced
        # IPv6 bracket); the other two cover non-string input. Catching only
        # these keeps the lookup best-effort without swallowing
        # KeyboardInterrupt/SystemExit as a bare ``except`` would.
        return None
|
|
|
|
|
|
async def log_visit(
    db: Session,
    ip: str,
    user_agent: str,
    page_path: str,
    page_title: Optional[str] = None,
    referrer: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[int] = None,
    utm_source: Optional[str] = None,
    utm_medium: Optional[str] = None,
    utm_campaign: Optional[str] = None,
) -> VisitorLog:
    """Log a page visit and update the visitor's session record.

    The raw IP is never stored: only its hash and a combined IP/User-Agent
    hash are written. Device details are parsed from the User-Agent and
    geographic details are looked up from the IP (best-effort; missing geo
    data is stored as ``None``).

    Args:
        db: SQLAlchemy session used for all reads and writes.
        ip: Client IP address.
        user_agent: Raw ``User-Agent`` header value.
        page_path: Path of the visited page.
        page_title: Optional page title.
        referrer: Optional referrer URL.
        session_id: Optional session identifier; when given, the matching
            ``VisitorSession`` row is updated, or created if none exists.
        user_id: Optional authenticated user id.
        utm_source: Optional UTM source tag.
        utm_medium: Optional UTM medium tag.
        utm_campaign: Optional UTM campaign tag.

    Returns:
        The persisted ``VisitorLog`` row, refreshed after commit.

    Raises:
        Exception: Any error raised while flushing or committing is re-raised
            after the transaction has been rolled back.
    """
    # Hash IP for privacy
    ip_hash = hash_ip(ip)
    visitor_hash = hash_visitor(ip, user_agent)

    # Parse device info
    device_info = parse_device_info(user_agent)

    # Get geo info (async); a failed lookup yields an empty dict
    geo_info = await get_geo_info(ip) or {}

    # Extract referrer domain
    referrer_domain = extract_referrer_domain(referrer)

    # Create log entry
    log = VisitorLog(
        visitor_hash=visitor_hash,
        ip_hash=ip_hash,
        session_id=session_id,
        user_id=user_id,
        page_path=page_path,
        page_title=page_title,
        referrer=referrer,
        referrer_domain=referrer_domain,
        device_type=device_info["device_type"],
        browser=device_info["browser"],
        browser_version=device_info["browser_version"],
        os=device_info["os"],
        os_version=device_info["os_version"],
        country=geo_info.get("country"),
        country_code=geo_info.get("country_code"),
        city=geo_info.get("city"),
        region=geo_info.get("region"),
        utm_source=utm_source,
        utm_medium=utm_medium,
        utm_campaign=utm_campaign,
    )

    try:
        db.add(log)

        # Update or create session
        if session_id:
            session = db.query(VisitorSession).filter(
                VisitorSession.session_id == session_id
            ).first()

            if session:
                session.last_page = page_path
                session.page_count += 1
                session.last_activity_at = datetime.utcnow()
                # Attach the user once known, but never overwrite an
                # already-associated user.
                if user_id and not session.user_id:
                    session.user_id = user_id
            else:
                session = VisitorSession(
                    session_id=session_id,
                    visitor_hash=visitor_hash,
                    user_id=user_id,
                    first_page=page_path,
                    last_page=page_path,
                    device_type=device_info["device_type"],
                    browser=device_info["browser"],
                    country=geo_info.get("country"),
                )
                db.add(session)

        db.commit()
    except Exception:
        # A failed flush/commit (for instance two concurrent first page views
        # inserting the same session) leaves the Session unusable until it
        # is rolled back; restore it before propagating the error.
        db.rollback()
        raise

    db.refresh(log)

    return log
|
|
|
|
|
|
def aggregate_daily_stats(db: Session, date_str: str) -> Optional[VisitorDailyStats]:
    """Aggregate visitor stats for a given date (YYYY-MM-DD).

    Called by a scheduled task. All visits for the date are loaded and
    tallied in a single pass; the resulting breakdowns are stored as JSON
    strings on the ``VisitorDailyStats`` row for that date, which is updated
    if it already exists and created otherwise.

    Args:
        db: SQLAlchemy session used for the query and the write.
        date_str: Date to aggregate, formatted as ``YYYY-MM-DD``.

    Returns:
        The created or updated ``VisitorDailyStats`` row, or ``None`` when
        there were no visits on that date (nothing is written in that case).
    """
    from collections import Counter

    # Query all visits for the date
    visits = db.query(VisitorLog).filter(
        func.date(VisitorLog.visited_at) == date_str
    ).all()

    if not visits:
        return None

    visitor_hashes = set()
    device_counts = Counter()
    browser_counts = Counter()
    country_counts = Counter()
    page_counts = Counter()
    referrer_counts = Counter()

    # One pass over the visits feeds every breakdown.
    for visit in visits:
        visitor_hashes.add(visit.visitor_hash)
        device_counts[visit.device_type or "unknown"] += 1
        browser_counts[visit.browser or "unknown"] += 1
        country_counts[visit.country_code or "unknown"] += 1
        page_counts[visit.page_path] += 1
        # Visits without a referrer are left out of the referrer ranking.
        if visit.referrer_domain:
            referrer_counts[visit.referrer_domain] += 1

    # sorted() is stable, so entries with equal counts keep first-seen order.
    top_pages = [
        {"path": path, "views": views}
        for path, views in sorted(
            page_counts.items(), key=lambda item: item[1], reverse=True
        )[:20]
    ]
    top_referrers = [
        {"domain": domain, "visits": count}
        for domain, count in sorted(
            referrer_counts.items(), key=lambda item: item[1], reverse=True
        )[:10]
    ]

    # Column values shared by the create and update paths.
    values = {
        "total_visits": len(visits),
        "unique_visitors": len(visitor_hashes),
        "device_breakdown": json.dumps(dict(device_counts)),
        "browser_breakdown": json.dumps(dict(browser_counts)),
        "country_breakdown": json.dumps(dict(country_counts)),
        "top_pages": json.dumps(top_pages),
        "top_referrers": json.dumps(top_referrers),
    }

    # Create or update daily stats
    existing = db.query(VisitorDailyStats).filter(
        VisitorDailyStats.stat_date == date_str
    ).first()

    if existing:
        for column, value in values.items():
            setattr(existing, column, value)
        stats = existing
    else:
        stats = VisitorDailyStats(stat_date=date_str, **values)
        db.add(stats)

    db.commit()
    return stats
|
|
|
|
|
|
def cleanup_old_visitor_logs(db: Session, days: int = 90) -> int:
    """Delete visitor logs older than the specified number of days.

    Args:
        db: SQLAlchemy session used for the delete and the commit.
        days: Retention period in days; rows whose ``visited_at`` is earlier
            than now minus this many days are removed. Must not be negative.

    Returns:
        The number of rows deleted.

    Raises:
        ValueError: If ``days`` is negative. A negative value would place the
            cutoff in the future and delete every log row.
    """
    if days < 0:
        raise ValueError(f"days must be non-negative, got {days}")

    # NOTE(review): the cutoff uses local time (datetime.now()) - confirm it
    # matches the clock used to populate visited_at.
    cutoff = datetime.now() - timedelta(days=days)
    deleted = db.query(VisitorLog).filter(
        VisitorLog.visited_at < cutoff
    ).delete()
    db.commit()
    return deleted
|