OmniThink / src /DeepThink /modules /storm_dataclass.py
ZekunXi's picture
push
80a598c
import copy
import re
import sys
from collections import OrderedDict
from typing import Union, Optional, Any, List, Tuple, Dict
import numpy as np
from interface import articleSectionNode, article
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
# sys.path.append('./demo/src')
# from utils import ArticleTextProcessing, FileIOHelper
import concurrent.futures
import json
import os
import pickle
import re
import sys
from typing import List, Dict
import httpx
import toml
from langchain_text_splitters import RecursiveCharacterTextSplitter
from trafilatura import extract
class ArticleTextProcessing:
@staticmethod
def limit_word_count_preserve_newline(input_string, max_word_count):
"""
Limit the word count of an input string to a specified maximum, while preserving the integrity of complete lines.
The function truncates the input string at the nearest word that does not exceed the maximum word count,
ensuring that no partial lines are included in the output. Words are defined as text separated by spaces,
and lines are defined as text separated by newline characters.
Args:
input_string (str): The string to be truncated. This string may contain multiple lines.
max_word_count (int): The maximum number of words allowed in the truncated string.
Returns:
str: The truncated string with word count limited to `max_word_count`, preserving complete lines.
"""
word_count = 0
limited_string = ''
for word in input_string.split('\n'):
line_words = word.split()
for lw in line_words:
if word_count < max_word_count:
limited_string += lw + ' '
word_count += 1
else:
break
if word_count >= max_word_count:
break
limited_string = limited_string.strip() + '\n'
return limited_string.strip()
@staticmethod
def remove_citations(s):
"""
Removes all citations from a given string. Citations are assumed to be in the format
of numbers enclosed in square brackets, such as [1], [2], or [1, 2], etc. This function searches
for all occurrences of such patterns and removes them, returning the cleaned string.
Args:
s (str): The string from which citations are to be removed.
Returns:
str: The string with all citation patterns removed.
"""
return re.sub(r'\[\d+(?:,\s*\d+)*\]', '', s)
@staticmethod
def get_first_section_dict_and_list(s):
"""
"""
text = s
sections = text.strip().split('\n# ')
titles = []
content_dict = {}
for section in sections:
if section:
lines = section.split('\n', 1)
title = lines[0].strip()
content = lines[1].strip() if len(lines) > 1 else ""
titles.append(title)
content_dict[title] = content
return content_dict, titles
@staticmethod
def parse_citation_indices(s):
"""
Extracts citation indexes from the provided content string and returns them as a list of integers.
Args:
content (str): The content string containing citations in the format [number].
Returns:
List[int]: A list of unique citation indexes extracted from the content, in the order they appear.
"""
matches = re.findall(r'\[\d+\]', s)
return [int(index[1:-1]) for index in matches]
@staticmethod
def remove_uncompleted_sentences_with_citations(text):
"""
Removes uncompleted sentences and standalone citations from the input text. Sentences are identified
by their ending punctuation (.!?), optionally followed by a citation in square brackets (e.g., "[1]").
Grouped citations (e.g., "[1, 2]") are split into individual ones (e.g., "[1] [2]"). Only text up to
and including the last complete sentence and its citation is retained.
Args:
text (str): The input text from which uncompleted sentences and their citations are to be removed.
Returns:
str: The processed string with uncompleted sentences and standalone citations removed, leaving only
complete sentences and their associated citations if present.
"""
# Convert citations like [1, 2, 3] to [1][2][3].
def replace_with_individual_brackets(match):
numbers = match.group(1).split(', ')
return ' '.join(f'[{n}]' for n in numbers)
# Deduplicate and sort individual groups of citations.
def deduplicate_group(match):
citations = match.group(0)
unique_citations = list(set(re.findall(r'\[\d+\]', citations)))
sorted_citations = sorted(unique_citations, key=lambda x: int(x.strip('[]')))
# Return the sorted unique citations as a string
return ''.join(sorted_citations)
text = re.sub(r'\[([0-9, ]+)\]', replace_with_individual_brackets, text)
text = re.sub(r'(\[\d+\])+', deduplicate_group, text)
# Deprecated: Remove sentence without proper ending punctuation and citations.
# Split the text into sentences (including citations).
# sentences_with_trailing = re.findall(r'([^.!?]*[.!?].*?)(?=[^.!?]*[.!?]|$)', text)
# Filter sentences to ensure they end with a punctuation mark and properly formatted citations
# complete_sentences = []
# for sentence in sentences_with_trailing:
# # Check if the sentence ends with properly formatted citations
# if re.search(r'[.!?]( \[\d+\])*$|^[^.!?]*[.!?]$', sentence.strip()):
# complete_sentences.append(sentence.strip())
# combined_sentences = ' '.join(complete_sentences)
# Check for and append any complete citations that follow the last sentence
# trailing_citations = re.findall(r'(\[\d+\]) ', text[text.rfind(combined_sentences) + len(combined_sentences):])
# if trailing_citations:
# combined_sentences += ' '.join(trailing_citations)
# Regex pattern to match sentence endings, including optional citation markers.
eos_pattern = r'([.!?])\s*(\[\d+\])?\s*'
matches = list(re.finditer(eos_pattern, text))
if matches:
last_match = matches[-1]
text = text[:last_match.end()].strip()
return text
@staticmethod
def clean_up_citation(conv):
for turn in conv.dlg_history:
turn.agent_utterance = turn.agent_utterance[:turn.agent_utterance.find('References:')]
turn.agent_utterance = turn.agent_utterance[:turn.agent_utterance.find('Sources:')]
turn.agent_utterance = turn.agent_utterance.replace('Answer:', '').strip()
try:
max_ref_num = max([int(x) for x in re.findall(r'\[(\d+)\]', turn.agent_utterance)])
except Exception as e:
max_ref_num = 0
if max_ref_num > len(turn.search_results):
for i in range(len(turn.search_results), max_ref_num + 1):
turn.agent_utterance = turn.agent_utterance.replace(f'[{i}]', '')
turn.agent_utterance = ArticleTextProcessing.remove_uncompleted_sentences_with_citations(
turn.agent_utterance)
return conv
@staticmethod
def clean_up_outline(outline, topic=""):
output_lines = []
current_level = 0 # To track the current section level
for line in outline.split('\n'):
stripped_line = line.strip()
if topic != "" and f"# {topic.lower()}" in stripped_line.lower():
output_lines = []
# Check if the line is a section header
if stripped_line.startswith('#') and stripped_line != '#':
current_level = stripped_line.count('#')
output_lines.append(stripped_line)
# Check if the line is a bullet point
# elif stripped_line.startswith('-'):
# subsection_header = '#' * (current_level + 1) + ' ' + stripped_line[1:].strip()
# output_lines.append(subsection_header)
# Preserve lines with @
elif stripped_line.startswith('@'):
output_lines.append(stripped_line)
outline = '\n'.join(output_lines)
# Remove references.
outline = re.sub(r"#[#]? See also.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? See Also.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Notes.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? References.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? External links.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? External Links.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Bibliography.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Further reading*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Further Reading*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Summary.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Appendices.*?(?=##|$)", '', outline, flags=re.DOTALL)
outline = re.sub(r"#[#]? Appendix.*?(?=##|$)", '', outline, flags=re.DOTALL)
return outline
@staticmethod
def clean_up_section(text):
"""Clean up a section:
1. Remove uncompleted sentences (usually due to output token limitation).
2. Deduplicate individual groups of citations.
3. Remove unnecessary summary."""
paragraphs = text.split('\n')
output_paragraphs = []
summary_sec_flag = False
for p in paragraphs:
p = p.strip()
if len(p) == 0:
continue
if not p.startswith('#'):
p = ArticleTextProcessing.remove_uncompleted_sentences_with_citations(p)
if summary_sec_flag:
if p.startswith('#'):
summary_sec_flag = False
else:
continue
if p.startswith('Overall') or p.startswith('In summary') or p.startswith('In conclusion'):
continue
if "# Summary" in p or '# Conclusion' in p:
summary_sec_flag = True
continue
output_paragraphs.append(p)
return '\n\n'.join(output_paragraphs) # Join with '\n\n' for markdown format.
@staticmethod
def update_citation_index(s, citation_map):
"""Update citation index in the string based on the citation map."""
for original_citation in citation_map:
s = s.replace(f"[{original_citation}]", f"__PLACEHOLDER_{original_citation}__")
for original_citation, unify_citation in citation_map.items():
s = s.replace(f"__PLACEHOLDER_{original_citation}__", f"[{unify_citation}]")
return s
@staticmethod
def parse_article_into_dict(input_string):
"""
Parses a structured text into a nested dictionary. The structure of the text
is defined by markdown-like headers (using '#' symbols) to denote sections
and subsections. Each section can contain content and further nested subsections.
The resulting dictionary captures the hierarchical structure of sections, where
each section is represented as a key (the section's title) mapping to a value
that is another dictionary. This dictionary contains two keys:
- 'content': content of the section
- 'subsections': a list of dictionaries, each representing a nested subsection
following the same structure.
Args:
input_string (str): A string containing the structured text to parse.
Returns:
A dictionary representing contains the section title as the key, and another dictionary
as the value, which includes the 'content' and 'subsections' keys as described above.
"""
lines = input_string.split('\n')
lines = [line for line in lines if line.strip()]
root = {'content': '', 'subsections': {}}
current_path = [(root, -1)] # (current_dict, level)
for line in lines:
if line.startswith('#'):
level = line.count('#')
title = line.strip('# ').strip()
new_section = {'content': '', 'subsections': {}}
# Pop from stack until find the parent level
while current_path and current_path[-1][1] >= level:
current_path.pop()
# Append new section to the nearest upper level's subsections
current_path[-1][0]['subsections'][title] = new_section
current_path.append((new_section, level))
else:
current_path[-1][0]['content'] += line + '\n'
return root['subsections']
class FileIOHelper:
@staticmethod
def dump_json(obj, file_name, encoding="utf-8"):
with open(file_name, 'w', encoding=encoding) as fw:
json.dump(obj, fw, default=FileIOHelper.handle_non_serializable, ensure_ascii=False)
@staticmethod
def handle_non_serializable(obj):
return "non-serializable contents" # mark the non-serializable part
@staticmethod
def load_json(file_name, encoding="utf-8"):
with open(file_name, 'r', encoding=encoding) as fr:
return json.load(fr)
@staticmethod
def write_str(s, path):
with open(path, 'w') as f:
f.write(s)
@staticmethod
def load_str(path):
with open(path, 'r') as f:
return '\n'.join(f.readlines())
@staticmethod
def dump_pickle(obj, path):
with open(path, 'wb') as f:
pickle.dump(obj, f)
@staticmethod
def load_pickle(path):
with open(path, 'rb') as f:
return pickle.load(f)
class Article(article):
def __init__(self, topic_name):
super().__init__(topic_name=topic_name)
self.reference = {
"url_to_unified_index": {},
"url_to_info": {}
}
def find_section(self, node: articleSectionNode, name: str) -> Optional[articleSectionNode]:
"""
Return the node of the section given the section name.
Args:
node: the node as the root to find.
name: the name of node as section name
Return:
reference of the node or None if section name has no match
"""
if node.section_name == name:
return node
for child in node.children:
result = self.find_section(child, name)
if result:
return result
return None
def update_section(self,
current_section_content: str,
current_section_info_list: List,
parent_section_name: Optional[str] = None) -> Optional[articleSectionNode]:
"""
Add new section to the article.
Args:
current_section_name: new section heading name in string format.
parent_section_name: under which parent section to add the new one. Default to root.
current_section_content: optional section content.
Returns:
the ArticleSectionNode for current section if successfully created / updated. Otherwise none.
"""
if current_section_info_list is not None:
references = set([int(x) for x in re.findall(r'\[(\d+)\]', current_section_content)])
# for any reference number greater than max number of references, delete the reference
if len(references) > 0:
max_ref_num = max(references)
if max_ref_num > len(current_section_info_list):
for i in range(len(current_section_info_list), max_ref_num + 1):
current_section_content = current_section_content.replace(f'[{i}]', '')
if i in references:
references.remove(i)
# for any reference that is not used, trim it from current_section_info_list
index_to_keep = [i - 1 for i in references]
citation_mapping = self._merge_new_info_to_references(current_section_info_list, index_to_keep)
current_section_content = ArticleTextProcessing.update_citation_index(current_section_content,
citation_mapping)
if parent_section_name is None:
parent_section_name = self.root.section_name
article_dict = ArticleTextProcessing.parse_article_into_dict(current_section_content)
self.insert_or_create_section(article_dict=article_dict, parent_section_name=parent_section_name,
trim_children=False)
def insert_or_create_section(self, article_dict: Dict[str, Dict], parent_section_name: str = None,
trim_children=False):
parent_node = self.root if parent_section_name is None else self.find_section(self.root, parent_section_name)
if trim_children:
section_names = set(article_dict.keys())
for child in parent_node.children[:]:
if child.section_name not in section_names:
parent_node.remove_child(child)
for section_name, content_dict in article_dict.items():
current_section_node = self.find_section(parent_node, section_name)
if current_section_node is None:
current_section_node = articleSectionNode(section_name=section_name,
content=content_dict["content"].strip())
insert_to_front = parent_node.section_name == (self.root.section_name and current_section_node.section_name == "基本介绍") or (self.root.section_name and current_section_node.section_name == "引言")
parent_node.add_child(current_section_node, insert_to_front=insert_to_front)
else:
current_section_node.content = content_dict["content"].strip()
self.insert_or_create_section(article_dict=content_dict["subsections"], parent_section_name=section_name,
trim_children=True)
def _merge_new_info_to_references(self, new_info_list: List, index_to_keep=None) -> Dict[
int, int]:
"""
Merges new storm information into existing references and updates the citation index mapping.
Args:
new_info_list (List[StormInformation]): A list of dictionaries representing new storm information.
index_to_keep (List[int]): A list of index of the new_info_list to keep. If none, keep all.
Returns:
Dict[int, int]: A dictionary mapping the index of each storm information piece in the input list
to its unified citation index in the references.
"""
citation_idx_mapping = {}
for idx, storm_info in enumerate(new_info_list):
if index_to_keep is not None and idx not in index_to_keep:
continue
url = storm_info['url']
if url not in self.reference["url_to_unified_index"]:
self.reference["url_to_unified_index"][url] = len(
self.reference["url_to_unified_index"]) + 1 # The citation index starts from 1.
self.reference["url_to_info"][url] = storm_info
else:
existing_snippets = self.reference["url_to_info"][url]['snippets']
existing_snippets.extend(storm_info['snippets'])
self.reference["url_to_info"][url]['snippets'] = list(set(existing_snippets))
citation_idx_mapping[idx + 1] = self.reference["url_to_unified_index"][
url] # The citation index starts from 1.
return citation_idx_mapping
def get_outline_as_list(self, root_section_name: Optional[str] = None, add_hashtags: bool = False,
include_root: bool = True) -> List[str]:
"""
Get outline of the article as a list.
Args:
section_name: get all section names in pre-order travel ordering in the subtree of section_name.
For example:
#root
##section1
###section1.1
###section1.2
##section2
article.get_outline_as_list("section1") returns [section1, section1.1, section1.2, section2]
Returns:
list of section and subsection names.
"""
if root_section_name is None or root_section_name == '':
section_node = self.root
else:
section_node = self.find_section(self.root, root_section_name)
include_root = include_root or section_node != self.root.section_name
if section_node is None:
return []
result = []
def preorder_traverse(node, level):
prefix = "#" * level if add_hashtags else "" # Adjust level if excluding root
result.append(f"{prefix} {node.section_name}".strip() if add_hashtags else node.section_name)
for child in node.children:
preorder_traverse(child, level + 1)
# Adjust the initial level based on whether root is included and hashtags are added
if include_root:
preorder_traverse(section_node, level=1)
else:
for child in section_node.children:
preorder_traverse(child, level=1)
return result
def to_string(self) -> str:
"""
Get outline of the article as a list.
Returns:
list of section and subsection names.
"""
result = []
def preorder_traverse(node, level):
prefix = "#" * level
result.append(f"{prefix} {node.section_name}".strip())
result.append(node.content)
for child in node.children:
preorder_traverse(child, level + 1)
# Adjust the initial level based on whether root is included and hashtags are added
for child in self.root.children:
preorder_traverse(child, level=1)
result = [i.strip() for i in result if i is not None and i.strip()]
return "\n\n".join(result)
def reorder_reference_index(self):
# pre-order traversal to get order of references appear in the article
ref_indices = []
def pre_order_find_index(node):
if node is not None:
if node.content is not None and node.content:
ref_indices.extend(ArticleTextProcessing.parse_citation_indices(node.content))
for child in node.children:
pre_order_find_index(child)
pre_order_find_index(self.root)
# constrcut index mapping
ref_index_mapping = {}
for ref_index in ref_indices:
if ref_index not in ref_index_mapping:
ref_index_mapping[ref_index] = len(ref_index_mapping) + 1
# update content
def pre_order_update_index(node):
if node is not None:
if node.content is not None and node.content:
node.content = ArticleTextProcessing.update_citation_index(node.content, ref_index_mapping)
for child in node.children:
pre_order_update_index(child)
pre_order_update_index(self.root)
# update reference
for url in list(self.reference["url_to_unified_index"]):
pre_index = self.reference["url_to_unified_index"][url]
if pre_index not in ref_index_mapping:
del self.reference["url_to_unified_index"][url]
else:
new_index = ref_index_mapping[pre_index]
self.reference["url_to_unified_index"][url] = new_index
def get_outline_tree(self):
def build_tree(node) -> Dict[str, Dict]:
tree = {}
for child in node.children:
tree[child.section_name] = build_tree(child)
return tree if tree else {}
return build_tree(self.root)
def get_outline(self) -> List[str]:
"""
Get the entire outline with all section names (all levels)
"""
def _get_all_section_names(section: Section) -> List[str]:
names = [section.section_name]
for child in section.children:
names.extend(_get_all_section_names(child))
return names
return _get_all_section_names(self.root)
def get_first_level_section_names(self) -> List[str]:
"""
Get first level section names
"""
return [i.section_name for i in self.root.children]
def get_leaf_nodes(self) -> List[articleSectionNode]:
"""
Get all leaf nodes containing the given keyword in their content.
Args:
keyword: The keyword to search for in leaf nodes.
Returns:
A list of leaf nodes that contain the keyword in their content.
"""
result = []
def traverse(node):
# A leaf node is defined as having no children
if not node.children or len(node.children) == 0:
if len(node.keywords)>0:
result.append(node)
else:
for child in node.children:
if len(node.keywords)>0:
result.append(node)
traverse(child)
traverse(self.root)
return result
@classmethod
def from_outline_file(cls, topic: str, file_path: str):
"""
Create StormArticle class instance from outline file.
"""
outline_str = FileIOHelper.load_str(file_path)
return StormArticle.from_outline_str(topic=topic, outline_str=outline_str)
@classmethod
def from_outline_str(cls, topic: str, outline_str: str):
"""
Create StormArticle class instance from outline only string.
"""
lines = []
try:
lines = outline_str.split("\n")
lines = [line.strip() for line in lines if line.strip()]
except:
pass
instance = cls(topic)
if lines:
a = lines[0].startswith("#") and lines[0].replace("#", "").strip().lower()
b = topic.lower().replace("_", " ")
adjust_level = lines[0].startswith("#") and lines[0].replace(
"#", ""
).strip().lower() == topic.lower().replace("_", " ")
if adjust_level:
lines = lines[1:]
node_stack = [(0, instance.root)] # Stack to keep track of (level, node)
for line in lines:
level = line.count("#") - adjust_level
section_name = line.replace("#", "").strip()
if section_name == topic:
continue
new_node = articleSectionNode(section_name)
while node_stack and level <= node_stack[-1][0]:
node_stack.pop()
node_stack[-1][1].add_child(new_node)
node_stack.append((level, new_node))
return instance
def dump_outline_to_file(self, file_path):
outline = self.get_outline_as_list(add_hashtags=True, include_root=False)
FileIOHelper.write_str("\n".join(outline), file_path)
def dump_reference_to_file(self, file_path):
reference = copy.deepcopy(self.reference)
for url in reference["url_to_info"]:
reference["url_to_info"][url] = reference["url_to_info"][url].to_dict()
FileIOHelper.dump_json(reference, file_path)
def dump_article_as_plain_text(self, file_path):
text = self.to_string()
FileIOHelper.write_str(text, file_path)
@classmethod
def from_string(cls, topic_name: str, article_text: str, references: dict):
article_dict = ArticleTextProcessing.parse_article_into_dict(article_text)
article = cls(topic_name=topic_name)
article.insert_or_create_section(article_dict=article_dict)
for url in list(references["url_to_info"]):
references["url_to_info"][url] = StormInformation.from_dict(references["url_to_info"][url])
article.reference = references
return article
def post_processing(self):
self.prune_empty_nodes()
self.reorder_reference_index()