import os
import json
from datetime import datetime, timedelta

class UsageTracker:
    def __init__(self, filename='usage_data.json'):
        self.filename = filename
        self.usage_data = self.load_usage_data()

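    def load_usage_data(self):
        """Load existing usage data from file, or return a new empty structure"""
        # Default structure; also fills in any top-level keys missing from a partial file
        data = {
            "total_requests": 0,
            "models": {},
            "daily_usage": {},
            "api_endpoints": {}
        }

        try:
            if os.path.exists(self.filename):
                with open(self.filename, 'r') as f:
                    loaded = json.load(f)
                # Only accept a JSON object; anything else falls back to the defaults
                if isinstance(loaded, dict):
                    data.update(loaded)
        except (json.JSONDecodeError, OSError):
            # Corrupt or unreadable file: start from the empty structure
            pass

        return data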

    def record_request(self, model=None, endpoint=None):
        """Record a request for a specific model and/or endpoint"""
        # Take one timestamp so the date key and first_used/last_used always agree,
        # even when a request is recorded right around midnight
        now = datetime.now()
        timestamp = now.isoformat()
        today = now.strftime("%Y-%m-%d")

        # Update total requests
        self.usage_data["total_requests"] += 1

        # Make sure today's bucket exists, even if neither model nor endpoint is given
        daily = self.usage_data["daily_usage"].setdefault(today, {})

        # Track lifetime totals and today's count for the model and/or endpoint.
        # Note: models and endpoints share one namespace inside a daily bucket.
        for category, name in (("models", model), ("api_endpoints", endpoint)):
            if not name:
                continue
            entry = self.usage_data[category].setdefault(
                name, {"total_requests": 0, "first_used": timestamp}
            )
            entry["total_requests"] += 1
            entry["last_used"] = timestamp
            daily[name] = daily.get(name, 0) + 1
        
        # Save updated usage data
        self.save_usage_data()

    def save_usage_data(self):
        """Save usage data to file"""
        try:
            with open(self.filename, 'w') as f:
                json.dump(self.usage_data, f, indent=4)
        except Exception as e:
            print(f"Error saving usage data: {e}")

    def get_usage_summary(self, days=7):
        """Get usage summary for the last specified number of days"""
        # Calculate the date threshold
        threshold_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
        
        # Prepare summary
        summary = {
            "total_requests": self.usage_data["total_requests"],
            "models": self.usage_data["models"],
            "api_endpoints": self.usage_data["api_endpoints"],
            "recent_daily_usage": {}
        }
        
        # Collect recent daily usage
        for date, date_data in sorted(self.usage_data["daily_usage"].items(), reverse=True):
            if date >= threshold_date:
                summary["recent_daily_usage"][date] = date_data
        
        return summary