import concurrent.futures
import copy
import json
import os
import pickle
import re
import sys
from collections import OrderedDict
from typing import Union, Optional, Any, List, Tuple, Dict

import httpx
import numpy as np
import toml
from interface import articleSectionNode, article
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from trafilatura import extract


class ArticleTextProcessing:
    @staticmethod
    def limit_word_count_preserve_newline(input_string, max_word_count):
        """
        Limit the word count of an input string to a specified maximum while preserving complete lines.

        The function truncates the input string at the nearest word that does not exceed the maximum
        word count. Words are defined as text separated by spaces, and lines are defined as text
        separated by newline characters. Truncation can stop in the middle of a line if the word
        budget runs out there.

        Args:
            input_string (str): The string to be truncated. This string may contain multiple lines.
            max_word_count (int): The maximum number of words allowed in the truncated string.

        Returns:
            str: The truncated string with its word count limited to `max_word_count`.
        """

        word_count = 0
        limited_string = ''

        for line in input_string.split('\n'):
            line_words = line.split()
            for lw in line_words:
                if word_count < max_word_count:
                    limited_string += lw + ' '
                    word_count += 1
                else:
                    break
            if word_count >= max_word_count:
                break
            # Re-insert the newline only after a line has been fully consumed.
            limited_string = limited_string.strip() + '\n'

        return limited_string.strip()
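    # A minimal usage sketch (illustrative values, not executed at import time):
    #   ArticleTextProcessing.limit_word_count_preserve_newline("one two three\nfour five six", 5)
    # keeps whole words up to the budget and preserves the newline after the first complete line,
    # returning "one two three\nfour five".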
    @staticmethod
    def remove_citations(s):
        """
        Removes all citations from a given string. Citations are assumed to be in the format
        of numbers enclosed in square brackets, such as [1], [2], or [1, 2], etc. This function searches
        for all occurrences of such patterns and removes them, returning the cleaned string.

        Args:
            s (str): The string from which citations are to be removed.

        Returns:
            str: The string with all citation patterns removed.
        """

        return re.sub(r'\[\d+(?:,\s*\d+)*\]', '', s)
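    # Illustrative call (not executed): remove_citations("Attention is effective [1][2, 3].")
    # strips both the single and the grouped citation, returning "Attention is effective ."
    # (surrounding whitespace is left untouched).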
    @staticmethod
    def get_first_section_dict_and_list(s):
        """
        Split a '#'-delimited article string into its top-level sections.

        The string is split wherever a new line starts with '# '. For each resulting section, the
        first line is treated as the title and the remainder as the content. Note that the very
        first section keeps its leading '# ' prefix in the title, because only subsequent headings
        are consumed by the split.

        Args:
            s (str): The article text with sections marked by '# ' headings.

        Returns:
            Tuple[Dict[str, str], List[str]]: a mapping from section title to section content, and
            the list of titles in their order of appearance.
        """
        text = s
        sections = text.strip().split('\n# ')
        titles = []
        content_dict = {}

        for section in sections:
            if section:
                lines = section.split('\n', 1)
                title = lines[0].strip()
                content = lines[1].strip() if len(lines) > 1 else ""

                titles.append(title)
                content_dict[title] = content
        return content_dict, titles
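    # For instance (illustrative):
    #   get_first_section_dict_and_list("# Introduction\nSome text.\n# History\nMore text.")
    # returns ({'# Introduction': 'Some text.', 'History': 'More text.'}, ['# Introduction', 'History']);
    # the retained '# ' on the first title is the behavior described in the docstring above.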
    @staticmethod
    def parse_citation_indices(s):
        """
        Extracts citation indexes from the provided content string and returns them as a list of integers.

        Args:
            s (str): The content string containing citations in the format [number].

        Returns:
            List[int]: Citation indexes extracted from the content, in the order they appear
            (duplicates are preserved).
        """
        matches = re.findall(r'\[\d+\]', s)
        return [int(index[1:-1]) for index in matches]
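    # Illustrative call: parse_citation_indices("Claims [2] and [5], repeated [2].") returns [2, 5, 2].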
    @staticmethod
    def remove_uncompleted_sentences_with_citations(text):
        """
        Removes uncompleted sentences and standalone citations from the input text. Sentences are identified
        by their ending punctuation (.!?), optionally followed by a citation in square brackets (e.g., "[1]").
        Grouped citations (e.g., "[1, 2]") are split into individual ones (e.g., "[1] [2]"). Only text up to
        and including the last complete sentence and its citation is retained.

        Args:
            text (str): The input text from which uncompleted sentences and their citations are to be removed.

        Returns:
            str: The processed string with uncompleted sentences and standalone citations removed, leaving only
            complete sentences and their associated citations if present.
        """

        def replace_with_individual_brackets(match):
            numbers = match.group(1).split(', ')
            return ' '.join(f'[{n}]' for n in numbers)

        def deduplicate_group(match):
            citations = match.group(0)
            unique_citations = list(set(re.findall(r'\[\d+\]', citations)))
            sorted_citations = sorted(unique_citations, key=lambda x: int(x.strip('[]')))

            return ''.join(sorted_citations)

        # Split grouped citations such as [1, 2] into [1] [2], then deduplicate runs of adjacent
        # citations such as [1][1][2].
        text = re.sub(r'\[([0-9, ]+)\]', replace_with_individual_brackets, text)
        text = re.sub(r'(\[\d+\])+', deduplicate_group, text)

        # Keep everything up to (and including) the last sentence-ending punctuation mark and its
        # optional trailing citation; anything after it is treated as an uncompleted sentence.
        eos_pattern = r'([.!?])\s*(\[\d+\])?\s*'
        matches = list(re.finditer(eos_pattern, text))
        if matches:
            last_match = matches[-1]
            text = text[:last_match.end()].strip()

        return text
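    # A short walk-through of the three steps above (illustrative, not executed):
    #   remove_uncompleted_sentences_with_citations(
    #       "Solar cells are efficient [1, 2]. They are cheap [2][2] and the")
    # first rewrites "[1, 2]" as "[1] [2]", then collapses the adjacent duplicate "[2][2]" to "[2]",
    # and finally drops the trailing fragment, returning "Solar cells are efficient [1] [2]."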
    @staticmethod
    def clean_up_citation(conv):
        for turn in conv.dlg_history:
            # Strip trailing reference/source lists only when present (str.find returns -1 when the
            # marker is absent, which would otherwise chop off the last character).
            if 'References:' in turn.agent_utterance:
                turn.agent_utterance = turn.agent_utterance[:turn.agent_utterance.find('References:')]
            if 'Sources:' in turn.agent_utterance:
                turn.agent_utterance = turn.agent_utterance[:turn.agent_utterance.find('Sources:')]
            turn.agent_utterance = turn.agent_utterance.replace('Answer:', '').strip()
            try:
                max_ref_num = max([int(x) for x in re.findall(r'\[(\d+)\]', turn.agent_utterance)])
            except Exception:
                max_ref_num = 0
            if max_ref_num > len(turn.search_results):
                # Citations are 1-indexed, so [1] .. [len(search_results)] are valid; drop the rest.
                for i in range(len(turn.search_results) + 1, max_ref_num + 1):
                    turn.agent_utterance = turn.agent_utterance.replace(f'[{i}]', '')
            turn.agent_utterance = ArticleTextProcessing.remove_uncompleted_sentences_with_citations(
                turn.agent_utterance)

        return conv
    @staticmethod
    def clean_up_outline(outline, topic=""):
        output_lines = []
        current_level = 0

        for line in outline.split('\n'):
            stripped_line = line.strip()

            # Restart the outline when the topic heading itself appears.
            if topic != "" and f"# {topic.lower()}" in stripped_line.lower():
                output_lines = []

            # Keep heading lines.
            if stripped_line.startswith('#') and stripped_line != '#':
                current_level = stripped_line.count('#')
                output_lines.append(stripped_line)
            # Keep '@'-prefixed annotation lines as-is.
            elif stripped_line.startswith('@'):
                output_lines.append(stripped_line)

        outline = '\n'.join(output_lines)

        # Remove references, appendices, and similar boilerplate sections.
        outline = re.sub(r"#[#]? See also.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? See Also.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Notes.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? References.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? External links.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? External Links.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Bibliography.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Further reading.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Further Reading.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Summary.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Appendices.*?(?=##|$)", '', outline, flags=re.DOTALL)
        outline = re.sub(r"#[#]? Appendix.*?(?=##|$)", '', outline, flags=re.DOTALL)

        return outline
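    # Sketch of the intended effect (illustrative): given the outline
    #   "# Quantum computing\n## History\n## Applications\n## References"
    # and topic="Quantum computing", the heading lines are kept and the trailing "## References"
    # section is stripped, leaving the topic, History, and Applications headings.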
    @staticmethod
    def clean_up_section(text):
        """Clean up a section:
        1. Remove uncompleted sentences (usually due to output token limitation).
        2. Deduplicate individual groups of citations.
        3. Remove unnecessary summary."""

        paragraphs = text.split('\n')
        output_paragraphs = []
        summary_sec_flag = False
        for p in paragraphs:
            p = p.strip()
            if len(p) == 0:
                continue
            if not p.startswith('#'):
                p = ArticleTextProcessing.remove_uncompleted_sentences_with_citations(p)
            if summary_sec_flag:
                if p.startswith('#'):
                    summary_sec_flag = False
                else:
                    continue
            if p.startswith('Overall') or p.startswith('In summary') or p.startswith('In conclusion'):
                continue
            if "# Summary" in p or '# Conclusion' in p:
                summary_sec_flag = True
                continue
            output_paragraphs.append(p)

        return '\n\n'.join(output_paragraphs)
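    # Example of the combined behavior (illustrative): for the input
    #   "## Design\nThe device is modular [1][1]. It can be extended\nIn summary, it is flexible."
    # the duplicate citation is collapsed, the dangling "It can be extended" fragment is dropped,
    # and the "In summary" paragraph is removed, yielding "## Design\n\nThe device is modular [1]."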
    @staticmethod
    def update_citation_index(s, citation_map):
        """Update citation index in the string based on the citation map."""
        for original_citation in citation_map:
            s = s.replace(f"[{original_citation}]", f"__PLACEHOLDER_{original_citation}__")
        for original_citation, unify_citation in citation_map.items():
            s = s.replace(f"__PLACEHOLDER_{original_citation}__", f"[{unify_citation}]")

        return s
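    # The placeholder pass above prevents chained rewrites. With illustrative values,
    #   update_citation_index("As shown in [3] and [1].", {3: 1, 1: 2})
    # returns "As shown in [1] and [2]." rather than remapping the freshly written [1] a second time.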
    @staticmethod
    def parse_article_into_dict(input_string):
        """
        Parses a structured text into a nested dictionary. The structure of the text
        is defined by markdown-like headers (using '#' symbols) to denote sections
        and subsections. Each section can contain content and further nested subsections.

        The resulting dictionary captures the hierarchical structure of sections, where
        each section is represented as a key (the section's title) mapping to a value
        that is another dictionary. This dictionary contains two keys:
        - 'content': the content of the section
        - 'subsections': a dictionary mapping each subsection title to a dictionary with
          the same structure.

        Args:
            input_string (str): A string containing the structured text to parse.

        Returns:
            A dictionary in which each section title maps to another dictionary with the
            'content' and 'subsections' keys described above.
        """
        lines = input_string.split('\n')
        lines = [line for line in lines if line.strip()]
        root = {'content': '', 'subsections': {}}
        current_path = [(root, -1)]

        for line in lines:
            if line.startswith('#'):
                level = line.count('#')
                title = line.strip('# ').strip()
                new_section = {'content': '', 'subsections': {}}

                # Pop until the parent of the current heading level is on top of the stack.
                while current_path and current_path[-1][1] >= level:
                    current_path.pop()

                current_path[-1][0]['subsections'][title] = new_section
                current_path.append((new_section, level))
            else:
                current_path[-1][0]['content'] += line + '\n'

        return root['subsections']
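    # Illustrative parse: parse_article_into_dict("# Overview\nIntro text.\n## Details\nMore text.")
    # returns {'Overview': {'content': 'Intro text.\n',
    #                       'subsections': {'Details': {'content': 'More text.\n', 'subsections': {}}}}}.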
class FileIOHelper:
    @staticmethod
    def dump_json(obj, file_name, encoding="utf-8"):
        with open(file_name, 'w', encoding=encoding) as fw:
            json.dump(obj, fw, default=FileIOHelper.handle_non_serializable, ensure_ascii=False)

    @staticmethod
    def handle_non_serializable(obj):
        return "non-serializable contents"

    @staticmethod
    def load_json(file_name, encoding="utf-8"):
        with open(file_name, 'r', encoding=encoding) as fr:
            return json.load(fr)

    @staticmethod
    def write_str(s, path):
        with open(path, 'w') as f:
            f.write(s)

    @staticmethod
    def load_str(path):
        with open(path, 'r') as f:
            # f.read() keeps the original line breaks; joining readlines() with '\n' would
            # double every newline.
            return f.read()

    @staticmethod
    def dump_pickle(obj, path):
        with open(path, 'wb') as f:
            pickle.dump(obj, f)

    @staticmethod
    def load_pickle(path):
        with open(path, 'rb') as f:
            return pickle.load(f)
class Article(article):
    def __init__(self, topic_name):
        super().__init__(topic_name=topic_name)
        self.reference = {
            "url_to_unified_index": {},
            "url_to_info": {}
        }
    def find_section(self, node: articleSectionNode, name: str) -> Optional[articleSectionNode]:
        """
        Return the node of the section given the section name.

        Args:
            node: the root node of the subtree to search.
            name: the section name to look for.

        Return:
            reference to the node, or None if no section matches the name.
        """
        if node.section_name == name:
            return node
        for child in node.children:
            result = self.find_section(child, name)
            if result:
                return result
        return None
    def update_section(self,
                       current_section_content: str,
                       current_section_info_list: List,
                       parent_section_name: Optional[str] = None) -> Optional[articleSectionNode]:
        """
        Add new section to the article.

        Args:
            current_section_content: new section content as a '#'-structured string.
            current_section_info_list: list of information items cited by the new content.
            parent_section_name: under which parent section to add the new one. Defaults to root.

        Returns:
            the articleSectionNode for the current section if successfully created / updated. Otherwise None.
        """

        if current_section_info_list is not None:
            references = set([int(x) for x in re.findall(r'\[(\d+)\]', current_section_content)])

            # Citations are 1-indexed; drop any index beyond the available information list.
            if len(references) > 0:
                max_ref_num = max(references)
                if max_ref_num > len(current_section_info_list):
                    for i in range(len(current_section_info_list) + 1, max_ref_num + 1):
                        current_section_content = current_section_content.replace(f'[{i}]', '')
                        if i in references:
                            references.remove(i)

            index_to_keep = [i - 1 for i in references]
            citation_mapping = self._merge_new_info_to_references(current_section_info_list, index_to_keep)
            current_section_content = ArticleTextProcessing.update_citation_index(current_section_content,
                                                                                  citation_mapping)

        if parent_section_name is None:
            parent_section_name = self.root.section_name
        article_dict = ArticleTextProcessing.parse_article_into_dict(current_section_content)
        self.insert_or_create_section(article_dict=article_dict, parent_section_name=parent_section_name,
                                      trim_children=False)
    def insert_or_create_section(self, article_dict: Dict[str, Dict], parent_section_name: str = None,
                                 trim_children=False):
        parent_node = self.root if parent_section_name is None else self.find_section(self.root, parent_section_name)

        if trim_children:
            section_names = set(article_dict.keys())
            for child in parent_node.children[:]:
                if child.section_name not in section_names:
                    parent_node.remove_child(child)

        for section_name, content_dict in article_dict.items():
            current_section_node = self.find_section(parent_node, section_name)
            if current_section_node is None:
                current_section_node = articleSectionNode(section_name=section_name,
                                                          content=content_dict["content"].strip())
                # Keep introductory sections ("基本介绍" / "引言", i.e. "basic introduction" /
                # "introduction") at the front when they are direct children of the root.
                insert_to_front = parent_node.section_name == self.root.section_name and (
                    current_section_node.section_name == "基本介绍"
                    or current_section_node.section_name == "引言"
                )
                parent_node.add_child(current_section_node, insert_to_front=insert_to_front)
            else:
                current_section_node.content = content_dict["content"].strip()

            self.insert_or_create_section(article_dict=content_dict["subsections"], parent_section_name=section_name,
                                          trim_children=True)
    def _merge_new_info_to_references(self, new_info_list: List, index_to_keep=None) -> Dict[int, int]:
        """
        Merges new information into the existing references and updates the citation index mapping.

        Args:
            new_info_list (List): A list of information items, each carrying 'url' and 'snippets'.
            index_to_keep (List[int]): Indices of new_info_list entries to keep. If None, keep all.

        Returns:
            Dict[int, int]: A dictionary mapping the (1-based) citation index of each information
            piece in the input list to its unified citation index in the references.
        """
        citation_idx_mapping = {}
        for idx, storm_info in enumerate(new_info_list):
            if index_to_keep is not None and idx not in index_to_keep:
                continue
            url = storm_info['url']
            if url not in self.reference["url_to_unified_index"]:
                # Unified citation indices start from 1.
                self.reference["url_to_unified_index"][url] = len(self.reference["url_to_unified_index"]) + 1
                self.reference["url_to_info"][url] = storm_info
            else:
                existing_snippets = self.reference["url_to_info"][url]['snippets']
                existing_snippets.extend(storm_info['snippets'])
                self.reference["url_to_info"][url]['snippets'] = list(set(existing_snippets))
            citation_idx_mapping[idx + 1] = self.reference["url_to_unified_index"][url]
        return citation_idx_mapping
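    # A small sketch of the mapping this method produces (hypothetical info items, assuming plain
    # dicts with 'url' and 'snippets' keys as the subscripting above implies): merging
    #   [{'url': 'https://a.example', 'snippets': ['s1']}, {'url': 'https://b.example', 'snippets': ['s2']}]
    # with index_to_keep=[1] into an empty reference table registers only https://b.example, gives
    # it unified index 1, and returns {2: 1} (the kept item was cited as [2] in the draft text).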
    def get_outline_as_list(self, root_section_name: Optional[str] = None, add_hashtags: bool = False,
                            include_root: bool = True) -> List[str]:
        """
        Get outline of the article as a list.

        Args:
            root_section_name: get all section names in pre-order traversal ordering in the subtree
                of root_section_name. For example:
                    #root
                    ##section1
                    ###section1.1
                    ###section1.2
                    ##section2
                article.get_outline_as_list("section1") returns [section1, section1.1, section1.2].
            add_hashtags: prefix each name with '#' characters matching its level.
            include_root: whether to include the subtree root itself in the result.

        Returns:
            list of section and subsection names.
        """
        if root_section_name is None or root_section_name == '':
            section_node = self.root
        else:
            section_node = self.find_section(self.root, root_section_name)
            include_root = include_root or root_section_name != self.root.section_name
        if section_node is None:
            return []
        result = []

        def preorder_traverse(node, level):
            prefix = "#" * level if add_hashtags else ""
            result.append(f"{prefix} {node.section_name}".strip() if add_hashtags else node.section_name)
            for child in node.children:
                preorder_traverse(child, level + 1)

        if include_root:
            preorder_traverse(section_node, level=1)
        else:
            for child in section_node.children:
                preorder_traverse(child, level=1)
        return result
    def to_string(self) -> str:
        """
        Render the article (headings and content) as a single markdown-style string.

        Returns:
            str: the full article, with '#'-prefixed section headings followed by their content.
        """
        result = []

        def preorder_traverse(node, level):
            prefix = "#" * level
            result.append(f"{prefix} {node.section_name}".strip())
            result.append(node.content)
            for child in node.children:
                preorder_traverse(child, level + 1)

        for child in self.root.children:
            preorder_traverse(child, level=1)
        result = [i.strip() for i in result if i is not None and i.strip()]
        return "\n\n".join(result)
    def reorder_reference_index(self):
        # Collect citation indices in the order they appear in a pre-order traversal of the article.
        ref_indices = []

        def pre_order_find_index(node):
            if node is not None:
                if node.content is not None and node.content:
                    ref_indices.extend(ArticleTextProcessing.parse_citation_indices(node.content))
                for child in node.children:
                    pre_order_find_index(child)

        pre_order_find_index(self.root)

        # Build a mapping from old citation index to new index based on first appearance.
        ref_index_mapping = {}
        for ref_index in ref_indices:
            if ref_index not in ref_index_mapping:
                ref_index_mapping[ref_index] = len(ref_index_mapping) + 1

        # Rewrite citations in every node's content with the new indices.
        def pre_order_update_index(node):
            if node is not None:
                if node.content is not None and node.content:
                    node.content = ArticleTextProcessing.update_citation_index(node.content, ref_index_mapping)
                for child in node.children:
                    pre_order_update_index(child)

        pre_order_update_index(self.root)

        # Update the unified reference table and drop references that are no longer cited.
        for url in list(self.reference["url_to_unified_index"]):
            pre_index = self.reference["url_to_unified_index"][url]
            if pre_index not in ref_index_mapping:
                del self.reference["url_to_unified_index"][url]
            else:
                new_index = ref_index_mapping[pre_index]
                self.reference["url_to_unified_index"][url] = new_index
    def get_outline_tree(self):
        def build_tree(node) -> Dict[str, Dict]:
            tree = {}
            for child in node.children:
                tree[child.section_name] = build_tree(child)
            return tree if tree else {}

        return build_tree(self.root)
    def get_outline(self) -> List[str]:
        """
        Get the entire outline with all section names (all levels).
        """
        def _get_all_section_names(section: articleSectionNode) -> List[str]:
            names = [section.section_name]
            for child in section.children:
                names.extend(_get_all_section_names(child))
            return names

        return _get_all_section_names(self.root)
    def get_first_level_section_names(self) -> List[str]:
        """
        Get first level section names.
        """
        return [i.section_name for i in self.root.children]
    def get_leaf_nodes(self) -> List[articleSectionNode]:
        """
        Collect the nodes that have keywords attached.

        Leaf nodes with a non-empty `keywords` attribute are returned; internal nodes with keywords
        are included as well (once each), followed by their descendants in pre-order.

        Returns:
            A list of articleSectionNode objects whose `keywords` attribute is non-empty.
        """
        result = []

        def traverse(node):
            if not node.children:
                if len(node.keywords) > 0:
                    result.append(node)
            else:
                if len(node.keywords) > 0:
                    result.append(node)
                for child in node.children:
                    traverse(child)

        traverse(self.root)

        return result
    @classmethod
    def from_outline_file(cls, topic: str, file_path: str):
        """
        Create an Article instance from an outline file.
        """
        outline_str = FileIOHelper.load_str(file_path)
        return cls.from_outline_str(topic=topic, outline_str=outline_str)
    @classmethod
    def from_outline_str(cls, topic: str, outline_str: str):
        """
        Create an Article instance from an outline string.
        """
        lines = []
        try:
            lines = outline_str.split("\n")
            lines = [line.strip() for line in lines if line.strip()]
        except Exception:
            pass

        instance = cls(topic)
        if lines:
            # If the first line repeats the topic as a top-level heading, drop it and shift every
            # remaining heading up one level.
            adjust_level = lines[0].startswith("#") and lines[0].replace(
                "#", ""
            ).strip().lower() == topic.lower().replace("_", " ")
            if adjust_level:
                lines = lines[1:]
            node_stack = [(0, instance.root)]  # Stack of (level, node)

            for line in lines:
                level = line.count("#") - adjust_level
                section_name = line.replace("#", "").strip()

                if section_name == topic:
                    continue

                new_node = articleSectionNode(section_name)

                # Pop up to the nearest ancestor whose level is strictly smaller than this line's.
                while node_stack and level <= node_stack[-1][0]:
                    node_stack.pop()

                node_stack[-1][1].add_child(new_node)
                node_stack.append((level, new_node))
        return instance
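    # Sketch (illustrative): Article.from_outline_str(topic="Solar energy",
    #     outline_str="# Solar energy\n## Technology\n### Photovoltaics\n## Economics")
    # drops the repeated topic heading, then builds a tree whose first-level sections are
    # "Technology" (with child "Photovoltaics") and "Economics".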
    def dump_outline_to_file(self, file_path):
        outline = self.get_outline_as_list(add_hashtags=True, include_root=False)
        FileIOHelper.write_str("\n".join(outline), file_path)

    def dump_reference_to_file(self, file_path):
        reference = copy.deepcopy(self.reference)
        for url in reference["url_to_info"]:
            reference["url_to_info"][url] = reference["url_to_info"][url].to_dict()
        FileIOHelper.dump_json(reference, file_path)

    def dump_article_as_plain_text(self, file_path):
        text = self.to_string()
        FileIOHelper.write_str(text, file_path)
    @classmethod
    def from_string(cls, topic_name: str, article_text: str, references: dict):
        article_dict = ArticleTextProcessing.parse_article_into_dict(article_text)
        article = cls(topic_name=topic_name)
        article.insert_or_create_section(article_dict=article_dict)
        for url in list(references["url_to_info"]):
            references["url_to_info"][url] = StormInformation.from_dict(references["url_to_info"][url])
        article.reference = references
        return article
    def post_processing(self):
        self.prune_empty_nodes()
        self.reorder_reference_index()