simondh committed
Commit 0f1938f · 1 Parent(s): 442b8d8

clean classifier

classifiers/__init__.py ADDED
@@ -0,0 +1,4 @@
+ from .tfidf import TFIDFClassifier
+ from .llm import LLMClassifier
+
+ __all__ = ['TFIDFClassifier', 'LLMClassifier']
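
With both classifiers re-exported here, the package surface can be smoke-tested with a minimal sketch (assuming the repository root is on the import path):

from classifiers import TFIDFClassifier, LLMClassifier

print(TFIDFClassifier.__name__, LLMClassifier.__name__)  # both names resolve via the package root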
classifiers/base.py ADDED
@@ -0,0 +1,33 @@
+ class BaseClassifier:
+     """Base class for text classifiers"""
+
+     def __init__(self):
+         pass
+
+     def classify(self, texts, categories=None):
+         """
+         Classify a list of texts into categories
+
+         Args:
+             texts (list): List of text strings to classify
+             categories (list, optional): List of category names. If None, categories will be auto-detected
+
+         Returns:
+             list: List of classification results with categories, confidence scores, and explanations
+         """
+         raise NotImplementedError("Subclasses must implement this method")
+
+     def _generate_default_categories(self, texts, num_clusters=5):
+         """
+         Generate default categories based on text clustering
+
+         Args:
+             texts (list): List of text strings
+             num_clusters (int): Number of clusters to generate
+
+         Returns:
+             list: List of category names
+         """
+         # Simple implementation - in a real system this would be more sophisticated
+         default_categories = [f"Category {i+1}" for i in range(num_clusters)]
+         return default_categories
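
For reference, a minimal sketch of how a new backend would plug into this base class; the KeywordClassifier name and its keyword map are hypothetical, not part of this commit:

class KeywordClassifier(BaseClassifier):
    """Hypothetical example backend: classify by keyword lookup."""

    def __init__(self, keyword_map):
        super().__init__()
        self.keyword_map = keyword_map  # e.g. {"billing": "Finance", "crash": "Bug report"}

    def classify(self, texts, categories=None):
        categories = categories or sorted(set(self.keyword_map.values()))
        results = []
        for text in texts:
            lowered = text.lower()
            # Pick the first category whose keyword appears in the text
            category = next(
                (cat for kw, cat in self.keyword_map.items() if kw in lowered),
                categories[0],
            )
            # Return the same result shape the base contract documents
            results.append(
                {
                    "category": category,
                    "confidence": 60,
                    "explanation": "Keyword match (illustrative only)",
                }
            )
        return results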
classifiers/llm.py ADDED
@@ -0,0 +1,129 @@
+ import json
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+ from typing import List, Dict, Any, Optional
+
+ from prompts import CATEGORY_SUGGESTION_PROMPT, TEXT_CLASSIFICATION_PROMPT
+
+ from .base import BaseClassifier
+
+
+ class LLMClassifier(BaseClassifier):
+     """Classifier using a Large Language Model for more accurate but slower classification"""
+
+     def __init__(self, client, model="gpt-3.5-turbo"):
+         super().__init__()
+         self.client = client
+         self.model = model
+
+     def classify(
+         self, texts: List[str], categories: Optional[List[str]] = None
+     ) -> List[Dict[str, Any]]:
+         """Classify texts using an LLM with parallel processing"""
+         if not categories:
+             # First, use the LLM to generate appropriate categories
+             categories = self._suggest_categories(texts)
+
+         # Process texts in parallel
+         with ThreadPoolExecutor(max_workers=10) as executor:
+             # Submit all tasks with their original indices
+             future_to_index = {
+                 executor.submit(self._classify_text, text, categories): idx
+                 for idx, text in enumerate(texts)
+             }
+
+             # Initialize results list with None values
+             results = [None] * len(texts)
+
+             # Collect results as they complete
+             for future in as_completed(future_to_index):
+                 original_idx = future_to_index[future]
+                 try:
+                     result = future.result()
+                     results[original_idx] = result
+                 except Exception as e:
+                     print(f"Error processing text: {str(e)}")
+                     results[original_idx] = {
+                         "category": categories[0],
+                         "confidence": 50,
+                         "explanation": f"Error during classification: {str(e)}",
+                     }
+
+         return results
+
+     def _suggest_categories(self, texts: List[str], sample_size: int = 20) -> List[str]:
+         """Use LLM to suggest appropriate categories for the dataset"""
+         # Take a sample of texts to avoid token limitations
+         if len(texts) > sample_size:
+             sample_texts = random.sample(texts, sample_size)
+         else:
+             sample_texts = texts
+
+         prompt = CATEGORY_SUGGESTION_PROMPT.format("\n---\n".join(sample_texts))
+
+         try:
+             response = self.client.chat.completions.create(
+                 model=self.model,
+                 messages=[{"role": "user", "content": prompt}],
+                 temperature=0.2,
+                 max_tokens=100,
+             )
+
+             # Parse the comma-separated response into category names
+             categories_text = response.choices[0].message.content.strip()
+             categories = [cat.strip() for cat in categories_text.split(",")]
+
+             return categories
+         except Exception as e:
+             # Fall back to default categories on error
+             print(f"Error suggesting categories: {str(e)}")
+             return self._generate_default_categories(texts)
+
+     def _classify_text(self, text: str, categories: List[str]) -> Dict[str, Any]:
+         """Use LLM to classify a single text"""
+         prompt = TEXT_CLASSIFICATION_PROMPT.format(
+             categories=", ".join(categories), text=text
+         )
+
+         try:
+             response = self.client.chat.completions.create(
+                 model=self.model,
+                 messages=[{"role": "user", "content": prompt}],
+                 temperature=0,
+                 max_tokens=200,
+             )
+
+             # Parse JSON response
+             response_text = response.choices[0].message.content.strip()
+
+             result = json.loads(response_text)
+             # Ensure all required fields are present
+             if not all(k in result for k in ["category", "confidence", "explanation"]):
+                 raise ValueError("Missing required fields in LLM response")
+
+             # Validate that the category is in the list; default to the first if not
+             if result["category"] not in categories:
+                 result["category"] = categories[0]
+
+             # Validate that confidence is a number between 0 and 100
+             try:
+                 result["confidence"] = float(result["confidence"])
+                 if not 0 <= result["confidence"] <= 100:
+                     result["confidence"] = 50
+             except (TypeError, ValueError):
+                 result["confidence"] = 50
+
+             return result
+         except json.JSONDecodeError:
+             # Fall back to simple keyword matching if JSON parsing fails
+             category = categories[0]  # Default
+             for cat in categories:
+                 if cat.lower() in response_text.lower():
+                     category = cat
+                     break
+
+             return {
+                 "category": category,
+                 "confidence": 50,
+                 "explanation": "Classification based on language model analysis. (Note: structured response parsing failed)",
+             }
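
The client passed to LLMClassifier only needs to expose client.chat.completions.create(...), which matches the OpenAI v1 SDK. A minimal usage sketch, assuming openai>=1.0 is installed and OPENAI_API_KEY is set; the sample texts are illustrative:

from openai import OpenAI

from classifiers import LLMClassifier

client = OpenAI()  # reads OPENAI_API_KEY from the environment
clf = LLMClassifier(client, model="gpt-3.5-turbo")

texts = ["The login page crashes on submit", "Please update my billing address"]
for item in clf.classify(texts):  # categories are auto-suggested when omitted
    print(item["category"], item["confidence"], item["explanation"])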
classifiers/tfidf.py ADDED
@@ -0,0 +1,99 @@
+ import numpy as np
+ from sklearn.cluster import KMeans
+ from sklearn.feature_extraction.text import TfidfVectorizer
+
+ from .base import BaseClassifier
+
+
+ class TFIDFClassifier(BaseClassifier):
+     """Classifier using TF-IDF and clustering for fast classification"""
+
+     def __init__(self):
+         super().__init__()
+         self.vectorizer = TfidfVectorizer(
+             max_features=1000, stop_words="english", ngram_range=(1, 2)
+         )
+         self.model = None
+         self.feature_names = None
+         self.categories = None
+         self.centroids = None
+
+     def classify(self, texts, categories=None):
+         """Classify texts using TF-IDF and clustering"""
+         # Vectorize the texts
+         X = self.vectorizer.fit_transform(texts)
+         self.feature_names = self.vectorizer.get_feature_names_out()
+
+         # Auto-detect categories if not provided
+         if not categories:
+             num_clusters = min(5, len(texts))  # Don't create more clusters than texts
+             self.categories = self._generate_default_categories(texts, num_clusters)
+         else:
+             self.categories = categories
+             num_clusters = len(categories)
+
+         # Cluster the texts
+         self.model = KMeans(n_clusters=num_clusters, random_state=42)
+         clusters = self.model.fit_predict(X)
+         self.centroids = self.model.cluster_centers_
+
+         # Calculate distances to centroids for confidence
+         distances = self._calculate_distances(X)
+
+         # Prepare results
+         results = []
+         for i, text in enumerate(texts):
+             cluster_idx = clusters[i]
+
+             # Calculate confidence (inverse of distance, normalized)
+             confidence = self._calculate_confidence(distances[i])
+
+             # Create explanation
+             explanation = self._generate_explanation(X[i], cluster_idx)
+
+             results.append(
+                 {
+                     "category": self.categories[cluster_idx],
+                     "confidence": confidence,
+                     "explanation": explanation,
+                 }
+             )
+
+         return results
+
+     def _calculate_distances(self, X):
+         """Calculate Euclidean distances from each point to each centroid"""
+         return np.sqrt(
+             (
+                 (X.toarray()[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2
+             ).sum(axis=2)
+         )
+
+     def _calculate_confidence(self, distances):
+         """Convert distances to confidence scores (0-100)"""
+         min_dist = np.min(distances)
+         max_dist = np.max(distances)
+
+         # Smaller distance to the assigned centroid = higher confidence
+         if max_dist == min_dist:
+             return 70  # Default mid-range confidence when all distances are equal
+
+         # Invert the normalized distance and scale to the 50-100 range
+         # (TF-IDF is never 100% confident)
+         confidence = 100 - (min_dist / max_dist) * 50
+         return round(confidence, 1)
+
+     def _generate_explanation(self, text_vector, cluster_idx):
+         """Generate an explanation for the classification"""
+         # Get indices of the top features for this text
+         text_array = text_vector.toarray()[0]
+         top_indices = text_array.argsort()[-5:][::-1]
+
+         # Map those indices to feature names
+         top_features = [self.feature_names[i] for i in top_indices if text_array[i] > 0]
+
+         if not top_features:
+             return "No significant features identified for this classification."
+
+         explanation = f"Classification based on key terms: {', '.join(top_features)}"
+         return explanation
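
A minimal usage sketch for the TF-IDF path; the sample texts are illustrative. Note that classify() fits and predicts in one call, and that when categories are supplied the cluster-to-category mapping is positional (cluster i gets the i-th name), not semantic:

from classifiers import TFIDFClassifier

texts = [
    "The app crashes when I upload a photo",
    "How do I change my subscription plan?",
    "I was charged twice this month",
    "Dark mode would be a great addition",
]

clf = TFIDFClassifier()
results = clf.classify(texts)  # auto-generates "Category 1".."Category 4" labels
for text, res in zip(texts, results):
    print(f"{res['category']:>12} ({res['confidence']}%) <- {text}")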