"""TODO: Add a description here.""" |
|
|
|
import evaluate |
|
import datasets |
|
import re |
|
import dateutil.parser |
|
import numpy as np |
|
from typing import List, Dict, Any |
|
|
|
|
|
# Matches an optional timestamp at the start of a log line: an ISO-like date with
# optional time, fractional seconds and timezone, optionally wrapped in brackets.
# The timestamp itself is captured in group 1.
timestamp_regex = r'^\s*\[?\s*(\d{4}[-/.]\d{2}[-/.]\d{2}(?:[ T]\d{2}[:]\d{2}(?:[:]\d{2}(?:[.,]\d+)?)?(?:Z|[+-]\d{2}[:]\d{2})?)?)\s*\]?\s*'
TIMESTAMP_PATTERN = re.compile(timestamp_regex, re.MULTILINE)
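# Illustrative examples of lines the pattern above is intended to match (a sketch,
# not an exhaustive test):
#   "2024-01-31 12:00:05,123 job started"  -> timestamp "2024-01-31 12:00:05,123"
#   "[2024/01/31T12:00Z] job started"      -> timestamp "2024/01/31T12:00Z"
#   "job started without a timestamp"      -> no match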
|
|
|
# Numeric token patterns used to mask explicit numbers before BLEU comparison.
INT_PATTERN = re.compile(r'(-?\d+)')
FLOAT_PATTERN = re.compile(r'(-?\d+\.\d+)')

# SacreBLEU is loaded once at import time and reused for all line-content scores.
SACREBLEU_METRIC = evaluate.load("evaluate-metric/sacrebleu")
|
|
|
|
|
_CITATION = """\ |
|
@InProceedings{huggingface:module, |
|
title = {A great new module}, |
|
authors={huggingface, Inc.}, |
|
year={2020} |
|
} |
|
""" |
|
|
|
|
|
_DESCRIPTION = """\ |
|
This new module is designed to solve this great ML task and is crafted with a lot of care. |
|
""" |
|
|
|
|
|
|
|
_KWARGS_DESCRIPTION = """
Scores predicted log files against reference log files.
Args:
    predictions: list of predicted logs to score. Each prediction is a single
        string containing one or more log lines separated by newlines.
    references: list of reference logs, one per prediction, in the same format.
        Every reference line must start with a parsable timestamp.
Returns:
    linecount_difference_SMAPE_score: similarity of the number of log lines,
    linecontent_sacrebleu_score: SacreBLEU of the log messages (timestamps stripped),
    linecontentlength_difference_SMAPE_score: similarity of per-line message lengths,
    linecontent_sacrebleu_withoutExplicitNumbers_score: SacreBLEU with explicit numbers masked,
    timestamps_SMAPE_difference_score: similarity of the number of timestamps,
    timestamps_formatConsistency_score: 1.0 if the predicted timestamp format matches the references, else 0.0,
    timestamps_monotonicity_score: 1.0 if the predicted timestamps are non-decreasing, else 0.0.
    All scores lie in [0, 1] and are averaged over all prediction/reference pairs.
Examples:
    >>> log_metric = evaluate.load("my_new_module")
    >>> log = "2024-01-31 12:00:00 job started\\n2024-01-31 12:00:01 job finished"
    >>> results = log_metric.compute(predictions=[log], references=[log])
    >>> print(round(results["linecount_difference_SMAPE_score"], 1))
    1.0
"""
|
|
|
|
|
|
|
|
|
|
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION) |
|
class LogMetric(evaluate.Metric): |
|
"""TODO: Short description of my evaluation module.""" |
|
|
|
def _info(self): |
|
|
|
return evaluate.MetricInfo( |
|
|
|
module_type="metric", |
|
description=_DESCRIPTION, |
|
citation=_CITATION, |
|
inputs_description=_KWARGS_DESCRIPTION, |
|
|
|
|
|
features=datasets.Features({ |
|
"predictions": datasets.Value("string", id="sequence"), |
|
"references": datasets.Value("string", id="sequence"), |
|
}), |
|
|
|
homepage="http://module.homepage", |
|
|
|
codebase_urls=["http://github.com/path/to/codebase/of/new_module"], |
|
reference_urls=["http://path.to.reference.url/new_module"] |
|
) |
|
|
|
def _download_and_prepare(self, dl_manager): |
|
"""Optional: download external resources useful to compute the scores""" |
|
|
|
pass |
|
|
|
|
|
    def _compute(self, predictions, references):
        """Returns the scores, averaged over all prediction/reference pairs."""
        # Score every prediction/reference pair individually.
        metric_dicts = [PredRefScore(p, r).run() for p, r in zip(predictions, references)]

        # All pairs produce the same score keys; average each score over all pairs.
        keys = metric_dicts[0].keys()
        values = np.array([list(d.values()) for d in metric_dicts])
        mean_values = np.mean(values, axis=0)

        return dict(zip(keys, mean_values))
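    # Illustrative sketch of the averaging step: per-pair results {"s": 1.0} and
    # {"s": 0.0} (hypothetical key "s") would be reported as {"s": 0.5}.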
|
|
|
|
|
class PredRefScore:
    """Computes all scores for a single prediction/reference pair."""

    def __init__(self, prediction: str, reference: str) -> None:
        # The score dict must be an instance attribute; a class-level dict would be
        # shared (and overwritten) across all prediction/reference pairs.
        self.scores: Dict[str, float] = {}
        self.reference = reference.strip(' \t\n\r')
        self.prediction = prediction.strip(' \t\n\r')

    def run(self) -> Dict[str, float]:
        self.getLogMetric()
        return self.scores
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    def get_length_score(self, preds_split: List[Any], refs_split: List[Any]) -> float:
        """SMAPE-based similarity of the per-line message lengths."""
        pred_content_lengths = np.vectorize(len)(preds_split)
        ref_content_lengths = np.vectorize(len)(refs_split)

        return self.smapeScore(pred_content_lengths, ref_content_lengths)
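    # Illustrative sketch: messages ["abc", "de"] vs ["abcd", "de"] have lengths
    # [3, 2] vs [4, 2], giving 1 - (1/2) * (1/7 + 0/4) = 13/14 ~ 0.93.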
|
|
|
|
|
    def smapeScore(self, P, R) -> float:
        """Symmetric similarity in [0, 1]: 1 - mean(|R - P| / (|R| + |P|)).

        Accepts either two numbers or two equal-length sequences of numbers.
        1.0 means identical values, 0.0 means maximally different.
        """
        P_isnumber = isinstance(P, (int, float))
        R_isnumber = isinstance(R, (int, float))

        # Either both inputs are numbers or both are sequences.
        assert P_isnumber == R_isnumber

        if not P_isnumber:
            assert len(P) == len(R)

        if P_isnumber and R_isnumber:
            if P == 0 and R == 0:
                return 1.0
            return 1 - (np.abs(R - P) / (np.abs(R) + np.abs(P)))
        else:
            if len(P) == 0 and len(R) == 0:
                return 1.0
            n = len(P)
            P = np.array(P)
            R = np.array(R)
            denominator = np.abs(R) + np.abs(P)

            # Element pairs where both values are 0 contribute |R - P| = 0, so the
            # denominator can safely be replaced by 1 to avoid division by zero.
            denominator[denominator == 0] = 1

            return 1 - (1.0 / n * np.sum(np.abs(R - P) / denominator))
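    # Worked examples (illustrative only):
    #   smapeScore(4, 5)           = 1 - 1/9                  ~ 0.889
    #   smapeScore([3, 0], [3, 4]) = 1 - (1/2) * (0/6 + 4/4)  = 0.5
    #   smapeScore(0, 0)           = 1.0 by definition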
|
|
|
|
|
    def replaceNumbers(self, text: str) -> str:
        """Masks explicit numbers so that BLEU does not penalize differing numeric values."""
        # Replace floats before ints; otherwise the int pattern would split every
        # float into two <|INT|> tokens and the float pattern would never match.
        text = FLOAT_PATTERN.sub(r'<|FLOAT|>', text)
        text = INT_PATTERN.sub(r'<|INT|>', text)
        return text
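    # Illustrative sketch:
    #   replaceNumbers("retry 3 failed after 2.5s") -> "retry <|INT|> failed after <|FLOAT|>s"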
|
|
|
|
|
    def split_log_entry(self, pred: str, ref: str):
        """Splits the prediction and reference logs into timestamps and log messages.

        Predicted lines without a parsable timestamp are kept as plain messages;
        reference lines must all start with a parsable timestamp. The shorter
        message list is padded so both lists have the same length.
        """
        pred_lines = pred.splitlines()
        ref_lines = ref.splitlines()

        pred_timestamps, pred_logMessages = [], []
        ref_timestamps, ref_logMessages = [], []

        for line in pred_lines:
            if TIMESTAMP_PATTERN.match(line) is not None:
                # split() around the single leading match yields ["", timestamp, message].
                _, pred_ts, pred_msg = TIMESTAMP_PATTERN.split(line)
                pred_timestamps.append(pred_ts)
                pred_logMessages.append(pred_msg)
            else:
                # No timestamp found: treat the whole line as the log message.
                pred_logMessages.append(line)

        for line in ref_lines:
            if TIMESTAMP_PATTERN.match(line) is None:
                raise ValueError(
                    "The timestamp regex cannot parse a timestamp in a reference log line. "
                    "Please make sure the regex covers the reference log format. Line: " + line
                )
            _, ref_ts, ref_msg = TIMESTAMP_PATTERN.split(line)
            ref_timestamps.append(ref_ts)
            ref_logMessages.append(ref_msg)

        # Pad the shorter message list with single-space lines so SacreBLEU receives
        # prediction/reference lists of equal length.
        max_logentries = max(len(pred_logMessages), len(ref_logMessages))
        pred_logMessages += (max_logentries - len(pred_logMessages)) * [" "]
        ref_logMessages += (max_logentries - len(ref_logMessages)) * [" "]

        return pred_timestamps, pred_logMessages, ref_timestamps, ref_logMessages
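    # Illustrative sketch: for
    #   pred = "2024-01-31 10:00:00 started\nno timestamp here"
    #   ref  = "2024-01-31 10:00:00 started\n2024-01-31 10:00:01 done"
    # this returns
    #   pred_timestamps  = ["2024-01-31 10:00:00"]
    #   pred_logMessages = ["started", "no timestamp here"]
    #   ref_timestamps   = ["2024-01-31 10:00:00", "2024-01-31 10:00:01"]
    #   ref_logMessages  = ["started", "done"]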
|
|
|
|
|
|
|
|
|
def set_linecount_score(self, pred : str, ref : str) -> None: |
|
pred_lines_amt = len(pred.splitlines()) |
|
ref_lines_amt = len(ref.splitlines()) |
|
self.scores["linecount_difference_SMAPE_score"] = self.smapeScore(pred_lines_amt, ref_lines_amt) |
|
|
|
def set_sacrebleu_score(self, pred_log_messages : List[str], ref_log_messages : List[str]) -> None: |
|
sacrebleu_score = SACREBLEU_METRIC.compute(predictions=pred_log_messages, references=ref_log_messages)["score"] / 100.0 |
|
self.scores["linecontent_sacrebleu_score"] = sacrebleu_score |
|
|
|
def set_smape_length_score(self, pred_log_messages : List[str], ref_log_messages : List[str]) -> None: |
|
smape_length_score = self.get_length_score(pred_log_messages, ref_log_messages) |
|
self.scores["linecontentlength_difference_SMAPE_score"] = smape_length_score |
|
|
|
def set_sacrebleu_withoutexplnumbers_score(self, pred_log_messages : List[str], ref_log_messages : List[str]): |
|
vectorized_replaceNumbers = np.vectorize(self.replaceNumbers) |
|
cleaned_pred_logMessages = vectorized_replaceNumbers(pred_log_messages) |
|
cleaned_ref_logMessages = vectorized_replaceNumbers(ref_log_messages) |
|
sacrebleu_withoutExplicitNumbers_score = SACREBLEU_METRIC.compute(predictions=cleaned_pred_logMessages, references=cleaned_ref_logMessages)["score"] / 100.0 |
|
self.scores["linecontent_sacrebleu_withoutExplicitNumbers_score"] = sacrebleu_withoutExplicitNumbers_score |
|
|
|
|
|
    def all_linecontent_scores(self, pred_logMessages: List[str], ref_logMessages: List[str]) -> None:
        # Two logs without any messages are scored as a pair of empty strings; the
        # downstream scorers (np.vectorize(len), SacreBLEU) cannot handle empty lists.
        if pred_logMessages == [] and ref_logMessages == []:
            pred_logMessages = [""]
            ref_logMessages = [""]

        self.set_sacrebleu_score(pred_logMessages, ref_logMessages)
        self.set_smape_length_score(pred_logMessages, ref_logMessages)
        self.set_sacrebleu_withoutexplnumbers_score(pred_logMessages, ref_logMessages)
|
|
|
def set_timestamp_amt_score(self, pred_timestamps : List[str], ref_timestamps : List[str]): |
|
timestamp_amt_score = self.smapeScore(len(pred_timestamps), len(ref_timestamps)) |
|
self.scores["timestamps_SMAPE_difference_score"] = timestamp_amt_score |
|
|
|
    def set_timestamp_format_consistency_score(self, pred_timestamps, ref_timestamps):
        """1.0 if the format of the predicted timestamps matches the reference timestamps, else 0.0."""
        # A prediction without timestamps is treated as trivially consistent.
        if len(pred_timestamps) == 0:
            self.scores["timestamps_formatConsistency_score"] = 1.0
            return

        # Generalize the first predicted timestamp into a regex by replacing every
        # digit with \d, then require every reference timestamp to match it exactly.
        pred_timestring_pattern = re.sub(r'\d', r'\\d', re.escape(pred_timestamps[0])).strip()
        all_consistent = all(re.fullmatch(pred_timestring_pattern, ts.strip()) is not None for ts in ref_timestamps)

        self.scores["timestamps_formatConsistency_score"] = 1.0 if all_consistent else 0.0
|
|
|
    def set_timestamp_monotonicity_score(self, pred_timestamps) -> None:
        """1.0 if the predicted timestamps parse and are non-decreasing, else 0.0."""
        try:
            parsed_times = [dateutil.parser.parse(ts) for ts in pred_timestamps]
        except (dateutil.parser.ParserError, OverflowError):
            # Timestamps that cannot be parsed are not considered monotone.
            self.scores["timestamps_monotonicity_score"] = 0.0
            return

        # Compare every timestamp with its successor.
        all_monotone = all(t1 <= t2 for t1, t2 in zip(parsed_times, parsed_times[1:]))
        self.scores["timestamps_monotonicity_score"] = 1.0 if all_monotone else 0.0
|
|
|
|
|
def all_timestamp_scores(self, pred_timestamps, ref_timestamps) -> None: |
|
self.set_timestamp_amt_score(pred_timestamps, ref_timestamps) |
|
self.set_timestamp_format_consistency_score(pred_timestamps, ref_timestamps) |
|
self.set_timestamp_monotonicity_score(pred_timestamps) |
|
|
|
|
|
def getLogMetric(self): |
|
self.set_linecount_score(self.prediction, self.reference) |
|
|
|
pred_timestamps, pred_logMessages, ref_timestamps, ref_logMessages = self.split_log_entry(self.prediction, self.reference) |
|
self.all_linecontent_scores(pred_logMessages, ref_logMessages) |
|
self.all_timestamp_scores(pred_timestamps, ref_timestamps) |
|
|
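# A minimal local smoke test (a sketch, not part of the metric API): it exercises
# PredRefScore directly on a small hypothetical log pair. Running it needs network
# access the first time, because SacreBLEU is fetched via evaluate.load at import.
if __name__ == "__main__":
    example_ref = "2024-01-31 10:00:00 job started\n2024-01-31 10:00:02 job finished"
    example_pred = "2024-01-31 10:00:00 job started\n2024-01-31 10:00:05 job finished with 3 warnings"

    for name, score in PredRefScore(example_pred, example_ref).run().items():
        print(f"{name}: {score:.3f}")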