462 lines
16 KiB
Python
462 lines
16 KiB
Python
from fastapi import APIRouter, Depends, Request, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
from datetime import date, datetime, timedelta
|
|
from typing import Optional, List
|
|
import httpx
|
|
import re
|
|
from app.core.database import get_db
|
|
from app.models.visitor import DailyVisitor, VisitorLog, VisitorDailyStats
|
|
from app.schemas.visitor import (
|
|
VisitorStatsResponse, CountryStatsResponse,
|
|
OverviewStatsResponse, ChartDataResponse, BreakdownResponse, BreakdownItem,
|
|
TopPagesResponse, TopPageItem, TopReferrersResponse, TopReferrerItem,
|
|
RealtimeStatsResponse
|
|
)
|
|
from app.api.auth import get_current_admin
|
|
|
|
# Router collecting the visitor-tracking and statistics endpoints declared in
# this module; every route registered on it is served under "/visitors".
router = APIRouter(prefix="/visitors", tags=["visitors"])
|
|
|
|
|
|
def get_ip_location(ip_address: str) -> dict:
    """Resolve an IP address to coarse location info via ip-api.com.

    The lookup is best-effort: it never raises for network or parsing
    problems, it falls back to the "Unknown" placeholder instead.

    Args:
        ip_address: Textual IPv4/IPv6 address, or one of the sentinel values
            "localhost" / "unknown".

    Returns:
        A dict with the keys "country", "country_code", "city" and "region".
        Loopback, private and link-local addresses (and the two sentinels)
        map to the "Local" placeholder; empty or unparseable input and failed
        lookups map to the "Unknown" placeholder.
    """
    import ipaddress  # local import: keeps this block self-contained

    local = {"country": "Local", "country_code": "LO", "city": "Local", "region": "Local"}
    unknown = {"country": "Unknown", "country_code": "??", "city": "Unknown", "region": "Unknown"}

    if not ip_address:
        # An empty address would make ip-api geolocate the *caller* (this
        # server), which is never the visitor.
        return unknown
    if ip_address in ("localhost", "unknown"):
        return local

    try:
        parsed = ipaddress.ip_address(ip_address.strip())
    except ValueError:
        # The value may originate from a client-controlled header; never
        # interpolate something that is not an IP literal into the URL.
        return unknown

    # Proper range checks instead of string prefixes: only 172.16.0.0/12 is
    # private (not all of 172.*), and IPv6 loopback/link-local are covered.
    if parsed.is_loopback or parsed.is_private or parsed.is_link_local:
        return local

    try:
        with httpx.Client(timeout=3.0) as client:
            response = client.get(
                f"http://ip-api.com/json/{parsed}?fields=status,country,countryCode,city,regionName"
            )
        if response.status_code == 200:
            data = response.json()
            if isinstance(data, dict) and data.get("status") == "success":
                return {
                    "country": data.get("country", "Unknown"),
                    "country_code": data.get("countryCode", "??"),
                    "city": data.get("city", "Unknown"),
                    "region": data.get("regionName", "Unknown"),
                }
    except (httpx.HTTPError, ValueError):
        # Transport failure or a non-JSON body: deliberately ignored so the
        # caller still gets the placeholder below.
        pass

    return unknown
|
|
|
|
|
|
def get_client_ip(request: Request) -> str:
    """Return the client IP address for *request*.

    The first entry of the ``X-Forwarded-For`` header is preferred when it is
    present and non-blank; otherwise the socket peer address is used.

    Args:
        request: The incoming request.

    Returns:
        The IP address as a string, or "unknown" when the request carries no
        client information.
    """
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # proxy overwrites it - confirm the deployment sits behind such a proxy.
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        first_hop = forwarded.split(",")[0].strip()
        # A blank first entry (e.g. " " or ", 1.2.3.4") must not be returned
        # as an empty IP; fall through to the peer address instead.
        if first_hop:
            return first_hop
    return request.client.host if request.client else "unknown"
|
|
|
|
|
|
def parse_user_agent(user_agent: str) -> dict:
    """Classify a User-Agent string by device type, browser and OS.

    Detection is a case-insensitive substring heuristic, not a full parser.

    Args:
        user_agent: Raw ``User-Agent`` header value; may be empty or None.

    Returns:
        A dict with the keys "device_type" ("desktop", "mobile" or "tablet"),
        "browser" and "os". Unrecognised parts keep the defaults
        "desktop" / "Unknown" / "Unknown".
    """
    result = {"device_type": "desktop", "browser": "Unknown", "os": "Unknown"}

    if not user_agent:
        return result

    ua = user_agent.lower()

    def has(*tokens):
        """Return True when any of *tokens* occurs in the lowered UA."""
        return any(token in ua for token in tokens)

    # Device type. Tablets are tested first: iPad UAs carry a "Mobile/..."
    # token and Android tablets carry "Android", so testing the mobile
    # keywords first made the tablet branch unreachable for them. Android
    # without a "Mobile" token is treated as a tablet.
    if has("ipad", "tablet") or ("android" in ua and "mobile" not in ua):
        result["device_type"] = "tablet"
    elif has("mobile", "android", "iphone", "ipod"):
        result["device_type"] = "mobile"

    # Browser. Edge and Opera UAs also contain "Chrome", so they are tested
    # before it; Firefox is tested before Safari because Firefox on iOS
    # ("FxiOS") also contains "Safari". Chrome on iOS identifies as "CriOS".
    if "edg" in ua:
        result["browser"] = "Edge"
    elif has("opera", "opr"):
        result["browser"] = "Opera"
    elif ("chrome" in ua and "chromium" not in ua) or "crios" in ua:
        result["browser"] = "Chrome"
    elif has("firefox", "fxios"):
        result["browser"] = "Firefox"
    elif "safari" in ua and "chrome" not in ua:
        result["browser"] = "Safari"
    elif has("msie", "trident"):
        result["browser"] = "Internet Explorer"

    # Operating system. iOS UAs contain "like Mac OS X", hence the nested
    # check; "android" is tested before "linux" because Android UAs contain
    # both tokens.
    if "windows" in ua:
        result["os"] = "Windows"
    elif has("mac os", "macintosh"):
        result["os"] = "iOS" if has("iphone", "ipad") else "macOS"
    elif "android" in ua:
        result["os"] = "Android"
    elif "linux" in ua:
        result["os"] = "Linux"
    elif has("iphone", "ipad"):
        result["os"] = "iOS"

    return result
|
|
|
|
|
|
def extract_referrer_domain(referrer: str) -> str:
    """Return the host name of a referrer URL, or "(direct)".

    The host is lower-cased and stripped of userinfo, port and a leading
    "www." so that the same site always yields the same value.

    Args:
        referrer: Referrer URL; may be empty or None.

    Returns:
        The normalised host name for http/https URLs, or "(direct)" when the
        referrer is missing, is not an http(s) URL, or cannot be parsed.
    """
    from urllib.parse import urlsplit  # local import: keeps this block self-contained

    if not referrer:
        return "(direct)"

    try:
        parts = urlsplit(referrer.strip())
        host = parts.hostname
    except ValueError:
        # urlsplit rejects malformed bracketed (IPv6-style) hosts.
        return "(direct)"

    if parts.scheme not in ("http", "https") or not host:
        return "(direct)"

    if host.startswith("www."):
        host = host[4:]
    # A bare "www." host would otherwise collapse to an empty string.
    return host or "(direct)"
|
|
|
|
|
|
@router.post("/track")
def track_visitor(
    request: Request,
    page_path: Optional[str] = None,
    referrer: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Record a visit, counting each IP address at most once per day.

    When no VisitorLog row exists yet for the caller's IP and today's date,
    a row is stored with location, User-Agent and referrer details and
    today's DailyVisitor counter is incremented (or created with a count of
    one). Otherwise nothing is written.

    Args:
        request: Incoming request; supplies the client IP and User-Agent.
        page_path: Path of the visited page, stored as given.
        referrer: Referrer URL, stored as given and reduced to a domain.
        db: Database session.

    Returns:
        A dict with "message" and a "new_visitor" flag telling whether this
        call created a log entry.
    """
    today = date.today()
    ip_address = get_client_ip(request)
    # Only the first 500 characters of the User-Agent are kept.
    user_agent = request.headers.get("User-Agent", "")[:500]

    already_logged = db.query(VisitorLog).filter(
        VisitorLog.ip_address == ip_address,
        VisitorLog.visit_date == today
    ).first()

    if already_logged:
        return {"message": "Already tracked", "new_visitor": False}

    # The geolocation lookup is an outbound HTTP call, so it only runs for
    # visitors that were not logged yet today.
    location = get_ip_location(ip_address)
    ua_info = parse_user_agent(user_agent)
    referrer_domain = extract_referrer_domain(referrer)

    db.add(VisitorLog(
        ip_address=ip_address,
        user_agent=user_agent,
        visit_date=today,
        country=location["country"],
        country_code=location["country_code"],
        city=location["city"],
        region=location["region"],
        page_path=page_path,
        referrer=referrer,
        referrer_domain=referrer_domain,
        device_type=ua_info["device_type"],
        browser=ua_info["browser"],
        os=ua_info["os"]
    ))

    daily_record = db.query(DailyVisitor).filter(
        DailyVisitor.visit_date == today
    ).first()

    if daily_record:
        # Assigning a SQL expression makes the flush emit
        # "SET visitor_count = visitor_count + 1", so concurrent requests
        # cannot overwrite each other's increment the way a Python-side
        # read-modify-write (`+= 1`) can.
        daily_record.visitor_count = DailyVisitor.visitor_count + 1
    else:
        db.add(DailyVisitor(visit_date=today, visitor_count=1))

    db.commit()
    return {"message": "Tracked", "new_visitor": True}
|
|
|
|
|
|
def _build_visitor_stats(db: Session) -> VisitorStatsResponse:
    """Build today's and all-time visitor totals from DailyVisitor rows."""
    today_record = db.query(DailyVisitor).filter(
        DailyVisitor.visit_date == date.today()
    ).first()
    # SUM() yields NULL/None on an empty table, hence the `or 0` below.
    total_result = db.query(func.sum(DailyVisitor.visitor_count)).scalar()
    return VisitorStatsResponse(
        today_visitors=today_record.visitor_count if today_record else 0,
        total_visitors=total_result or 0,
    )


@router.get("/stats", response_model=VisitorStatsResponse)
def get_visitor_stats(db: Session = Depends(get_db), _: str = Depends(get_current_admin)):
    """Return today's and total visitor counts.

    This route additionally depends on ``get_current_admin``.
    """
    return _build_visitor_stats(db)


@router.get("/stats/public", response_model=VisitorStatsResponse)
def get_visitor_stats_public(db: Session = Depends(get_db)):
    """Return today's and total visitor counts without the admin dependency."""
    return _build_visitor_stats(db)
|
|
|
|
|
|
@router.get("/stats/countries", response_model=List[CountryStatsResponse])
def get_country_stats(db: Session = Depends(get_db), _: str = Depends(get_current_admin)):
    """Return up to ten countries ranked by number of VisitorLog rows.

    Missing country names / codes are reported as "Unknown" / "??".
    """
    hits = func.count(VisitorLog.id)
    ranked = (
        db.query(
            VisitorLog.country,
            VisitorLog.country_code,
            hits.label("visitor_count"),
        )
        .group_by(VisitorLog.country, VisitorLog.country_code)
        .order_by(hits.desc())
        .limit(10)
        .all()
    )

    response = []
    for row in ranked:
        response.append(
            CountryStatsResponse(
                country=row.country or "Unknown",
                country_code=row.country_code or "??",
                visitor_count=row.visitor_count,
            )
        )
    return response
|
|
|
|
|
|
# ============ Extended Statistics Endpoints ============
|
|
|
|
@router.get("/admin/overview", response_model=OverviewStatsResponse)
def get_overview_stats(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return headline statistics for the trailing reporting window.

    Args:
        days: Window length, validated to 7..90 by the query definition.
        db: Database session.

    Returns:
        OverviewStatsResponse with window totals (visits, distinct IPs, the
        visits-per-distinct-IP ratio, mobile share) plus today's and
        yesterday's counts and the day-over-day growth rate in percent.
    """
    today = date.today()
    yesterday = today - timedelta(days=1)
    # NOTE(review): this window spans days + 1 calendar dates (start date
    # through today), while the chart endpoints in this module use
    # days - 1 - confirm which is intended.
    window_start = today - timedelta(days=days)

    def count_logs(*criteria):
        """Count VisitorLog rows inside the window matching *criteria*."""
        return db.query(func.count(VisitorLog.id)).filter(
            VisitorLog.visit_date >= window_start, *criteria
        ).scalar() or 0

    def visitors_on(day):
        """Return the DailyVisitor count stored for *day*, or 0."""
        record = db.query(DailyVisitor).filter(DailyVisitor.visit_date == day).first()
        return record.visitor_count if record else 0

    total_visits = db.query(func.sum(DailyVisitor.visitor_count)).filter(
        DailyVisitor.visit_date >= window_start
    ).scalar() or 0

    unique_visitors = db.query(
        func.count(func.distinct(VisitorLog.ip_address))
    ).filter(
        VisitorLog.visit_date >= window_start
    ).scalar() or 0

    # max(..., 1) guards the divisions against an empty window.
    pages_per_visit = round(total_visits / max(unique_visitors, 1), 1)

    with_device = count_logs(VisitorLog.device_type.isnot(None))
    on_mobile = count_logs(VisitorLog.device_type == "mobile")
    mobile_percentage = round((on_mobile / max(with_device, 1)) * 100, 1)

    today_visitors = visitors_on(today)
    yesterday_visitors = visitors_on(yesterday)

    if yesterday_visitors > 0:
        growth_rate = round(
            ((today_visitors - yesterday_visitors) / yesterday_visitors) * 100, 1
        )
    elif today_visitors == 0:
        growth_rate = 0.0
    else:
        # No baseline yesterday but traffic today: reported as +100 %.
        growth_rate = 100.0

    return OverviewStatsResponse(
        period=f"{days}d",
        total_visits=total_visits,
        unique_visitors=unique_visitors,
        pages_per_visit=pages_per_visit,
        mobile_percentage=mobile_percentage,
        today_visitors=today_visitors,
        yesterday_visitors=yesterday_visitors,
        growth_rate=growth_rate
    )
|
|
|
|
|
|
@router.get("/admin/chart/visits", response_model=ChartDataResponse)
def get_visits_chart(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return one DailyVisitor count per day for the last *days* days.

    The series runs from ``today - (days - 1)`` through today inclusive;
    labels are formatted "MM/DD" and days without a record are reported as 0.
    """
    today = date.today()
    first_day = today - timedelta(days=days-1)

    rows = db.query(DailyVisitor).filter(
        DailyVisitor.visit_date >= first_day,
        DailyVisitor.visit_date <= today
    ).order_by(DailyVisitor.visit_date).all()
    counts_by_day = {row.visit_date: row.visitor_count for row in rows}

    # Every calendar day in the window, oldest first.
    calendar = [
        first_day + timedelta(days=offset)
        for offset in range((today - first_day).days + 1)
    ]

    return ChartDataResponse(
        labels=[day.strftime("%m/%d") for day in calendar],
        data=[counts_by_day.get(day, 0) for day in calendar],
    )
|
|
|
|
|
|
@router.get("/admin/chart/unique", response_model=ChartDataResponse)
def get_unique_visitors_chart(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return the number of distinct logged IPs per day for the last *days* days.

    The series runs from ``today - (days - 1)`` through today inclusive;
    labels are formatted "MM/DD" and days without log rows are reported as 0.
    """
    today = date.today()
    first_day = today - timedelta(days=days-1)

    distinct_ips = func.count(func.distinct(VisitorLog.ip_address)).label("unique_count")
    rows = db.query(VisitorLog.visit_date, distinct_ips).filter(
        VisitorLog.visit_date >= first_day,
        VisitorLog.visit_date <= today
    ).group_by(VisitorLog.visit_date).all()
    uniques_by_day = {row.visit_date: row.unique_count for row in rows}

    # Every calendar day in the window, oldest first.
    calendar = [
        first_day + timedelta(days=offset)
        for offset in range((today - first_day).days + 1)
    ]

    return ChartDataResponse(
        labels=[day.strftime("%m/%d") for day in calendar],
        data=[uniques_by_day.get(day, 0) for day in calendar],
    )
|
|
|
|
|
|
@router.get("/admin/breakdown/device", response_model=BreakdownResponse)
def get_device_breakdown(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return VisitorLog counts grouped by device type, most frequent first.

    Only rows dated on or after ``today - days`` with a non-null device type
    are considered.

    Returns:
        BreakdownResponse whose ``total`` is the sum of the returned counts
        (0 when there is no data) and whose items carry each group's share
        of that total in percent.
    """
    start_date = date.today() - timedelta(days=days)

    rows = db.query(
        VisitorLog.device_type,
        func.count(VisitorLog.id).label("count")
    ).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.device_type.isnot(None)
    ).group_by(VisitorLog.device_type).order_by(desc("count")).all()

    # Rows are unpacked positionally: result rows are tuple-like, and a
    # tuple's own `count` method can shadow a column labelled "count" when it
    # is read as an attribute.
    total = sum(hits for _name, hits in rows)
    # Divisor guard only; `total` itself stays 0 for an empty result instead
    # of being reported as 1.
    divisor = total or 1
    items = [
        BreakdownItem(name=name or "Unknown", count=hits, percentage=round((hits / divisor) * 100, 1))
        for name, hits in rows
    ]

    return BreakdownResponse(items=items, total=total)
|
|
|
|
|
|
@router.get("/admin/breakdown/browser", response_model=BreakdownResponse)
def get_browser_breakdown(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return the ten most frequent browsers among recent VisitorLog rows.

    Only rows dated on or after ``today - days`` with a non-null browser are
    considered.

    Returns:
        BreakdownResponse whose ``total`` is the sum of the returned (at most
        ten) counts - 0 when there is no data - and whose percentages are
        relative to that sum.
    """
    start_date = date.today() - timedelta(days=days)

    rows = db.query(
        VisitorLog.browser,
        func.count(VisitorLog.id).label("count")
    ).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.browser.isnot(None)
    ).group_by(VisitorLog.browser).order_by(desc("count")).limit(10).all()

    # Rows are unpacked positionally: result rows are tuple-like, and a
    # tuple's own `count` method can shadow a column labelled "count" when it
    # is read as an attribute.
    total = sum(hits for _name, hits in rows)
    # Divisor guard only; `total` itself stays 0 for an empty result instead
    # of being reported as 1.
    divisor = total or 1
    items = [
        BreakdownItem(name=name or "Unknown", count=hits, percentage=round((hits / divisor) * 100, 1))
        for name, hits in rows
    ]

    return BreakdownResponse(items=items, total=total)
|
|
|
|
|
|
@router.get("/admin/breakdown/os", response_model=BreakdownResponse)
def get_os_breakdown(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return the ten most frequent operating systems among recent VisitorLog rows.

    Only rows dated on or after ``today - days`` with a non-null OS are
    considered.

    Returns:
        BreakdownResponse whose ``total`` is the sum of the returned (at most
        ten) counts - 0 when there is no data - and whose percentages are
        relative to that sum.
    """
    start_date = date.today() - timedelta(days=days)

    rows = db.query(
        VisitorLog.os,
        func.count(VisitorLog.id).label("count")
    ).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.os.isnot(None)
    ).group_by(VisitorLog.os).order_by(desc("count")).limit(10).all()

    # Rows are unpacked positionally: result rows are tuple-like, and a
    # tuple's own `count` method can shadow a column labelled "count" when it
    # is read as an attribute.
    total = sum(hits for _name, hits in rows)
    # Divisor guard only; `total` itself stays 0 for an empty result instead
    # of being reported as 1.
    divisor = total or 1
    items = [
        BreakdownItem(name=name or "Unknown", count=hits, percentage=round((hits / divisor) * 100, 1))
        for name, hits in rows
    ]

    return BreakdownResponse(items=items, total=total)
|
|
|
|
|
|
@router.get("/admin/top-pages", response_model=TopPagesResponse)
def get_top_pages(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return the ten most frequent page paths among recent VisitorLog rows.

    Only rows dated on or after ``today - days`` with a non-null page path
    are considered.

    Returns:
        TopPagesResponse whose ``total`` is the sum of the returned (at most
        ten) counts - 0 when there is no data - and whose percentages are
        relative to that sum.
    """
    start_date = date.today() - timedelta(days=days)

    rows = db.query(
        VisitorLog.page_path,
        func.count(VisitorLog.id).label("count")
    ).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.page_path.isnot(None)
    ).group_by(VisitorLog.page_path).order_by(desc("count")).limit(10).all()

    # Rows are unpacked positionally: result rows are tuple-like, and a
    # tuple's own `count` method can shadow a column labelled "count" when it
    # is read as an attribute.
    total = sum(hits for _path, hits in rows)
    # Divisor guard only; `total` itself stays 0 for an empty result instead
    # of being reported as 1.
    divisor = total or 1
    pages = [
        TopPageItem(path=path or "/", count=hits, percentage=round((hits / divisor) * 100, 1))
        for path, hits in rows
    ]

    return TopPagesResponse(pages=pages, total=total)
|
|
|
|
|
|
@router.get("/admin/top-referrers", response_model=TopReferrersResponse)
def get_top_referrers(
    days: int = Query(default=30, ge=7, le=90),
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return the ten most frequent referrer domains among recent VisitorLog rows.

    Only rows dated on or after ``today - days`` with a non-null referrer
    domain are considered.

    Returns:
        TopReferrersResponse whose ``total`` is the sum of the returned (at
        most ten) counts - 0 when there is no data - and whose percentages
        are relative to that sum.
    """
    start_date = date.today() - timedelta(days=days)

    rows = db.query(
        VisitorLog.referrer_domain,
        func.count(VisitorLog.id).label("count")
    ).filter(
        VisitorLog.visit_date >= start_date,
        VisitorLog.referrer_domain.isnot(None)
    ).group_by(VisitorLog.referrer_domain).order_by(desc("count")).limit(10).all()

    # Rows are unpacked positionally: result rows are tuple-like, and a
    # tuple's own `count` method can shadow a column labelled "count" when it
    # is read as an attribute.
    total = sum(hits for _domain, hits in rows)
    # Divisor guard only; `total` itself stays 0 for an empty result instead
    # of being reported as 1.
    divisor = total or 1
    referrers = [
        TopReferrerItem(domain=domain or "(direct)", count=hits, percentage=round((hits / divisor) * 100, 1))
        for domain, hits in rows
    ]

    return TopReferrersResponse(referrers=referrers, total=total)
|
|
|
|
|
|
@router.get("/admin/realtime", response_model=RealtimeStatsResponse)
def get_realtime_stats(
    db: Session = Depends(get_db),
    _: str = Depends(get_current_admin)
):
    """Return how many VisitorLog rows were recorded in the last five minutes.

    The same count is reported as both ``active_visitors`` and
    ``last_5_minutes``.
    """
    # NOTE(review): the cutoff is a naive local timestamp - confirm that
    # VisitorLog.visited_at is stored the same way.
    cutoff = datetime.now() - timedelta(minutes=5)

    recent = db.query(func.count(VisitorLog.id)).filter(
        VisitorLog.visited_at >= cutoff
    ).scalar()
    recent = recent or 0

    return RealtimeStatsResponse(active_visitors=recent, last_5_minutes=recent)
|