File size: 6,769 Bytes
e6e7506 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import json
import re
from neo4j import GraphDatabase
def sanitize_string(input_str, max_length=255):
    """
    Process the input string to ensure it meets the database requirements.

    Every character outside ``[a-zA-Z0-9_]`` is replaced by ``_``, a ``num``
    prefix is added when the result starts with a digit, and the result is
    finally cut to at most ``max_length`` characters.

    Args:
        input_str: Text to sanitize.
        max_length: Upper bound on the length of the returned string. The
            ``num`` prefix counts toward this limit.

    Returns:
        The sanitized string. An empty input yields an empty string.
    """
    # step1: Replace invalid characters
    sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', input_str)
    # step2: Add prefix if it starts with a digit.
    # Slicing instead of indexing keeps an empty string from raising
    # IndexError: ''[:1] is '' and ''.isdigit() is False.
    if sanitized[:1].isdigit():
        sanitized = 'num' + sanitized
    # step3: Limit length (slicing is a no-op for strings already short enough)
    return sanitized[:max_length]
def generate_cypher_statements(data):
    """
    Generates Cypher query statements based on the provided JSON data.

    Args:
        data: JSON text holding either an object with a "triple_list" array
            of triples, or a single triple object. A triple is read through
            the keys "head", "head_type", "relation", "relation_type",
            "tail" and "tail_type"; all of them are optional.

    Returns:
        A list with one statement string per triple, in input order. A
        triple that has neither head nor tail yields an empty string.
    """
    parsed_data = json.loads(data)

    def escape_name(value):
        # Names end up inside double-quoted Cypher string literals. Escape
        # backslashes first and double quotes second, so a name such as
        # `say "hi"` cannot close the literal early and corrupt the statement.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    def label_for(type_value):
        # Missing or empty types fall back to the generic UNTYPED label.
        return sanitize_string(type_value) if type_value else 'UNTYPED'

    def create_statement(triple):
        head = triple.get("head")
        head_type = triple.get("head_type")
        relation = triple.get("relation")
        relation_type = triple.get("relation_type")
        tail = triple.get("tail")
        tail_type = triple.get("tail_type")

        statement = ""
        if head:
            head_label = label_for(head_type)
            statement += f'MERGE (a:{head_label} {{name: "{escape_name(head)}"}}) '
        if tail:
            tail_label = label_for(tail_type)
            statement += f'MERGE (b:{tail_label} {{name: "{escape_name(tail)}"}}) '

        # Decide which relationship, if any, this triple describes.
        if relation:
            rel_label = label_for(relation_type)
            rel_name = escape_name(relation)
        elif relation_type:
            # If relation is not provided, create relation by `relation_type`;
            # the sanitized type doubles as the relationship name.
            rel_label = sanitize_string(relation_type)
            rel_name = rel_label
        else:
            rel_label = rel_name = None

        # Only create relation if head and tail exist: otherwise `a` or `b`
        # would be unbound in the MERGE pattern. This guard now also covers
        # the relation_type-only case.
        if rel_label and head and tail:
            statement += f'MERGE (a)-[:{rel_label} {{name: "{rel_name}"}}]->(b);'
        elif statement:
            statement += ';'
        return statement

    if "triple_list" in parsed_data:
        return [create_statement(triple) for triple in parsed_data["triple_list"]]
    return [create_statement(parsed_data)]
def execute_cypher_statements(uri, user, password, cypher_statements):
    """
    Executes the generated Cypher query statements.

    Args:
        uri: Connection URI handed to ``GraphDatabase.driver``.
        user: User name for authentication.
        password: Password for authentication.
        cypher_statements: Iterable of statement strings, run one by one in
            a single session. Empty or whitespace-only entries are skipped.

    Each executed statement is echoed to stdout. Errors raised by the driver
    propagate to the caller; the driver is closed in either case.
    """
    driver = GraphDatabase.driver(uri, auth=(user, password))
    try:
        with driver.session() as session:
            for statement in cypher_statements:
                # A blank statement is not a valid query; skip it instead of
                # aborting the whole batch on it.
                if not statement or not statement.strip():
                    continue
                session.run(statement)
                print(f"Executed: {statement}")
        # Write executed cypher statements to a text file if you want.
        # with open("executed_statements.txt", 'a') as f:
        #     for statement in cypher_statements:
        #         f.write(statement + '\n')
        #     f.write('\n')
    finally:
        # Release the connection pool even when a statement fails.
        driver.close()
# Demo entry point: generates Cypher statements from sample triples, prints
# them, then runs them against the database configured below. It doubles as
# a test of your database connection.
if __name__ == "__main__":
    # test_data 1: Contains a list of triples (the "triple_list" form).
    # NOTE(review): the relation "afrid" below looks like a typo for
    # "afraid"; it is runtime data, so it is left untouched here.
    test_data = '''
{
"triple_list": [
{
"head": "J.K. Rowling",
"head_type": "Person",
"relation": "wrote",
"relation_type": "Actions",
"tail": "Fantastic Beasts and Where to Find Them",
"tail_type": "Book"
},
{
"head": "Fantastic Beasts and Where to Find Them",
"head_type": "Book",
"relation": "extra section of",
"relation_type": "Affiliation",
"tail": "Harry Potter Series",
"tail_type": "Book"
},
{
"head": "J.K. Rowling",
"head_type": "Person",
"relation": "wrote",
"relation_type": "Actions",
"tail": "Harry Potter Series",
"tail_type": "Book"
},
{
"head": "Harry Potter Series",
"head_type": "Book",
"relation": "create",
"relation_type": "Actions",
"tail": "Dumbledore",
"tail_type": "Person"
},
{
"head": "Fantastic Beasts and Where to Find Them",
"head_type": "Book",
"relation": "mention",
"relation_type": "Actions",
"tail": "Dumbledore",
"tail_type": "Person"
},
{
"head": "Voldemort",
"head_type": "Person",
"relation": "afrid",
"relation_type": "Emotion",
"tail": "Dumbledore",
"tail_type": "Person"
},
{
"head": "Voldemort",
"head_type": "Person",
"relation": "robs",
"relation_type": "Actions",
"tail": "the Elder Wand",
"tail_type": "Weapon"
},
{
"head": "the Elder Wand",
"head_type": "Weapon",
"relation": "belong to",
"relation_type": "Affiliation",
"tail": "Dumbledore",
"tail_type": "Person"
}
]
}
'''
    # test_data 2: Contains a single triple (the bare-object form).
    # Uncomment to use it instead of test_data 1.
    # test_data = '''
    # {
    # "head": "Christopher Nolan",
    # "head_type": "Person",
    # "relation": "directed",
    # "relation_type": "Action",
    # "tail": "Inception",
    # "tail_type": "Movie"
    # }
    # '''
    # Generate Cypher query statements (one string per triple)
    cypher_statements = generate_cypher_statements(test_data)
    # Print the generated Cypher query statements
    for statement in cypher_statements:
        print(statement)
    # Blank separator between the generated listing and the execution log
    print("\n")
    # Execute the generated Cypher query statements.
    # The URI and credentials below are placeholders: replace them with the
    # values of your own database before running.
    execute_cypher_statements(
        uri="neo4j://localhost:7687", # your URI
        user="your_username", # your username
        password="your_password", # your password
        cypher_statements=cypher_statements,
    )
|