# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TODO: Add a description here."""
import evaluate
import datasets
import re
import dateutil.parser
import numpy as np
from typing import List, Dict, Any
# Regex that captures a leading (optionally bracketed) timestamp at the start of a log line
timestamp_regex = r'^\s*\[?\s*(\d{4}[-/.]\d{2}[-/.]\d{2}(?:[ T]\d{2}[:]\d{2}(?:[:]\d{2}(?:[.,]\d+)?)?(?:Z|[+-]\d{2}[:]\d{2})?)?)\s*\]?\s*'
TIMESTAMP_PATTERN = re.compile(timestamp_regex, re.MULTILINE)
INT_PATTERN = re.compile(r'(-?\d+)')
FLOAT_PATTERN = re.compile(r'(-?\d+\.\d+)')
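# Illustrative example (not part of the metric): the timestamp regex has one capture group,
# so re.split returns [prefix, timestamp, rest-of-line] when a line starts with a timestamp:
#   TIMESTAMP_PATTERN.split("2024-01-01 12:00:00 Server started")
#   -> ['', '2024-01-01 12:00:00', 'Server started']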
SACREBLEU_METRIC = evaluate.load("evaluate-metric/sacrebleu")
# TODO: Add BibTeX citation
_CITATION = """\
@InProceedings{huggingface:module,
title = {A great new module},
authors={huggingface, Inc.},
year={2020}
}
"""
# Description of the module
_DESCRIPTION = """\
This module scores generated log files against reference log files. Each log is split into
timestamps and log messages; the module then reports SMAPE-based similarities for line counts,
message lengths and timestamp counts, SacreBLEU scores for the message content (with and without
explicit numbers), and checks whether the predicted timestamps are format-consistent with the
references and monotonically increasing.
"""
# Documentation of the module's arguments, return values and usage
_KWARGS_DESCRIPTION = """
Compares each predicted log with its reference log and returns several similarity scores.
Args:
    predictions: list of generated logs to score. Each prediction is a single string that
        may contain several log lines, each ideally starting with a timestamp.
    references: list of reference logs, one per prediction, in the same format.
Returns:
    linecount_difference_SMAPE_score: SMAPE-based similarity of the number of log lines.
    linecontentlength_difference_SMAPE_score: SMAPE-based similarity of the per-line message lengths.
    linecontent_sacrebleu_score: SacreBLEU score of the log messages (timestamps removed).
    linecontent_sacrebleu_withoutExplicitNumbers_score: SacreBLEU score after numbers are replaced by placeholders.
    timestamps_SMAPE_difference_score: SMAPE-based similarity of the number of timestamps.
    timestamps_formatConsistency_score: 1.0 if the predicted timestamp format matches the reference format, else 0.0.
    timestamps_monotonicity_score: 1.0 if the predicted timestamps are monotonically increasing, else 0.0.
    All scores are averaged over the list of (prediction, reference) pairs.
Examples:
    >>> logmetric = evaluate.load("logmetric.py")  # local path to this script
    >>> results = logmetric.compute(
    ...     predictions=["2024-01-01 10:00:00 Server started"],
    ...     references=["2024-01-01 10:00:00 Server started"])
    >>> print(results["linecount_difference_SMAPE_score"])
    1.0
"""
# TODO: Define external resources urls if needed
BAD_WORDS_URL = "http://url/to/external/resource/bad_words.txt"
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class LogMetric(evaluate.Metric):
"""TODO: Short description of my evaluation module."""
def _info(self):
# TODO: Specifies the evaluate.EvaluationModuleInfo object
return evaluate.MetricInfo(
# This is the description that will appear on the modules page.
module_type="metric",
description=_DESCRIPTION,
citation=_CITATION,
inputs_description=_KWARGS_DESCRIPTION,
# This defines the format of each prediction and reference
# Both prediction and reference are strings
features=datasets.Features({
"predictions": datasets.Value("string", id="sequence"),
"references": datasets.Value("string", id="sequence"),
}),
# Homepage of the module for documentation
homepage="http://module.homepage",
# Additional links to the codebase or references
codebase_urls=["http://github.com/path/to/codebase/of/new_module"],
reference_urls=["http://path.to.reference.url/new_module"]
)
def _download_and_prepare(self, dl_manager):
"""Optional: download external resources useful to compute the scores"""
# TODO: Download external resources if needed
pass
def _compute(self, predictions, references):
        # Score each (prediction, reference) pair: each log is split into timestamps and
        # log messages, which are then compared separately
        metric_dicts = [PredRefScore(p, r).run() for p, r in zip(predictions, references)]
# Extract keys (assuming all dictionaries have the same keys)
keys = metric_dicts[0].keys()
# Convert list of dictionaries into a 2D numpy array
values = np.array([list(d.values()) for d in metric_dicts])
# Calculate the mean along the vertical axis (axis=0)
mean_values = np.mean(values, axis=0)
# a dictionary, matching the keys with their corresponding mean values
metric_result = dict(zip(keys, mean_values))
return metric_result
class PredRefScore:
    """Computes all scores for a single (prediction, reference) pair."""
    def __init__(self, prediction : str, reference : str) -> None:
        # Instance-level dict so that scores are not shared between instances
        self.scores: Dict[str, float] = {}
        self.reference = reference.strip(' \t\n\r')
        self.prediction = prediction.strip(' \t\n\r')
def run(self):
self.getLogMetric()
return self.scores
##### Convenience Methods #####
# TODO: also set pred_ts, ref_ts, pred_msgs and ref_msgs as fields
# A score depending on the difference in length of two sentences
def get_length_score(self, preds_split : List[Any], refs_split : List[Any]) -> float:
pred_content_lengths = np.vectorize(len)(preds_split)
ref_content_lengths = np.vectorize(len)(refs_split)
return self.smapeScore(pred_content_lengths, ref_content_lengths)
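    # e.g. (illustrative): get_length_score(["abc"], ["abcd"]) compares the lengths [3] and [4]
    # and yields 1 - 1/7 ≈ 0.857.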
    # Helper that computes a SMAPE-based similarity in [0, 1] (1 - SMAPE, so 1 means identical)
    # either between two numbers or between two equal-length lists of numbers
def smapeScore(self, P, R) -> float:
P_isnumber = isinstance(P, (int, float))
R_isnumber = isinstance(R, (int, float))
# either both must be numbers or both must be no number
assert P_isnumber == R_isnumber
if not P_isnumber:
assert(len(P) == len(R))
if P_isnumber and R_isnumber:
if P == 0 and R == 0:
return 1.0 # since this leads to (|R| + |P|) = 0
return 1 - (np.sum(np.abs(R - P) / (np.abs(R) + np.abs(P)))) # (n = 1)
else:
if len(P) == 0 and len(R) == 0:
return 1.0 # since this leads to n = 0
n = len(P)
P = np.array(P)
R = np.array(R)
denominator = np.abs(R) + np.abs(P)
# Replace zeros in the denominator with 1 to avoid division by zero.
# the denominator[i] = 0 is only possible if R[i] == P[i] == 0, hence we can set denominator[i] = 1 and still achieve the result of 0 after division at index i
denominator[denominator == 0] = 1
return 1 - (1.0/n * np.sum(np.abs(R - P) / denominator))
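    # Worked example (illustrative): smapeScore(2, 4) = 1 - |4 - 2| / (|4| + |2|) = 1 - 2/6 ≈ 0.667,
    # while smapeScore(3, 3) = 1.0 (identical values give a perfect similarity).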
    # Replaces numbers in a string with placeholder tokens
    def replaceNumbers(self, text : str) -> str:
        # Replace floats first; otherwise the integer pattern would split "2.5" into two <|INT|> tokens
        text = FLOAT_PATTERN.sub(r'<|FLOAT|>', text)
        text = INT_PATTERN.sub(r'<|INT|>', text)
        return text
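    # e.g. (illustrative): replaceNumbers("3 errors in 2.5 s") -> "<|INT|> errors in <|FLOAT|> s"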
# Split all log-entries in timestamps and log-messages
def split_log_entry(self, pred : str, ref: str):
pred_lines = pred.splitlines()
ref_lines = ref.splitlines()
# One logentry always consists of timestamp + log-message
pred_timestamps, pred_logMessages = [], []
ref_timestamps, ref_logMessages = [], []
for i in range(len(pred_lines)):
if TIMESTAMP_PATTERN.match(pred_lines[i]) is not None:
# try to match timestamp
_, pred_ts, pred_msg = TIMESTAMP_PATTERN.split(pred_lines[i])
pred_timestamps.append(pred_ts)
pred_logMessages.append(pred_msg)
            else:
                # Fallback ("0 spaces" heuristic): no timestamp could be parsed,
                # so treat the entire line as the log message
                pred_msg = pred_lines[i]
                pred_logMessages.append(pred_msg)
        for i in range(len(ref_lines)):
            if TIMESTAMP_PATTERN.match(ref_lines[i]) is None:
                raise ValueError("The timestamp regex cannot parse a timestamp in the reference log. Please make sure the reference log format is covered by the regex. Line: " + ref_lines[i])
_, ref_ts, ref_msg = TIMESTAMP_PATTERN.split(ref_lines[i])
ref_timestamps.append(ref_ts)
ref_logMessages.append(ref_msg)
# We extend the shorter list to the length of the longer one
max_logentries = max(len(pred_logMessages), len(ref_logMessages))
pred_logMessages += (max_logentries - len(pred_logMessages)) * [" "]
        ref_logMessages += (max_logentries - len(ref_logMessages)) * [" "]
return pred_timestamps, pred_logMessages, ref_timestamps, ref_logMessages
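    # e.g. (illustrative): if the prediction has 2 lines and the reference has 3, the prediction's
    # message list is padded with a single-space entry so that both message lists have 3 entries.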
##### Individual Setter Methods for Scores #####
# splits both strings at \n and then computes the smape_score of their lengths
def set_linecount_score(self, pred : str, ref : str) -> None:
pred_lines_amt = len(pred.splitlines())
ref_lines_amt = len(ref.splitlines())
self.scores["linecount_difference_SMAPE_score"] = self.smapeScore(pred_lines_amt, ref_lines_amt)
def set_sacrebleu_score(self, pred_log_messages : List[str], ref_log_messages : List[str]) -> None:
sacrebleu_score = SACREBLEU_METRIC.compute(predictions=pred_log_messages, references=ref_log_messages)["score"] / 100.0
self.scores["linecontent_sacrebleu_score"] = sacrebleu_score
def set_smape_length_score(self, pred_log_messages : List[str], ref_log_messages : List[str]) -> None:
smape_length_score = self.get_length_score(pred_log_messages, ref_log_messages)
self.scores["linecontentlength_difference_SMAPE_score"] = smape_length_score
def set_sacrebleu_withoutexplnumbers_score(self, pred_log_messages : List[str], ref_log_messages : List[str]):
vectorized_replaceNumbers = np.vectorize(self.replaceNumbers)
cleaned_pred_logMessages = vectorized_replaceNumbers(pred_log_messages)
cleaned_ref_logMessages = vectorized_replaceNumbers(ref_log_messages)
sacrebleu_withoutExplicitNumbers_score = SACREBLEU_METRIC.compute(predictions=cleaned_pred_logMessages, references=cleaned_ref_logMessages)["score"] / 100.0
self.scores["linecontent_sacrebleu_withoutExplicitNumbers_score"] = sacrebleu_withoutExplicitNumbers_score
    # Get different scores regarding the content of a log-message
def all_linecontent_scores(self, pred_logMessages : List[str], ref_logMessages: List[str]) -> None:
if pred_logMessages == [] and ref_logMessages == []:
pred_logMessages = [""]
ref_logMessages = [""]
self.set_sacrebleu_score(pred_logMessages, ref_logMessages)
self.set_smape_length_score(pred_logMessages, ref_logMessages)
self.set_sacrebleu_withoutexplnumbers_score(pred_logMessages, ref_logMessages)
def set_timestamp_amt_score(self, pred_timestamps : List[str], ref_timestamps : List[str]):
timestamp_amt_score = self.smapeScore(len(pred_timestamps), len(ref_timestamps))
self.scores["timestamps_SMAPE_difference_score"] = timestamp_amt_score
def set_timestamp_format_consistency_score(self, pred_timestamps, ref_timestamps):
if (len(pred_timestamps) == 0):
self.scores["timestamps_formatConsistency_score"] = 1.0
return
pred_timestring_pattern = re.sub(r'\d', r'\\d', re.escape(pred_timestamps[0])).strip()
all_consistent = all(re.fullmatch(pred_timestring_pattern, ts.strip()) is not None for ts in ref_timestamps)
self.scores["timestamps_formatConsistency_score"] = 1.0 if all_consistent else 0.0
def set_timestamp_monotonicity_score(self, pred_timestamps) -> None:
try:
parsed_times = [dateutil.parser.parse(ts) for ts in pred_timestamps] # Parse all timestamps
except dateutil.parser.ParserError:
self.scores["timestamps_monotinicity_score"] = 0.0
return
# Check if the timestamps are monotonically increasing
all_monotone = all(t1 <= t2 for t1, t2 in zip(parsed_times, parsed_times[1:]))
self.scores["timestamps_monotinicity_score"] = 1.0 if all_monotone else 0.0
# get different scores regarding the timestamp
def all_timestamp_scores(self, pred_timestamps, ref_timestamps) -> None:
self.set_timestamp_amt_score(pred_timestamps, ref_timestamps)
self.set_timestamp_format_consistency_score(pred_timestamps, ref_timestamps)
self.set_timestamp_monotonicity_score(pred_timestamps)
# driver method for different score computations
def getLogMetric(self):
self.set_linecount_score(self.prediction, self.reference)
# Split log on timestamps
pred_timestamps, pred_logMessages, ref_timestamps, ref_logMessages = self.split_log_entry(self.prediction, self.reference)
self.all_linecontent_scores(pred_logMessages, ref_logMessages)
self.all_timestamp_scores(pred_timestamps, ref_timestamps)
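

# Minimal usage sketch (illustrative assumption: the script is loaded from its local path via
# evaluate.load; the example log content is made up and only demonstrates the expected input format).
if __name__ == "__main__":
    logmetric = evaluate.load("logmetric.py")
    example_log = "2024-01-01 10:00:00 Server started\n2024-01-01 10:00:05 Listening on port 8080"
    print(logmetric.compute(predictions=[example_log], references=[example_log]))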