Files
AutonetSellCar/backend/app/services/visitor_service.py
AutonetSellCar Deploy b8f0ae4d28 feat: Add real-time visitor map with IP geolocation
- Add latitude/longitude columns to visitor_logs model
- Update visitor_service to fetch and store coordinates from ip-api.com
- Add /admin/map-data API endpoint for map visualization
- Create VisitorMap component using Leaflet/OpenStreetMap
- Integrate map into visitor-stats admin page
- 30-second auto-refresh with animation for new visitors
- Color-coded markers (red: active, blue: recent)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 18:31:22 +09:00

304 lines
9.4 KiB
Python

"""
Visitor Tracking Service
- Tracks page visits with privacy-preserving IP hashing
- Parses user agent for device/browser info
- Geolocation using free ip-api.com service
"""
import hashlib
import httpx
import json
from datetime import datetime, timedelta
from typing import Optional, Dict
from sqlalchemy.orm import Session
from sqlalchemy import func
from ..models.visitor import VisitorLog, VisitorDailyStats, VisitorSession
# IP Geolocation service (free, 45 req/min limit)
# URL template; `{ip}` is filled in per lookup. The `fields` list names the
# response keys that get_geo_info reads (status, country, countryCode,
# regionName, city, lat, lon).
# NOTE(review): this is a plain-HTTP endpoint, so visitor IPs are sent
# unencrypted — confirm whether an HTTPS endpoint is available for the plan in use.
IP_API_URL = "http://ip-api.com/json/{ip}?fields=status,country,countryCode,regionName,city,lat,lon"
# Cache for IP geolocation results (in-memory, simple)
# _geo_cache maps an IP string to its geo result dict; _geo_cache_expiry maps
# the same IP string to the datetime after which that entry is considered stale.
# Both are module-level dicts, so their contents live only as long as the process.
_geo_cache: Dict[str, Dict] = {}
_geo_cache_expiry: Dict[str, datetime] = {}
# How long a cached lookup stays valid.
GEO_CACHE_TTL = timedelta(hours=24)
def hash_ip(ip: str) -> str:
    """Return the SHA-256 hex digest of an IP address string.

    The address is UTF-8 encoded before hashing; the result is a
    64-character lowercase hexadecimal string.
    """
    digest = hashlib.sha256()
    digest.update(ip.encode())
    return digest.hexdigest()
def hash_visitor(ip: str, user_agent: str) -> str:
    """Build a visitor fingerprint from the IP address and User-Agent.

    The two values are joined as ``"<ip>:<user_agent>"`` and the SHA-256
    hex digest of that string is returned.
    """
    fingerprint_source = "{}:{}".format(ip, user_agent)
    hasher = hashlib.sha256(fingerprint_source.encode())
    return hasher.hexdigest()
def parse_device_info(user_agent_string: str) -> Dict:
    """Parse a User-Agent header into device, browser and OS information.

    Args:
        user_agent_string: Raw ``User-Agent`` header value. May be empty or
            ``None`` when the client did not send the header.

    Returns:
        A dict with the keys ``device_type`` ("mobile", "tablet", "desktop"
        or "unknown"), ``browser``, ``browser_version``, ``os`` and
        ``os_version``. The "unknown" record is returned when the header is
        missing/empty or when the optional ``user_agents`` package is not
        installed.
    """
    # Built per call so callers can mutate the returned dict safely.
    unknown_info = {
        "device_type": "unknown",
        "browser": "unknown",
        "browser_version": "",
        "os": "unknown",
        "os_version": "",
    }

    # Without a header there is nothing to parse; classifying it as
    # "desktop" (the parser's fall-through) would skew the device stats.
    if not user_agent_string:
        return unknown_info

    try:
        from user_agents import parse as parse_user_agent
    except ImportError:
        # Fallback if user-agents not installed
        return unknown_info

    ua = parse_user_agent(user_agent_string)

    # Determine device type
    if ua.is_mobile:
        device_type = "mobile"
    elif ua.is_tablet:
        device_type = "tablet"
    else:
        device_type = "desktop"

    return {
        "device_type": device_type,
        "browser": ua.browser.family,
        "browser_version": ua.browser.version_string,
        "os": ua.os.family,
        "os_version": ua.os.version_string,
    }
async def get_geo_info(ip: str) -> Optional[Dict]:
    """Get geographic info for an IP address using the free ip-api.com service.

    Args:
        ip: Client IP address (IPv4 or IPv6) as a string. The literal
            ``"localhost"`` is also accepted.

    Returns:
        * A "Local" placeholder record (country "Local", code "LO", no
          coordinates) for loopback, private, link-local and unspecified
          addresses, and for ``"localhost"``.
        * A dict with ``country``, ``country_code``, ``region``, ``city``,
          ``latitude`` and ``longitude`` for a successful lookup. Successful
          lookups are cached in memory for ``GEO_CACHE_TTL``.
        * ``None`` when the input is not a valid IP address or the lookup
          fails for any reason (the failure is logged, never raised).
    """
    import ipaddress
    import logging

    if not isinstance(ip, str):
        return None
    candidate = ip.strip()

    def _local_record() -> Dict:
        # Fresh dict per call so callers cannot mutate a shared instance.
        return {
            "country": "Local",
            "country_code": "LO",
            "region": "",
            "city": "",
            "latitude": None,
            "longitude": None,
        }

    if candidate.lower() == "localhost":
        return _local_record()

    # Validate before the value is interpolated into the request URL: this
    # rejects empty strings (which would produce a URL with no address in it)
    # and arbitrary text coming from client-controlled headers.
    try:
        address = ipaddress.ip_address(candidate)
    except ValueError:
        return None

    # Treat "::ffff:a.b.c.d" like the IPv4 address it wraps.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped

    # Skip private/local IPs. These are never cached, so checking them before
    # the cache lookup does not change results.
    if (
        address.is_private
        or address.is_loopback
        or address.is_link_local
        or address.is_unspecified
    ):
        return _local_record()

    query = str(address)

    # Check cache; drop the entry once it has expired so stale records do not
    # linger when the refresh below fails.
    cached = _geo_cache.get(query)
    if cached is not None:
        if datetime.now() < _geo_cache_expiry.get(query, datetime.min):
            return cached
        _geo_cache.pop(query, None)
        _geo_cache_expiry.pop(query, None)

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                IP_API_URL.format(ip=query),
                timeout=5.0,
            )
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                result = {
                    "country": data.get("country", "Unknown"),
                    "country_code": data.get("countryCode", ""),
                    "region": data.get("regionName", ""),
                    "city": data.get("city", ""),
                    "latitude": data.get("lat"),
                    "longitude": data.get("lon"),
                }
                # Cache the result
                _geo_cache[query] = result
                _geo_cache_expiry[query] = datetime.now() + GEO_CACHE_TTL
                return result
    except Exception as exc:
        # Geolocation is best-effort: a failed lookup must never break
        # visit logging, so report it and fall through to None.
        logging.getLogger(__name__).warning(
            "Geo lookup failed for %s: %s", query, exc
        )
    return None
def extract_referrer_domain(referrer: str) -> Optional[str]:
    """Extract the network location (host, plus port if present) from a referrer URL.

    Args:
        referrer: Value of the ``Referer`` header; may be empty or ``None``.

    Returns:
        The URL's ``netloc`` component, or ``None`` when the referrer is
        missing, is not a string, has no network location (e.g. a relative
        path), or cannot be parsed.
    """
    from urllib.parse import urlparse

    if not referrer or not isinstance(referrer, str):
        return None
    try:
        parsed = urlparse(referrer)
    except ValueError:
        # urlparse raises ValueError for malformed URLs such as an
        # unterminated IPv6 literal; treat those as "no referrer domain".
        return None
    return parsed.netloc or None
async def log_visit(
    db: Session,
    ip: str,
    user_agent: str,
    page_path: str,
    page_title: Optional[str] = None,
    referrer: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[int] = None,
    utm_source: Optional[str] = None,
    utm_medium: Optional[str] = None,
    utm_campaign: Optional[str] = None,
) -> VisitorLog:
    """
    Record a single page view.

    A VisitorLog row is built from the hashed IP, the visitor fingerprint,
    the parsed User-Agent, the geolocation lookup and the referrer/UTM data.
    When a session_id is supplied, the matching VisitorSession row is
    updated (last page, page count, last activity, user id if not yet set)
    or created if none exists. Everything is committed in one transaction
    and the refreshed VisitorLog is returned.
    """
    # Only digests of the IP are stored on the row.
    hashed_ip = hash_ip(ip)
    fingerprint = hash_visitor(ip, user_agent)

    device = parse_device_info(user_agent)

    # A failed lookup yields None; fall back to an empty mapping so the
    # .get() calls below simply produce None values.
    location = await get_geo_info(ip)
    if not location:
        location = {}

    source_domain = extract_referrer_domain(referrer)

    entry_fields = dict(
        visitor_hash=fingerprint,
        ip_hash=hashed_ip,
        session_id=session_id,
        user_id=user_id,
        page_path=page_path,
        page_title=page_title,
        referrer=referrer,
        referrer_domain=source_domain,
        device_type=device["device_type"],
        browser=device["browser"],
        browser_version=device["browser_version"],
        os=device["os"],
        os_version=device["os_version"],
        country=location.get("country"),
        country_code=location.get("country_code"),
        city=location.get("city"),
        region=location.get("region"),
        latitude=location.get("latitude"),
        longitude=location.get("longitude"),
        utm_source=utm_source,
        utm_medium=utm_medium,
        utm_campaign=utm_campaign,
    )
    entry = VisitorLog(**entry_fields)
    db.add(entry)

    # Session bookkeeping only applies when the caller supplied a session id.
    if session_id:
        tracked = (
            db.query(VisitorSession)
            .filter(VisitorSession.session_id == session_id)
            .first()
        )
        if not tracked:
            # First page view seen for this session id.
            tracked = VisitorSession(
                session_id=session_id,
                visitor_hash=fingerprint,
                user_id=user_id,
                first_page=page_path,
                last_page=page_path,
                device_type=device["device_type"],
                browser=device["browser"],
                country=location.get("country"),
            )
            db.add(tracked)
        else:
            tracked.last_page = page_path
            tracked.page_count += 1
            tracked.last_activity_at = datetime.utcnow()
            # Attach the user once known; never overwrite an existing one.
            if user_id and not tracked.user_id:
                tracked.user_id = user_id

    db.commit()
    db.refresh(entry)
    return entry
def aggregate_daily_stats(db: Session, date_str: str) -> Optional[VisitorDailyStats]:
    """
    Build (or refresh) the VisitorDailyStats row for one day.

    date_str is the day to aggregate, formatted YYYY-MM-DD. All VisitorLog
    rows whose visited_at date equals it are loaded and summarised into
    total/unique counts, per-device, per-browser and per-country tallies,
    the 20 most viewed pages and the 10 most frequent referrer domains
    (breakdowns are stored as JSON strings). Returns None, without touching
    any existing stats row, when there were no visits that day.
    """
    day_visits = (
        db.query(VisitorLog)
        .filter(func.date(VisitorLog.visited_at) == date_str)
        .all()
    )
    if not day_visits:
        return None

    def tally(table: Dict, key) -> None:
        table[key] = table.get(key, 0) + 1

    by_device: Dict = {}
    by_browser: Dict = {}
    by_country: Dict = {}
    by_page: Dict = {}
    by_referrer: Dict = {}
    seen_visitors = set()

    # One pass over the visits fills every tally; keys are inserted in
    # visit order for each table.
    for visit in day_visits:
        seen_visitors.add(visit.visitor_hash)
        tally(by_device, visit.device_type or "unknown")
        tally(by_browser, visit.browser or "unknown")
        tally(by_country, visit.country_code or "unknown")
        tally(by_page, visit.page_path)
        # Visits without a referrer domain are left out of the ranking.
        if visit.referrer_domain:
            tally(by_referrer, visit.referrer_domain)

    def ranked(table: Dict, limit: int):
        # Stable descending sort by count; ties keep first-seen order.
        return sorted(table.items(), key=lambda pair: pair[1], reverse=True)[:limit]

    page_ranking = [
        {"path": path, "views": views} for path, views in ranked(by_page, 20)
    ]
    referrer_ranking = [
        {"domain": domain, "visits": hits} for domain, hits in ranked(by_referrer, 10)
    ]

    summary = {
        "total_visits": len(day_visits),
        "unique_visitors": len(seen_visitors),
        "device_breakdown": json.dumps(by_device),
        "browser_breakdown": json.dumps(by_browser),
        "country_breakdown": json.dumps(by_country),
        "top_pages": json.dumps(page_ranking),
        "top_referrers": json.dumps(referrer_ranking),
    }

    # Create or update daily stats
    stats = (
        db.query(VisitorDailyStats)
        .filter(VisitorDailyStats.stat_date == date_str)
        .first()
    )
    if stats:
        for column, value in summary.items():
            setattr(stats, column, value)
    else:
        stats = VisitorDailyStats(stat_date=date_str, **summary)
        db.add(stats)

    db.commit()
    return stats
def cleanup_old_visitor_logs(db: Session, days: int = 90) -> int:
    """Remove VisitorLog rows whose visited_at is more than `days` days old.

    The cutoff is computed from the current local time (datetime.now()).
    The deletion is committed before returning, and the value returned by
    the bulk delete (the number of rows matched) is passed back.
    """
    # NOTE(review): log_visit stamps sessions with datetime.utcnow(), while
    # this cutoff uses local time — confirm which clock visited_at uses.
    threshold = datetime.now() - timedelta(days=days)
    stale_rows = db.query(VisitorLog).filter(VisitorLog.visited_at < threshold)
    removed_count = stale_rows.delete()
    db.commit()
    return removed_count