Files
AutonetSellCar/backend/app/services/visitor_service.py
AutonetSellCar Deploy 1f0dcb1ddb Initial commit: AutonetSellCar platform with deployment system
- Frontend: Next.js 14 with TypeScript
- Backend: FastAPI with SQLAlchemy
- Agent: Carmodoo sync agent
- Deployment: Docker Compose based staging/production setup
- Scripts: Automated deployment with rollback support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 13:24:39 +09:00

300 lines
9.2 KiB
Python

"""
Visitor Tracking Service
- Tracks page visits with privacy-preserving IP hashing
- Parses user agent for device/browser info
- Geolocation using free ip-api.com service
"""
import hashlib
import httpx
import json
from datetime import datetime, timedelta
from typing import Optional, Dict
from sqlalchemy.orm import Session
from sqlalchemy import func
from ..models.visitor import VisitorLog, VisitorDailyStats, VisitorSession
# Free ip-api.com geolocation endpoint (limited to 45 requests per minute).
# `{ip}` is filled in per lookup; `fields` restricts the response payload.
IP_API_URL = (
    "http://ip-api.com/json/{ip}"
    "?fields=status,country,countryCode,regionName,city"
)

# Simple in-process cache of geolocation results, keyed by IP address.
_geo_cache: Dict[str, Dict] = dict()
# Point in time at which each cached IP's result stops being valid.
_geo_cache_expiry: Dict[str, datetime] = dict()
# Lifetime of a cached geolocation result (one day).
GEO_CACHE_TTL = timedelta(days=1)
def hash_ip(ip: str) -> str:
    """Return the hexadecimal SHA-256 digest of an IP address.

    The digest is what gets stored instead of the raw address.
    """
    digest = hashlib.sha256()
    digest.update(ip.encode())
    return digest.hexdigest()
def hash_visitor(ip: str, user_agent: str) -> str:
    """Return a visitor fingerprint: the hex SHA-256 of "<ip>:<user_agent>"."""
    fingerprint_source = f"{ip}:{user_agent}".encode()
    hasher = hashlib.sha256()
    hasher.update(fingerprint_source)
    return hasher.hexdigest()
def parse_device_info(user_agent_string: str) -> Dict:
    """Derive device, browser and OS details from a User-Agent header.

    Args:
        user_agent_string: Raw User-Agent header value. May be empty or
            ``None`` when the client did not send one.

    Returns:
        A dict with the keys ``device_type`` ("mobile", "tablet", "desktop"
        or "unknown"), ``browser``, ``browser_version``, ``os`` and
        ``os_version``. The "unknown" record is returned when there is no
        user agent to parse or the optional ``user_agents`` package is not
        installed.
    """
    unknown = {
        "device_type": "unknown",
        "browser": "unknown",
        "browser_version": "",
        "os": "unknown",
        "os_version": "",
    }

    # A missing header carries no information: do not hand None to the
    # parser, and do not let an empty string be counted as a desktop visit.
    if not user_agent_string:
        return unknown

    # Only the optional import is guarded; parsing errors are not masked.
    try:
        from user_agents import parse as parse_user_agent
    except ImportError:
        return unknown

    ua = parse_user_agent(user_agent_string)

    # Anything that is neither a phone nor a tablet is reported as desktop.
    if ua.is_mobile:
        device_type = "mobile"
    elif ua.is_tablet:
        device_type = "tablet"
    else:
        device_type = "desktop"

    return {
        "device_type": device_type,
        "browser": ua.browser.family,
        "browser_version": ua.browser.version_string,
        "os": ua.os.family,
        "os_version": ua.os.version_string,
    }
async def get_geo_info(ip: str) -> Optional[Dict]:
    """Look up coarse geographic information for an IP address.

    Results come from the ip-api.com endpoint in ``IP_API_URL`` and are kept
    in the module-level cache for ``GEO_CACHE_TTL``.

    Args:
        ip: Client address as a string (IPv4 or IPv6), or "localhost".

    Returns:
        A dict with the keys ``country``, ``country_code``, ``region`` and
        ``city``. Loopback, private and link-local addresses yield a fixed
        "Local" record without any network call. ``None`` is returned when
        ``ip`` is empty or not a valid address, or when the lookup fails.
    """
    # Imported here so this function carries its own dependency.
    import ipaddress

    # Upper bound on cached addresses so the in-process cache cannot grow
    # without limit on a busy site.
    max_cache_entries = 10000
    local_result = {"country": "Local", "country_code": "LO", "region": "", "city": ""}

    if not ip:
        return None
    if ip.startswith("localhost"):
        return local_result

    # Validate before the value is interpolated into the request URL:
    # anything that is not a real address is never sent to the service.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return None

    # Classify IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) by their IPv4 part.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped

    # Covers 10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, ::1,
    # fc00::/7 and fe80::/10 without hand-maintained prefix lists.
    if address.is_private or address.is_loopback or address.is_link_local:
        return local_result

    # Serve from cache while fresh; drop the entry once it has expired.
    now = datetime.now()
    cached = _geo_cache.get(ip)
    if cached is not None:
        if now < _geo_cache_expiry.get(ip, datetime.min):
            return cached
        _geo_cache.pop(ip, None)
        _geo_cache_expiry.pop(ip, None)

    # Best effort: a failed lookup must never break visit logging, so any
    # error is reported and turned into None.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                IP_API_URL.format(ip=ip),
                timeout=5.0,
            )
            if response.status_code == 200:
                data = response.json()
                if data.get("status") == "success":
                    result = {
                        "country": data.get("country", "Unknown"),
                        "country_code": data.get("countryCode", ""),
                        "region": data.get("regionName", ""),
                        "city": data.get("city", ""),
                    }

                    # Make room before inserting: purge expired entries
                    # first, and start over if the cache is still full.
                    if len(_geo_cache) >= max_cache_entries:
                        stale = [
                            key
                            for key, expiry in _geo_cache_expiry.items()
                            if expiry <= now
                        ]
                        for key in stale:
                            _geo_cache.pop(key, None)
                            _geo_cache_expiry.pop(key, None)
                        if len(_geo_cache) >= max_cache_entries:
                            _geo_cache.clear()
                            _geo_cache_expiry.clear()

                    _geo_cache[ip] = result
                    _geo_cache_expiry[ip] = datetime.now() + GEO_CACHE_TTL
                    return result
    except Exception as e:
        print(f"Geo lookup failed for {ip}: {e}")

    return None
def extract_referrer_domain(referrer: str) -> Optional[str]:
    """Return the network-location part of a referrer URL.

    Args:
        referrer: Referrer URL as sent by the client; may be empty or None.

    Returns:
        The URL's ``netloc`` (host, plus any port or user info present), or
        ``None`` when the referrer is missing, has no network location, or
        cannot be parsed.
    """
    if not referrer:
        return None

    from urllib.parse import urlparse

    # Only parsing failures are swallowed (e.g. ValueError for a malformed
    # IPv6 host, or a non-string value); a bare except would also hide
    # KeyboardInterrupt and SystemExit.
    try:
        return urlparse(referrer).netloc or None
    except (ValueError, TypeError, AttributeError):
        return None
async def log_visit(
    db: Session,
    ip: str,
    user_agent: str,
    page_path: str,
    page_title: Optional[str] = None,
    referrer: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[int] = None,
    utm_source: Optional[str] = None,
    utm_medium: Optional[str] = None,
    utm_campaign: Optional[str] = None,
) -> VisitorLog:
    """Record one page view and keep the matching visitor session current.

    The IP address is stored only as hashes. Device details come from the
    user agent and location details from the geolocation lookup. When a
    ``session_id`` is supplied, the existing ``VisitorSession`` row is
    updated, or a new one is created if none exists yet.

    Returns:
        The committed and refreshed ``VisitorLog`` row.
    """
    hashed_ip = hash_ip(ip)
    fingerprint = hash_visitor(ip, user_agent)
    device = parse_device_info(user_agent)
    # A failed lookup yields None; an empty mapping makes every .get() below
    # return None instead.
    geo = await get_geo_info(ip) or {}

    entry = VisitorLog(
        visitor_hash=fingerprint,
        ip_hash=hashed_ip,
        session_id=session_id,
        user_id=user_id,
        page_path=page_path,
        page_title=page_title,
        referrer=referrer,
        referrer_domain=extract_referrer_domain(referrer),
        device_type=device["device_type"],
        browser=device["browser"],
        browser_version=device["browser_version"],
        os=device["os"],
        os_version=device["os_version"],
        country=geo.get("country"),
        country_code=geo.get("country_code"),
        city=geo.get("city"),
        region=geo.get("region"),
        utm_source=utm_source,
        utm_medium=utm_medium,
        utm_campaign=utm_campaign,
    )
    db.add(entry)

    # Session bookkeeping only applies when the caller supplied a session id.
    if session_id:
        tracked = (
            db.query(VisitorSession)
            .filter(VisitorSession.session_id == session_id)
            .first()
        )
        if not tracked:
            # First page of this session: the page is both first and last.
            db.add(
                VisitorSession(
                    session_id=session_id,
                    visitor_hash=fingerprint,
                    user_id=user_id,
                    first_page=page_path,
                    last_page=page_path,
                    device_type=device["device_type"],
                    browser=device["browser"],
                    country=geo.get("country"),
                )
            )
        else:
            tracked.last_page = page_path
            tracked.page_count += 1
            tracked.last_activity_at = datetime.utcnow()
            # Attach the user once known, without overwriting an existing one.
            if user_id and not tracked.user_id:
                tracked.user_id = user_id

    db.commit()
    db.refresh(entry)
    return entry
def aggregate_daily_stats(db: Session, date_str: str) -> Optional[VisitorDailyStats]:
    """Build or refresh the ``VisitorDailyStats`` row for one day.

    Meant to be run by a scheduled task.

    Args:
        db: Database session used for reading visits and writing the stats.
        date_str: Day to aggregate, formatted as YYYY-MM-DD.

    Returns:
        The created or updated stats row, or ``None`` when the day has no
        visits (in which case nothing is written).
    """
    day_visits = (
        db.query(VisitorLog)
        .filter(func.date(VisitorLog.visited_at) == date_str)
        .all()
    )
    if not day_visits:
        return None

    def tally(keys):
        """Count occurrences of each key, keeping first-seen order."""
        counts = {}
        for key in keys:
            counts[key] = counts.get(key, 0) + 1
        return counts

    def ranked(counts, label, metric, limit):
        """Turn counts into the `limit` highest entries, largest first."""
        rows = [{label: name, metric: hits} for name, hits in counts.items()]
        rows.sort(key=lambda row: row[metric], reverse=True)
        return rows[:limit]

    by_device = tally(visit.device_type or "unknown" for visit in day_visits)
    by_browser = tally(visit.browser or "unknown" for visit in day_visits)
    by_country = tally(visit.country_code or "unknown" for visit in day_visits)
    by_page = tally(visit.page_path for visit in day_visits)
    # Visits without a referrer domain are left out of the referrer ranking.
    by_referrer = tally(
        visit.referrer_domain for visit in day_visits if visit.referrer_domain
    )

    # Breakdowns and rankings are stored as JSON text.
    payload = {
        "total_visits": len(day_visits),
        "unique_visitors": len({visit.visitor_hash for visit in day_visits}),
        "device_breakdown": json.dumps(by_device),
        "browser_breakdown": json.dumps(by_browser),
        "country_breakdown": json.dumps(by_country),
        "top_pages": json.dumps(ranked(by_page, "path", "views", 20)),
        "top_referrers": json.dumps(ranked(by_referrer, "domain", "visits", 10)),
    }

    stats = (
        db.query(VisitorDailyStats)
        .filter(VisitorDailyStats.stat_date == date_str)
        .first()
    )
    if stats:
        # Re-running for the same day overwrites the previous figures.
        for field_name, value in payload.items():
            setattr(stats, field_name, value)
    else:
        stats = VisitorDailyStats(stat_date=date_str, **payload)
        db.add(stats)

    db.commit()
    return stats
def cleanup_old_visitor_logs(db: Session, days: int = 90) -> int:
    """Delete visitor logs older than the given retention period.

    Args:
        db: Database session used for the bulk delete.
        days: Retention period in days; rows whose ``visited_at`` is earlier
            than now minus this many days are removed.

    Returns:
        The number of rows deleted.

    Raises:
        ValueError: If ``days`` is negative. A negative value would move the
            cutoff into the future and delete every log row.
    """
    if days < 0:
        raise ValueError(f"days must be non-negative, got {days}")

    # NOTE(review): the cutoff uses local time (datetime.now()) while session
    # activity elsewhere in this file is stamped with datetime.utcnow().
    # Confirm which clock VisitorLog.visited_at uses.
    cutoff = datetime.now() - timedelta(days=days)
    deleted = db.query(VisitorLog).filter(
        VisitorLog.visited_at < cutoff
    ).delete()
    db.commit()
    return deleted