OneKE / src /construct /convert.py
ShawnRu's picture
update
e6e7506
import json
import re
from neo4j import GraphDatabase
def sanitize_string(input_str, max_length=255):
    """
    Process the input string to ensure it meets the database requirements.

    Every character outside ``[a-zA-Z0-9_]`` is replaced with ``_``, a
    ``num`` prefix is added when the result starts with a digit, and the
    result is finally cut to at most ``max_length`` characters.

    Args:
        input_str: The raw text to sanitize.
        max_length: Maximum length of the returned string.

    Returns:
        The sanitized string. An empty input yields an empty string.
    """
    # step1: Replace invalid characters
    sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', input_str)
    # step2: Add prefix if it starts with a digit.
    # Slicing with [:1] instead of indexing [0] keeps an empty string from
    # raising IndexError.
    if sanitized[:1].isdigit():
        sanitized = 'num' + sanitized
    # step3: Limit length (the prefix added above counts toward the limit)
    return sanitized[:max_length]
def generate_cypher_statements(data):
    """
    Generates Cypher query statements based on the provided JSON data.

    Args:
        data: JSON text. Either a single triple object, or an object whose
            "triple_list" key holds a list of triple objects. A triple may
            carry the keys "head", "head_type", "relation", "relation_type",
            "tail" and "tail_type"; missing or empty values are skipped.

    Returns:
        A list of Cypher statement strings, one per triple that produced at
        least one MERGE clause. Triples that produce nothing are omitted.

    Raises:
        json.JSONDecodeError: If ``data`` is not valid JSON.
    """
    parsed_data = json.loads(data)

    def quote(value):
        # Build a double-quoted Cypher string literal. Backslashes and double
        # quotes are escaped so a name containing them cannot terminate the
        # literal early and corrupt (or inject into) the statement.
        escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    def node_clause(variable, label, name):
        # MERGE clause for one node; nodes without a usable type get the
        # UNTYPED label.
        return f'MERGE ({variable}:{label or "UNTYPED"} {{name: {quote(name)}}}) '

    def create_statement(triple):
        head = triple.get("head")
        head_type = triple.get("head_type")
        relation = triple.get("relation")
        relation_type = triple.get("relation_type")
        tail = triple.get("tail")
        tail_type = triple.get("tail_type")

        # Only the type names are sanitized: they are emitted as bare
        # labels / relationship types. Names are emitted as quoted literals.
        head_type_safe = sanitize_string(head_type) if head_type else None
        relation_type_safe = sanitize_string(relation_type) if relation_type else None
        tail_type_safe = sanitize_string(tail_type) if tail_type else None

        statement = ""
        if head:
            statement += node_clause("a", head_type_safe, head)
        if tail:
            statement += node_clause("b", tail_type_safe, tail)

        # Only create the relation if head and tail exist; otherwise the
        # pattern would reference variables that no earlier MERGE bound.
        if head and tail and (relation or relation_type_safe):
            rel_label = relation_type_safe or "UNTYPED"
            # If relation is not provided, name the relation by `relation_type`.
            rel_name = relation if relation else relation_type_safe
            statement += f'MERGE (a)-[:{rel_label} {{name: {quote(rel_name)}}}]->(b);'
        elif statement:
            statement += ';'
        return statement

    if "triple_list" in parsed_data:
        triples = parsed_data["triple_list"]
    else:
        triples = [parsed_data]

    # Drop empty results so callers never send an empty query to the database.
    generated = (create_statement(triple) for triple in triples)
    return [statement for statement in generated if statement]
def execute_cypher_statements(uri, user, password, cypher_statements):
    """
    Executes the generated Cypher query statements.

    Args:
        uri: Connection URI passed to ``GraphDatabase.driver``.
        user: Username used for authentication.
        password: Password used for authentication.
        cypher_statements: Iterable of Cypher statement strings. Empty or
            whitespace-only entries are skipped.
    """
    driver = GraphDatabase.driver(uri, auth=(user, password))
    try:
        with driver.session() as session:
            for statement in cypher_statements:
                # Nothing to run for blank entries.
                if not statement or not statement.strip():
                    continue
                session.run(statement)
                print(f"Executed: {statement}")
        # Write executed cypher statements to a text file if you want.
        # with open("executed_statements.txt", 'a') as f:
        #     for statement in cypher_statements:
        #         f.write(statement + '\n')
        #     f.write('\n')
    finally:
        # Close the driver even when a statement fails, so the connection
        # is not leaked.
        driver.close()
# Here is a test of your database connection:
if __name__ == "__main__":
    # Sample payload 1: several triples wrapped in a "triple_list" array.
    sample_json = '''
{
"triple_list": [
{
"head": "J.K. Rowling",
"head_type": "Person",
"relation": "wrote",
"relation_type": "Actions",
"tail": "Fantastic Beasts and Where to Find Them",
"tail_type": "Book"
},
{
"head": "Fantastic Beasts and Where to Find Them",
"head_type": "Book",
"relation": "extra section of",
"relation_type": "Affiliation",
"tail": "Harry Potter Series",
"tail_type": "Book"
},
{
"head": "J.K. Rowling",
"head_type": "Person",
"relation": "wrote",
"relation_type": "Actions",
"tail": "Harry Potter Series",
"tail_type": "Book"
},
{
"head": "Harry Potter Series",
"head_type": "Book",
"relation": "create",
"relation_type": "Actions",
"tail": "Dumbledore",
"tail_type": "Person"
},
{
"head": "Fantastic Beasts and Where to Find Them",
"head_type": "Book",
"relation": "mention",
"relation_type": "Actions",
"tail": "Dumbledore",
"tail_type": "Person"
},
{
"head": "Voldemort",
"head_type": "Person",
"relation": "afrid",
"relation_type": "Emotion",
"tail": "Dumbledore",
"tail_type": "Person"
},
{
"head": "Voldemort",
"head_type": "Person",
"relation": "robs",
"relation_type": "Actions",
"tail": "the Elder Wand",
"tail_type": "Weapon"
},
{
"head": "the Elder Wand",
"head_type": "Weapon",
"relation": "belong to",
"relation_type": "Affiliation",
"tail": "Dumbledore",
"tail_type": "Person"
}
]
}
'''
    # Sample payload 2: one bare triple without the "triple_list" wrapper.
    # Uncomment it to replace the payload above and try that input shape.
    # sample_json = '''
    # {
    # "head": "Christopher Nolan",
    # "head_type": "Person",
    # "relation": "directed",
    # "relation_type": "Action",
    # "tail": "Inception",
    # "tail_type": "Movie"
    # }
    # '''

    # Convert the payload to Cypher and show the result before executing it.
    statements = generate_cypher_statements(sample_json)
    for stmt in statements:
        print(stmt)
    print("\n")

    # Connection settings: replace the placeholders with your own values.
    connection = {
        "uri": "neo4j://localhost:7687",  # your URI
        "user": "your_username",  # your username
        "password": "your_password",  # your password
    }
    # Run the generated statements against the database.
    execute_cypher_statements(cypher_statements=statements, **connection)