Files
AutonetSellCar/backend/app/api/visitor.py
AutonetSellCar Deploy b8f0ae4d28 feat: Add real-time visitor map with IP geolocation
- Add latitude/longitude columns to visitor_logs model
- Update visitor_service to fetch and store coordinates from ip-api.com
- Add /admin/map-data API endpoint for map visualization
- Create VisitorMap component using Leaflet/OpenStreetMap
- Integrate map into visitor-stats admin page
- 30-second auto-refresh with animation for new visitors
- Color-coded markers (red: active, blue: recent)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 18:31:22 +09:00

434 lines
12 KiB
Python

"""
Visitor Analytics API
"""
from fastapi import APIRouter, Depends, Request, BackgroundTasks
from sqlalchemy.orm import Session
from sqlalchemy import func, and_, desc
from datetime import datetime, timedelta
from typing import Optional, List
from pydantic import BaseModel
from ..database import get_db
from ..models.visitor import VisitorLog, VisitorDailyStats
from ..models import User
from ..services.visitor_service import log_visit, aggregate_daily_stats
from .auth import get_current_admin_user, get_current_user_optional
# Router that every endpoint in this module registers on; routes are served under the "/visitor" prefix.
router = APIRouter(prefix="/visitor", tags=["Visitor Analytics"])
# Pydantic schemas
class VisitLogRequest(BaseModel):
    """Request body for the visit-logging endpoint.

    Only ``page_path`` is required; every other field defaults to ``None``.
    """

    # --- page being viewed ---
    page_path: str
    page_title: Optional[str] = None

    # --- where the visit came from / which session it belongs to ---
    referrer: Optional[str] = None
    session_id: Optional[str] = None

    # --- campaign attribution (UTM parameters) ---
    utm_source: Optional[str] = None
    utm_medium: Optional[str] = None
    utm_campaign: Optional[str] = None
class VisitorStatsResponse(BaseModel):
    """Response schema for the admin overview endpoint."""

    # Headline counters for the requested window.
    total_visits: int
    unique_visitors: int

    # Visit counts keyed by device type / browser / country code.
    # The overview endpoint substitutes the key "unknown" for empty values.
    device_breakdown: dict
    browser_breakdown: dict
    country_breakdown: dict
class ChartData(BaseModel):
    """Series for a chart: ``labels[i]`` is the axis label for ``values[i]``."""

    # Axis labels (the chart endpoints fill these with "MM/DD" day strings).
    labels: List[str]
    # One integer data point per label, in the same order.
    values: List[int]
class TopPage(BaseModel):
    """One row of the top-pages ranking: a page path and its view count."""

    path: str
    views: int
    # Title recorded with the visits, if any.
    title: Optional[str] = None
class TopReferrer(BaseModel):
    """One row of the top-referrers ranking: a referrer domain and its visit count."""

    domain: str
    visits: int
# Background task wrapper for async log_visit
async def _log_visit_background(
    db: Session,
    ip: str,
    user_agent: str,
    page_path: str,
    page_title: Optional[str],
    referrer: Optional[str],
    session_id: Optional[str],
    user_id: Optional[int],
    utm_source: Optional[str],
    utm_medium: Optional[str],
    utm_campaign: Optional[str],
):
    """Await ``log_visit`` with the given arguments without propagating errors.

    All parameters are forwarded positionally, in declaration order, to
    ``log_visit``. Any exception it raises is printed and swallowed so that
    a logging failure never surfaces to whoever scheduled this coroutine.
    """
    # Keep the positional order in sync with the log_visit signature.
    forwarded_args = (
        db,
        ip,
        user_agent,
        page_path,
        page_title,
        referrer,
        session_id,
        user_id,
        utm_source,
        utm_medium,
        utm_campaign,
    )
    try:
        await log_visit(*forwarded_args)
    except Exception as exc:
        print(f"[Visitor] Log visit failed: {exc}")
# Public endpoint for logging visits
@router.post("/log")
async def log_page_visit(
    visit_data: VisitLogRequest,
    request: Request,
    db: Session = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user_optional),
):
    """
    Log a page visit (called from frontend).

    The client IP is taken from the first non-empty entry of the
    ``X-Forwarded-For`` header when present, otherwise from the socket peer,
    otherwise the literal ``"unknown"``. The visit is attributed to the
    authenticated user when one is resolved.

    Logging is best-effort: any exception raised by ``log_visit`` is printed
    and swallowed, and the endpoint always returns ``{"status": "logged"}``.
    """
    # Get client IP (handle proxies). A malformed header such as ", 1.2.3.4"
    # has an empty first entry; skip empty entries instead of recording "".
    forwarded_for = request.headers.get("X-Forwarded-For", "")
    forwarded_ips = (entry.strip() for entry in forwarded_for.split(","))
    ip = next((entry for entry in forwarded_ips if entry), None)
    if ip is None:
        ip = request.client.host if request.client else "unknown"

    user_agent = request.headers.get("User-Agent", "")
    user_id = current_user.id if current_user else None

    # Log visit directly (awaited inline, not scheduled as a background task).
    try:
        await log_visit(
            db,
            ip,
            user_agent,
            visit_data.page_path,
            visit_data.page_title,
            visit_data.referrer,
            visit_data.session_id,
            user_id,
            visit_data.utm_source,
            visit_data.utm_medium,
            visit_data.utm_campaign,
        )
    except Exception as e:
        print(f"[Visitor] Log visit failed: {e}")
    return {"status": "logged"}
# Admin endpoints
@router.get("/admin/overview", response_model=VisitorStatsResponse)
def get_visitor_overview(
    days: int = 30,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Summarise visits recorded during the last ``days`` days (admin only).

    Returns the total number of visits, the number of distinct visitor
    hashes, and per-device / per-browser / per-country visit counts.
    Empty or NULL grouping values are reported under the key "unknown".
    """
    window_start = datetime.utcnow() - timedelta(days=days)
    in_window = VisitorLog.visited_at >= window_start

    def count_by(column):
        """Return ``{column value or "unknown": visit count}`` inside the window."""
        grouped = (
            db.query(column, func.count(VisitorLog.id))
            .filter(in_window)
            .group_by(column)
            .all()
        )
        return {row[0] or "unknown": row[1] for row in grouped}

    visit_total = (
        db.query(func.count(VisitorLog.id))
        .filter(in_window)
        .scalar()
    )
    visitor_total = (
        db.query(func.count(func.distinct(VisitorLog.visitor_hash)))
        .filter(in_window)
        .scalar()
    )

    # Keyword arguments are evaluated top to bottom, so the breakdown queries
    # run in device, browser, country order.
    return VisitorStatsResponse(
        total_visits=visit_total or 0,
        unique_visitors=visitor_total or 0,
        device_breakdown=count_by(VisitorLog.device_type),
        browser_breakdown=count_by(VisitorLog.browser),
        country_breakdown=count_by(VisitorLog.country_code),
    )
@router.get("/admin/chart/visits", response_model=ChartData)
def get_visits_chart(
    days: int = 30,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Get daily visits chart data for the last ``days`` days (admin only).

    Returns one point per calendar day (UTC "today" included), oldest first.
    Labels are formatted "MM/DD"; days without any visit are reported as 0.
    A non-positive ``days`` yields an empty chart.
    """
    if days <= 0:
        return ChartData(labels=[], values=[])

    today = datetime.utcnow().date()
    chart_days = [today - timedelta(days=offset) for offset in range(days - 1, -1, -1)]

    # One grouped query instead of one COUNT query per day.
    day_bucket = func.date(VisitorLog.visited_at)
    rows = (
        db.query(day_bucket, func.count(VisitorLog.id))
        .filter(day_bucket >= chart_days[0].isoformat())
        .group_by(day_bucket)
        .all()
    )

    # DATE() comes back as a string on some backends and as a date object on
    # others; normalise both to the "YYYY-MM-DD" prefix of their text form.
    visits_by_day = {str(bucket)[:10]: count for bucket, count in rows}

    return ChartData(
        labels=[day.strftime("%m/%d") for day in chart_days],
        values=[visits_by_day.get(day.isoformat(), 0) for day in chart_days],
    )
@router.get("/admin/chart/unique-visitors", response_model=ChartData)
def get_unique_visitors_chart(
    days: int = 30,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Get daily unique visitors chart data for the last ``days`` days (admin only).

    Returns one point per calendar day (UTC "today" included), oldest first.
    Each value is the number of distinct visitor hashes seen that day; days
    without any visit are reported as 0. Labels are formatted "MM/DD".
    A non-positive ``days`` yields an empty chart.
    """
    if days <= 0:
        return ChartData(labels=[], values=[])

    today = datetime.utcnow().date()
    chart_days = [today - timedelta(days=offset) for offset in range(days - 1, -1, -1)]

    # One grouped query instead of one COUNT(DISTINCT ...) query per day.
    day_bucket = func.date(VisitorLog.visited_at)
    rows = (
        db.query(day_bucket, func.count(func.distinct(VisitorLog.visitor_hash)))
        .filter(day_bucket >= chart_days[0].isoformat())
        .group_by(day_bucket)
        .all()
    )

    # DATE() comes back as a string on some backends and as a date object on
    # others; normalise both to the "YYYY-MM-DD" prefix of their text form.
    uniques_by_day = {str(bucket)[:10]: count for bucket, count in rows}

    return ChartData(
        labels=[day.strftime("%m/%d") for day in chart_days],
        values=[uniques_by_day.get(day.isoformat(), 0) for day in chart_days],
    )
@router.get("/admin/top-pages", response_model=List[TopPage])
def get_top_pages(
    days: int = 30,
    limit: int = 20,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Rank the most viewed pages of the last ``days`` days (admin only).

    Visits are grouped by (path, title) pair, ordered by view count
    descending, and truncated to ``limit`` rows.
    """
    cutoff = datetime.utcnow() - timedelta(days=days)
    view_count = func.count(VisitorLog.id).label("views")

    ranking = db.query(VisitorLog.page_path, VisitorLog.page_title, view_count)
    ranking = ranking.filter(VisitorLog.visited_at >= cutoff)
    ranking = ranking.group_by(VisitorLog.page_path, VisitorLog.page_title)
    ranking = ranking.order_by(desc("views")).limit(limit)

    top_pages = []
    for row in ranking.all():
        top_pages.append(TopPage(path=row[0], title=row[1], views=row[2]))
    return top_pages
@router.get("/admin/top-referrers", response_model=List[TopReferrer])
def get_top_referrers(
    days: int = 30,
    limit: int = 10,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Rank referrer domains by visits over the last ``days`` days (admin only).

    Visits whose referrer domain is NULL or empty are excluded. The result
    is ordered by visit count descending and truncated to ``limit`` rows.
    """
    cutoff = datetime.utcnow() - timedelta(days=days)
    domain = VisitorLog.referrer_domain
    visit_count = func.count(VisitorLog.id).label("visits")

    # All three conditions must hold for a visit to be counted.
    has_usable_referrer = and_(
        VisitorLog.visited_at >= cutoff,
        domain.isnot(None),
        domain != "",
    )

    ranking = (
        db.query(domain, visit_count)
        .filter(has_usable_referrer)
        .group_by(domain)
        .order_by(desc("visits"))
        .limit(limit)
    )

    top_referrers = []
    for row in ranking.all():
        top_referrers.append(TopReferrer(domain=row[0], visits=row[1]))
    return top_referrers
@router.get("/admin/realtime")
def get_realtime_visitors(
    minutes: int = 5,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Report activity during the last ``minutes`` minutes (admin only).

    Returns the number of distinct visitor hashes seen in the window, the
    window length that was requested, and the five most viewed page paths
    in that window with their view counts.
    """
    window_start = datetime.utcnow() - timedelta(minutes=minutes)
    in_window = VisitorLog.visited_at >= window_start

    distinct_visitors = func.count(func.distinct(VisitorLog.visitor_hash))
    active_visitors = db.query(distinct_visitors).filter(in_window).scalar() or 0

    # Most viewed pages inside the same window.
    page_views = func.count(VisitorLog.id).label("views")
    busiest_pages = (
        db.query(VisitorLog.page_path, page_views)
        .filter(in_window)
        .group_by(VisitorLog.page_path)
        .order_by(desc("views"))
        .limit(5)
        .all()
    )

    recent_pages = []
    for row in busiest_pages:
        recent_pages.append({"path": row[0], "views": row[1]})

    return {
        "active_visitors": active_visitors,
        "minutes": minutes,
        "recent_pages": recent_pages,
    }
@router.post("/admin/aggregate/{date_str}")
def trigger_aggregation(
    date_str: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Run daily-stats aggregation for one date on demand (admin only).

    ``date_str`` is passed unchanged to ``aggregate_daily_stats`` and is
    expected in YYYY-MM-DD form. The response echoes the date with status
    "success" when the aggregation returns a truthy result and "no_data"
    otherwise.
    """
    aggregated = aggregate_daily_stats(db, date_str)
    status = "success" if aggregated else "no_data"
    return {"status": status, "date": date_str}
@router.get("/admin/map-data")
def get_visitor_map_data(
    minutes: int = 30,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """
    Get visitor locations for map display (admin only).

    Returns a dict with:
    - "locations": one marker per distinct location among the 100 most
      recent visits with coordinates in the last ``minutes`` minutes.
      Visits whose coordinates round to the same two decimals share a
      marker; the marker carries the details of the newest visit there and
      a "count" of grouped visits.
    - "recent": up to 20 of the newest visits with coordinates from the
      last 5 minutes (fixed window, independent of ``minutes``).
    - "total_with_coords": number of rows considered for "locations"
      (at most 100).
    - "minutes": the requested window length.
    """
    # Single time reference so both windows are measured from the same instant.
    now = datetime.utcnow()
    start_time = now - timedelta(minutes=minutes)

    # Get recent visitors with coordinates, newest first.
    visitors = db.query(
        VisitorLog.latitude,
        VisitorLog.longitude,
        VisitorLog.country,
        VisitorLog.country_code,
        VisitorLog.city,
        VisitorLog.visited_at,
        VisitorLog.page_path,
        VisitorLog.device_type,
    ).filter(
        and_(
            VisitorLog.visited_at >= start_time,
            VisitorLog.latitude.isnot(None),
            VisitorLog.longitude.isnot(None),
        )
    ).order_by(
        desc(VisitorLog.visited_at)
    ).limit(100).all()

    # Group by rounded coordinates (roughly city-level). The dict keeps
    # first-seen order, so markers stay ordered by their newest visit, and
    # each repeat visit is counted with one lookup instead of a list scan.
    # The query above already excludes NULL coordinates, so round() is safe.
    markers_by_location = {}
    for v in visitors:
        location_key = (round(v.latitude, 2), round(v.longitude, 2))
        marker = markers_by_location.get(location_key)
        if marker is not None:
            marker["count"] += 1
            continue
        markers_by_location[location_key] = {
            "lat": v.latitude,
            "lng": v.longitude,
            "country": v.country,
            "country_code": v.country_code,
            "city": v.city,
            "last_visit": v.visited_at.isoformat() if v.visited_at else None,
            "page": v.page_path,
            "device": v.device_type,
            "count": 1,
        }
    locations = list(markers_by_location.values())

    # Get recent activity for animation (last 5 minutes).
    recent_start = now - timedelta(minutes=5)
    recent_visitors = db.query(
        VisitorLog.latitude,
        VisitorLog.longitude,
        VisitorLog.country,
        VisitorLog.city,
        VisitorLog.visited_at,
    ).filter(
        and_(
            VisitorLog.visited_at >= recent_start,
            VisitorLog.latitude.isnot(None),
            VisitorLog.longitude.isnot(None),
        )
    ).order_by(
        desc(VisitorLog.visited_at)
    ).limit(20).all()

    recent = [
        {
            "lat": v.latitude,
            "lng": v.longitude,
            "country": v.country,
            "city": v.city,
            "time": v.visited_at.isoformat() if v.visited_at else None,
        }
        for v in recent_visitors
    ]

    return {
        "locations": locations,
        "recent": recent,
        "total_with_coords": len(visitors),
        "minutes": minutes,
    }