-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathrag_schema_from_onto.py
152 lines (131 loc) · 4.94 KB
/
rag_schema_from_onto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from rdflib import Graph, URIRef, XSD
from rdflib.namespace import RDF, OWL, RDFS, DefinedNamespace, Namespace
from neo4j_graphrag.experimental.components.schema import (
SchemaBuilder,
SchemaEntity,
SchemaProperty,
SchemaRelation,
SchemaConfig
)
def getLocalPart(uri):
    """Return the local name of a URI: the text after its last separator.

    Separators are tried in priority order: ``#`` first, then ``/``, then
    ``:``. The first kind that occurs anywhere in ``uri`` is used, and the
    text following its last occurrence is returned.

    Args:
        uri: A string (or ``str`` subclass) holding the identifier.

    Returns:
        The substring after the chosen separator; this is empty when the
        input ends with that separator. When none of the separators occurs,
        the whole input is returned as a plain string.
    """
    for separator in ('#', '/', ':'):
        pos = uri.rfind(separator)
        if pos >= 0:
            return uri[pos + 1:]
    # No separator at all (e.g. a bare identifier): treat the identifier as
    # its own local part. This case used to raise ValueError via rindex().
    return str(uri)
def getNLOntology(g):
    """Render the ontology held in graph ``g`` as a plain-text description.

    The text has three sections, in this order:

    * ``Node Labels``: every ``owl:Class`` with its ``rdfs:comment`` values,
      followed by any other term used as an ``rdfs:domain`` or ``rdfs:range``
      that is not a declared class (ranges in the XML Schema namespace are
      skipped).
    * ``Node Properties``: every ``owl:DatatypeProperty`` with its domains
      and comments.
    * ``Relationships``: every ``owl:ObjectProperty`` with its domains,
      ranges and comments.

    Args:
        g: An rdflib graph exposing ``subjects()`` and ``objects()``.

    Returns:
        The description as a single string. Every entry ends with a newline,
        including entries that have no ``rdfs:comment``.
    """
    parts = ['\nNode Labels:\n']

    def emit_comments(term, prefix, fallback):
        # One "<prefix><comment>\n" chunk per rdfs:comment. When the term has
        # no comment, emit ``fallback`` so the entry still ends its line
        # instead of running into the next one.
        commented = False
        for desc in g.objects(term, RDFS.comment):
            parts.extend((prefix, desc, '\n'))
            commented = True
        if not commented:
            parts.append(fallback)

    definedcats = set()
    for cat in g.subjects(RDF.type, OWL.Class):
        definedcats.add(cat)
        parts.append(getLocalPart(cat))
        emit_comments(cat, ': ', ':\n')

    # A dict is used as an insertion-ordered set of categories that are
    # referenced as a domain or range but never declared as owl:Class.
    extracats = {}
    for cat in g.objects(None, RDFS.domain):
        if cat not in definedcats:
            extracats[cat] = None
    for cat in g.objects(None, RDFS.range):
        if not (cat.startswith("http://www.w3.org/2001/XMLSchema#") or cat in definedcats):
            extracats[cat] = None

    # Use the loop variable itself; the earlier code printed the stale ``cat``
    # left over from the loop above, repeating one label for every entry.
    for xtracat in extracats:
        parts.append(getLocalPart(xtracat) + ":\n")

    parts.append('\nNode Properties:\n')
    for att in g.subjects(RDF.type, OWL.DatatypeProperty):
        parts.append(getLocalPart(att))
        for dom in g.objects(att, RDFS.domain):
            parts.append(': Attribute that applies to entities of type ' + getLocalPart(dom))
        emit_comments(att, '. It represents ', '\n')

    parts.append('\nRelationships:\n')
    for att in g.subjects(RDF.type, OWL.ObjectProperty):
        parts.append(getLocalPart(att))
        for dom in g.objects(att, RDFS.domain):
            parts.append(': Relationship that connects entities of type ' + getLocalPart(dom))
        for ran in g.objects(att, RDFS.range):
            parts.append(' to entities of type ' + getLocalPart(ran))
        emit_comments(att, '. It represents ', '\n')

    return ''.join(parts)
def getPropertiesForClass(g, cat):
    """Collect the datatype properties whose ``rdfs:domain`` is ``cat``.

    Args:
        g: An rdflib graph to query.
        cat: The class term whose properties are wanted.

    Returns:
        A list of ``SchemaProperty`` objects, one per subject of an
        ``rdfs:domain`` triple pointing at ``cat`` that is also typed as
        ``owl:DatatypeProperty``. The name is the property's local part, the
        type is derived from its first ``rdfs:range`` and the description is
        its first ``rdfs:comment``; a missing range or comment is treated as
        an empty string.
    """
    schema_props = []
    for candidate in g.subjects(RDFS.domain, cat):
        # Only datatype properties become node properties.
        if (candidate, RDF.type, OWL.DatatypeProperty) not in g:
            continue
        name = getLocalPart(candidate)
        description = next(g.objects(candidate, RDFS.comment), "")
        declared_range = next(g.objects(candidate, RDFS.range), "")
        schema_props.append(
            SchemaProperty(
                name=name,
                type=convert_to_di_data_type(declared_range),
                description=description,
            )
        )
    return schema_props
def getSchemaFromOnto(path) -> SchemaConfig:
    """Build a schema configuration from the ontology stored at ``path``.

    The ontology is parsed into an rdflib ``Graph`` and translated as follows:

    * one ``SchemaEntity`` per ``owl:Class``, then one per term that appears
      only as an ``rdfs:domain``, then one per term that appears only as an
      ``rdfs:range`` (ranges in the XML Schema namespace are skipped);
    * one ``SchemaRelation`` per ``owl:ObjectProperty``;
    * one ``(domain, relation, range)`` triple for every combination of an
      object property's domains and ranges that are among the known classes.

    Args:
        path: Location of the ontology, passed unchanged to ``Graph.parse``.

    Returns:
        The result of ``SchemaBuilder.create_schema_model`` for the collected
        entities, relations and triples.
    """
    g = Graph()
    g.parse(path)
    schema_builder = SchemaBuilder()

    seen = set()
    entities = []

    def register(term):
        # Remember the class and append its entity, in discovery order.
        seen.add(term)
        label = getLocalPart(term)
        props = getPropertiesForClass(g, term)
        entities.append(
            SchemaEntity(
                label=label,
                description=next(g.objects(term, RDFS.comment), ""),
                properties=props,
            )
        )

    # Declared classes first ...
    for term in g.subjects(RDF.type, OWL.Class):
        register(term)

    # ... then undeclared classes that are used as a domain ...
    for term in g.objects(None, RDFS.domain):
        if term not in seen:
            register(term)

    # ... then undeclared, non-XSD classes that are used as a range.
    for term in g.objects(None, RDFS.range):
        if term.startswith("http://www.w3.org/2001/XMLSchema#") or term in seen:
            continue
        register(term)

    rels = [
        SchemaRelation(
            label=getLocalPart(prop),
            properties=[],
            description=next(g.objects(prop, RDFS.comment), ""),
        )
        for prop in g.subjects(RDF.type, OWL.ObjectProperty)
    ]

    triples = []
    for prop in g.subjects(RDF.type, OWL.ObjectProperty):
        relname = getLocalPart(prop)
        sources = [getLocalPart(d) for d in g.objects(prop, RDFS.domain) if d in seen]
        targets = [getLocalPart(r) for r in g.objects(prop, RDFS.range) if r in seen]
        triples.extend((src, relname, tgt) for src in sources for tgt in targets)

    return schema_builder.create_schema_model(
        entities=entities,
        relations=rels,
        potential_schema=triples,
    )
def getPKs(g):
    """Return the local names of all ``owl:InverseFunctionalProperty`` terms.

    Args:
        g: An rdflib graph to query.

    Returns:
        A list with the local part of every subject typed as
        ``owl:InverseFunctionalProperty``, in the order the graph yields them.
    """
    return [
        getLocalPart(prop)
        for prop in g.subjects(RDF.type, OWL.InverseFunctionalProperty)
    ]
def convert_to_di_data_type(datatype):
    """Map an XSD datatype term to the property type name used in the schema.

    Args:
        datatype: The datatype term to classify, compared against members of
            the ``XSD`` namespace. Any hashable value is accepted.

    Returns:
        ``"INTEGER"`` for the XSD integer family, ``"FLOAT"`` for
        ``xsd:decimal``/``xsd:float``/``xsd:double``, ``"BOOLEAN"`` for
        ``xsd:boolean`` and ``"STRING"`` for everything else, including an
        empty or unknown datatype.
    """
    # Includes xsd:byte, xsd:unsignedInt and xsd:unsignedByte, which are
    # integer-derived types that previously fell through to "STRING".
    integer_types = {
        XSD.integer, XSD.int, XSD.long, XSD.short, XSD.byte,
        XSD.positiveInteger, XSD.negativeInteger,
        XSD.nonPositiveInteger, XSD.nonNegativeInteger,
        XSD.unsignedLong, XSD.unsignedInt, XSD.unsignedShort, XSD.unsignedByte,
    }
    float_types = {XSD.decimal, XSD.float, XSD.double}

    if datatype in integer_types:
        return "INTEGER"
    if datatype in float_types:
        return "FLOAT"
    if datatype == XSD.boolean:
        return "BOOLEAN"
    # NOTE: xsd:dateTime is deliberately left unmapped and falls through to
    # "STRING"; a "LOCAL_DATETIME" mapping existed here but was disabled.
    return "STRING"