import os
import json
from datetime import datetime, timedelta


class UsageTracker:
    """Count requests per model and per API endpoint, persisted as JSON.

    The in-memory ``usage_data`` dict (mirrored to ``filename``) has four
    sections:

    * ``"total_requests"`` - number of ``record_request`` calls.
    * ``"models"`` - per-model ``total_requests`` / ``first_used`` /
      ``last_used``.
    * ``"api_endpoints"`` - the same three fields per endpoint.
    * ``"daily_usage"`` - ``{"YYYY-MM-DD": {name: count}}``. Model names and
      endpoint names share one counter namespace within a day.

    Timestamps come from naive local ``datetime.now()``.
    """

    def __init__(self, filename='usage_data.json'):
        """Remember *filename* and load any usage data already stored there."""
        self.filename = filename
        self.usage_data = self.load_usage_data()

    @staticmethod
    def _new_usage_data():
        """Return a fresh, empty usage structure."""
        return {
            "total_requests": 0,
            "models": {},
            "daily_usage": {},
            "api_endpoints": {}
        }

    @staticmethod
    def _bump(bucket, name, timestamp):
        """Increment *name*'s counter in *bucket*, creating the entry if new."""
        entry = bucket.setdefault(name, {
            "total_requests": 0,
            "first_used": timestamp,
            "last_used": timestamp
        })
        entry["total_requests"] += 1
        entry["last_used"] = timestamp

    def load_usage_data(self):
        """Load existing usage data from file or create a new structure.

        Returns:
            The usage dict. A missing or unparseable file yields an empty
            structure. A file that parses but is not a JSON object, or that
            lacks (or mistypes) one of the four expected sections, has those
            sections reset to their empty defaults so that
            ``record_request`` can always update them. Unknown extra keys in
            the file are kept.
        """
        defaults = self._new_usage_data()
        try:
            with open(self.filename, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            # Best effort: start over rather than fail on a bad/missing file.
            return defaults

        if not isinstance(loaded, dict):
            return defaults

        # Back-fill anything record_request relies on.
        for key, default in defaults.items():
            if not isinstance(loaded.get(key), type(default)):
                loaded[key] = default
        return loaded

    def record_request(self, model=None, endpoint=None):
        """Record a request for a specific model and/or endpoint.

        Args:
            model: Model name to credit; falsy values are ignored.
            endpoint: Endpoint name to credit; falsy values are ignored.

        The total counter and today's ``daily_usage`` entry are updated even
        when neither argument is given. The data is saved to disk before
        returning.
        """
        # One clock reading per call keeps first_used/last_used and the day
        # key consistent with each other (e.g. across midnight).
        now = datetime.now()
        timestamp = now.isoformat()
        today = now.strftime("%Y-%m-%d")

        self.usage_data["total_requests"] += 1

        if model:
            self._bump(self.usage_data["models"], model, timestamp)
        if endpoint:
            self._bump(self.usage_data["api_endpoints"], endpoint, timestamp)

        # Models and endpoints share the per-day counters; if both names are
        # equal the counter is incremented twice.
        daily = self.usage_data["daily_usage"].setdefault(today, {})
        for name in (model, endpoint):
            if name:
                daily[name] = daily.get(name, 0) + 1

        self.save_usage_data()

    def save_usage_data(self):
        """Save usage data to file.

        The JSON is written to ``<filename>.tmp`` and then moved over the
        real file with ``os.replace``, so an interrupted write cannot leave a
        truncated data file behind. Failures are reported with ``print`` and
        otherwise ignored, so tracking never breaks the caller.
        """
        tmp_path = f"{self.filename}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.usage_data, f, indent=4)
            os.replace(tmp_path, self.filename)
        except Exception as e:
            print(f"Error saving usage data: {e}")
            # Do not leave a partial temp file lying around.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def get_usage_summary(self, days=7):
        """Get usage summary for the last specified number of days.

        Args:
            days: How far back the daily breakdown reaches. The cut-off date
                (today minus *days*) is itself included, so up to
                ``days + 1`` calendar dates can appear.

        Returns:
            A dict with ``"total_requests"``, ``"models"``,
            ``"api_endpoints"`` (all-time figures) and
            ``"recent_daily_usage"`` (newest date first). The nested dicts
            are the tracker's own objects, not copies.
        """
        threshold_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

        # ISO dates sort chronologically as plain strings.
        recent = {
            date: counts
            for date, counts in sorted(self.usage_data["daily_usage"].items(), reverse=True)
            if date >= threshold_date
        }

        return {
            "total_requests": self.usage_data["total_requests"],
            "models": self.usage_data["models"],
            "api_endpoints": self.usage_data["api_endpoints"],
            "recent_daily_usage": recent
        }