Skip to content

Commit 4b8f4c9

Browse files
authored
Merge pull request #8 from KnowledgeLinks/feature/issue-1
Feature/issue 1
2 parents f068cc9 + 62a8fb0 commit 4b8f4c9

14 files changed

+261
-69
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/venv
22
/.idea
3+
*__pycache__*
-147 Bytes
Binary file not shown.
-254 Bytes
Binary file not shown.
-2.1 KB
Binary file not shown.
-3.85 KB
Binary file not shown.

kldgraph/batcher.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from kldgraph import rdfuri, consumer
2+
3+
4+
class BatchProcessor:
5+
"""
6+
Records the item count and sends the data to BatchConsumer once the threshold is met.
7+
"""
8+
9+
def __init__(self, dataset, batch_consumer=None, batch_size=10000):
10+
self.dataset = dataset
11+
self.batch_size = batch_size
12+
self.count = 0
13+
self.total_count = 0
14+
self.batch_consumer = batch_consumer if batch_consumer else consumer.BatchConsumer(self.dataset)
15+
16+
def increment(self):
17+
"""
18+
increments the count and then test to see if the count has reached the batch size by calling test_size
19+
20+
:return: None
21+
"""
22+
self.count += 1
23+
self.total_count += 1
24+
self.test_size_send()
25+
26+
def test_size_send(self):
27+
"""
28+
tests to see if the count has reached the batch size and if it has it tells the batch_consumer to send all data
29+
from the dataset.
30+
31+
:return: None
32+
"""
33+
if self.count >= self.batch_size:
34+
self.batch_consumer.send()
35+
self.count = 0

kldgraph/consumer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from kldgraph import dgraphapi, rdfuri
2+
3+
4+
class BatchConsumer:
5+
"""
6+
consumes data and transfers tha data
7+
"""
8+
9+
def __init__(self, dataset, destination=None):
10+
self.dataset = dataset
11+
self.destination = destination if destination else dgraphapi.mutate_add_dataset
12+
13+
def send(self):
14+
"""
15+
sends the data to the destination
16+
:return:
17+
"""
18+
result = self.destination(self.dataset)
19+
self.dataset.clear()
20+
rdfuri.Node.clear_all_registries()
21+
return result

kldgraph/dataset.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,15 @@ def are_nodes_set(self):
5555
except AttributeError:
5656
pass
5757
return True
58+
59+
def clear(self):
60+
"""
61+
cycles through the dictionary object to ensure all circular reference are removed
62+
:return: None
63+
"""
64+
for subj, pred_ref in self.items():
65+
for pred in pred_ref.keys():
66+
self[subj][pred].clear()
67+
self[subj].clear()
68+
super().clear()
69+

kldgraph/dgraphapi.py

Lines changed: 76 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,93 @@
11
import json
22
import pydgraph
33

4-
client_stub = pydgraph.DgraphClientStub('localhost:9080')
5-
client = pydgraph.DgraphClient(client_stub)
4+
DEFAULT_URL = 'localhost:9080'
65

76

87
class XidDoesNotExistError(Exception):
98
pass
109

1110

12-
def get_uid_for_xid(xid):
13-
try:
14-
return find_uid_for_xid(xid)
15-
except XidDoesNotExistError:
16-
return create_uid_for_xid(xid)
11+
class Api:
1712

13+
def __init__(self, url=DEFAULT_URL):
14+
self.url = url
15+
self.client_stub = pydgraph.DgraphClientStub(url)
16+
self.client = pydgraph.DgraphClient(self.client_stub)
1817

19-
def find_uid_for_xid(xid):
20-
qry = """
21-
{{
22-
lookup(func: eq(xid, "{xid}"))
23-
{{uid}}
24-
}}
25-
""".format(xid=xid)
26-
try:
27-
result = client.query(qry)
28-
data = json.loads(result.json)
29-
return data['lookup'][0]['uid']
30-
except (KeyError, IndexError):
31-
raise XidDoesNotExistError(xid)
32-
except Exception:
33-
add_xid_to_schema()
34-
raise XidDoesNotExistError(xid)
18+
def get_uid_for_xid(self, xid):
19+
try:
20+
return self.find_uid_for_xid(xid)
21+
except XidDoesNotExistError:
22+
return self.create_uid_for_xid(xid)
3523

24+
def find_uid_for_xid(self, xid):
25+
qry = """
26+
{{
27+
lookup(func: eq(xid, "{xid}"))
28+
{{uid}}
29+
}}
30+
""".format(xid=xid)
31+
try:
32+
result = self.client.query(qry)
33+
data = json.loads(result.json)
34+
return data['lookup'][0]['uid']
35+
except (KeyError, IndexError):
36+
raise XidDoesNotExistError(xid)
37+
except Exception:
38+
self.add_xid_to_schema()
39+
raise XidDoesNotExistError(xid)
3640

37-
def create_uid_for_xid(xid):
38-
data = {'xid': xid}
39-
txn = client.txn()
40-
uid = None
41-
try:
42-
result = txn.mutate(set_obj=data)
43-
txn.commit()
44-
uid = [result.uids[x] for x in result.uids][0]
45-
finally:
46-
txn.discard()
47-
return uid
41+
def create_uid_for_xid(self, xid):
42+
data = {'xid': xid}
43+
txn = self.client.txn()
44+
uid = None
45+
try:
46+
result = txn.mutate(set_obj=data)
47+
txn.commit()
48+
uid = [result.uids[x] for x in result.uids][0]
49+
finally:
50+
txn.discard()
51+
return uid
4852

53+
def add_xid_to_schema(self):
54+
"""
55+
adds the xid schema to dgraph
56+
:return: True if added
57+
"""
58+
schema = "xid: string @index(exact) ."
59+
op = pydgraph.Operation(schema=schema)
60+
self.client.alter(op)
4961

50-
def add_xid_to_schema():
51-
"""
52-
adds the xid schema to dgraph
53-
:return: True if added
54-
"""
55-
schema = "xid: string @index(exact) ."
56-
op = pydgraph.Operation(schema=schema)
57-
client.alter(op)
62+
def mutate_add_dataset(self, dataset):
63+
"""
64+
adds the triples in the dataset to dgraph
65+
:param dataset:
66+
:return:
67+
"""
68+
dataset.lookup_nodes()
69+
txn = self.client.txn()
70+
try:
71+
result = txn.mutate(set_nquads=dataset.formatter.to_rdf())
72+
txn.commit()
73+
finally:
74+
txn.discard()
75+
return result
5876

77+
def drop_all(self):
78+
"""
79+
Drops all data from dgraph
80+
:return:
81+
"""
82+
op = pydgraph.Operation(drop_all=True)
83+
self.client.alter(op)
5984

60-
def mutate_add_dataset(dataset):
61-
"""
62-
adds the triples in the dataset to dgraph
63-
:param dataset:
64-
:return:
65-
"""
66-
dataset.lookup_nodes()
67-
txn = client.txn()
68-
try:
69-
result = txn.mutate(set_nquads=dataset.formatter.to_rdf())
70-
txn.commit()
71-
finally:
72-
txn.discard()
73-
74-
75-
def drop_all():
76-
"""
77-
Drops all data from dgraph
78-
:return:
79-
"""
80-
op = pydgraph.Operation(drop_all=True)
81-
client.alter(op)
85+
# default instance of the API is generated on module load and the class methods of that instance are available at the
86+
# module level.
87+
default_api = Api()
88+
drop_all = default_api.drop_all
89+
mutate_add_dataset = default_api.mutate_add_dataset
90+
add_xid_to_schema = default_api.add_xid_to_schema
91+
create_uid_for_xid = default_api.create_uid_for_xid
92+
find_uid_for_xid = default_api.find_uid_for_xid
93+
get_uid_for_xid = default_api.get_uid_for_xid

kldgraph/ntparse.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
__doc__ = """
2+
License: GPL 2, W3C, BSD, or MIT
3+
"""
14
import re
25
from rdflib.plugins.parsers.ntriples import (r_nodeid,
36
r_literal,
@@ -6,6 +9,8 @@
69
unquote)
710
import kldgraph.rdfuri as rdfuri
811
from kldgraph.tracker import Tracker
12+
from kldgraph import dataset
13+
from kldgraph.batcher import BatchProcessor
914

1015

1116
class Store:
@@ -23,27 +28,41 @@ def triple(self, s, p, o):
2328

2429

2530
class NtParser(NTriplesParser):
31+
"""
32+
The parser extends rdflib's NTriplesParser
33+
"""
2634
count = 0
2735

28-
def __init__(self, sink=None, use_tracker=False, tracker=None):
36+
def __init__(self, sink=None, use_tracker=False, tracker=None, use_batcher=False, batcher=None):
37+
if not sink:
38+
sink = dataset.Dataset()
2939
super().__init__(sink)
3040
self.tracker = None
3141
if use_tracker or tracker:
3242
if tracker:
3343
self.tracker = tracker
3444
else:
3545
self.tracker = Tracker()
46+
if use_batcher or batcher:
47+
if batcher:
48+
self.batcher = batcher
49+
else:
50+
self.batcher = BatchProcessor(sink)
3651

3752
def parse(self, *args):
3853
if self.tracker:
3954
self.tracker.start()
4055
super().parse(*args)
56+
if self.batcher:
57+
self.batcher.batch_consumer.send()
4158

4259
def readline(self):
4360
val = super().readline()
4461
self.count += 1
4562
if self.tracker:
4663
self.tracker.increment_count()
64+
if self.batcher:
65+
self.batcher.increment()
4766
return val
4867

4968
def uriref(self):
@@ -55,7 +74,6 @@ def uriref(self):
5574

5675
def nodeid(self):
5776
if self.peek('_'):
58-
# Fix for https://github.com/RDFLib/rdflib/issues/204
5977
bnode_id = self.eat(r_nodeid).group(1)
6078
return rdfuri.Node(bnode_id, rdfuri.NodeType.BLANK)
6179
return False

0 commit comments

Comments
 (0)