From ba9951a3148c693ca32ffb401bee3e94b997b971 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 16 Aug 2023 14:05:39 +0800 Subject: [PATCH] feat(interactive): Refine the GrootClient for concurrent and async write. (#3113) --- docs/storage_engine/groot.md | 11 + interactive_engine/groot-client/pom.xml | 17 + .../graphscope/groot/sdk/GrootClient.java | 346 ++++++++++-------- .../groot/sdk/example/IngestFile.java | 12 +- .../groot/sdk/example/LoadLdbc.java | 61 +-- .../groot/sdk/example/RealtimeWrite.java | 332 +++++++++++++++-- .../groot/sdk/example/TimeWatch.java | 37 ++ .../graphscope/groot/sdk/schema/Edge.java | 115 ++++++ .../graphscope/groot/sdk/schema/Vertex.java | 52 +++ .../groot/sdk/ClientBackupTest.java | 7 +- .../graphscope/groot/sdk/ClientTest.java | 23 +- .../coordinator/GarbageCollectManager.java | 9 +- .../groot/tests/gremlin/GrootGraph.java | 124 +++---- 13 files changed, 852 insertions(+), 294 deletions(-) create mode 100644 interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java create mode 100644 interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java create mode 100644 interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md index d76aa7452dbc..589d891e3985 100644 --- a/docs/storage_engine/groot.md +++ b/docs/storage_engine/groot.md @@ -488,6 +488,10 @@ Else, please proceed to ingest and commit. Groot graph have several methods for realtime write as follows: +#### Python + +Refer to [test_store_service.py](https://github.com/alibaba/GraphScope/blob/main/python/graphscope/tests/kubernetes/test_store_service.py) for examples. + ```python # Inserts one vertex def insert_vertex(self, vertex: VertexRecordKey, properties: dict) -> int: pass @@ -549,6 +553,13 @@ class EdgeRecordKey: self.eid: int = eid # Only required in update and delete operation ``` + +#### Java + +We also have a java sdk for realtime write and schema management. + +Refer to [RealtimeWrite.java](https://github.com/alibaba/GraphScope/blob/main/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java) for examples. + ## Uninstalling and Restarting ### Uninstall Groot diff --git a/interactive_engine/groot-client/pom.xml b/interactive_engine/groot-client/pom.xml index 1a3d9cadcfc0..01d2b7c3f0dd 100644 --- a/interactive_engine/groot-client/pom.xml +++ b/interactive_engine/groot-client/pom.xml @@ -168,6 +168,23 @@ org.apache.maven.plugins maven-javadoc-plugin + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + java + + -classpath + + com.alibaba.graphscope.groot.sdk.example.RealtimeWrite + + com.alibaba.graphscope.groot.sdk.example.RealtimeWrite + 1.11 + -1 + + diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index 46453683fb82..b4600669cd82 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -13,11 +13,14 @@ */ package com.alibaba.graphscope.groot.sdk; +import com.alibaba.graphscope.groot.sdk.schema.Edge; import com.alibaba.graphscope.groot.sdk.schema.Schema; +import com.alibaba.graphscope.groot.sdk.schema.Vertex; import com.alibaba.graphscope.proto.groot.*; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.InetSocketAddress; @@ -28,35 +31,31 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class GrootClient { private final ClientGrpc.ClientBlockingStub clientStub; private final ClientWriteGrpc.ClientWriteBlockingStub writeStub; + private final ClientWriteGrpc.ClientWriteStub asyncWriteStub; private final ClientBackupGrpc.ClientBackupBlockingStub backupStub; private final GrootDdlServiceGrpc.GrootDdlServiceBlockingStub ddlStub; - private String clientId = "DEFAULT"; - - private BatchWriteRequest.Builder batchWriteBuilder; private GrootClient( ClientGrpc.ClientBlockingStub clientBlockingStub, ClientWriteGrpc.ClientWriteBlockingStub clientWriteBlockingStub, + ClientWriteGrpc.ClientWriteStub clientWriteStub, ClientBackupGrpc.ClientBackupBlockingStub clientBackupBlockingStub, GrootDdlServiceGrpc.GrootDdlServiceBlockingStub ddlServiceBlockingStub) { this.clientStub = clientBlockingStub; this.writeStub = clientWriteBlockingStub; + this.asyncWriteStub = clientWriteStub; this.backupStub = clientBackupBlockingStub; this.ddlStub = ddlServiceBlockingStub; - this.reset(); } public void close() {} - private void reset() { - this.batchWriteBuilder = BatchWriteRequest.newBuilder().setClientId(this.clientId); - } - public com.alibaba.graphscope.proto.GraphDefPb submitSchema(Schema schema) { BatchSubmitRequest request = schema.toProto(); BatchSubmitResponse response = ddlStub.batchSubmit(request); @@ -67,25 +66,10 @@ public com.alibaba.graphscope.proto.GraphDefPb submitSchema(Schema.Builder schem return submitSchema(schema.build()); } - public void initWriteSession() { - this.clientId = - this.writeStub.getClientId(GetClientIdRequest.newBuilder().build()).getClientId(); - this.reset(); - } - - /** - * Commit the realtime write transaction. - * @return The snapshot_id. The data committed would be available after a while, or you could remoteFlush(snapshot_id) - * and wait for its return. - */ - public long commit() { - long snapshotId = 0L; - if (this.batchWriteBuilder.getWriteRequestsCount() > 0) { - BatchWriteResponse response = this.writeStub.batchWrite(this.batchWriteBuilder.build()); - snapshotId = response.getSnapshotId(); - } - this.reset(); - return snapshotId; + private BatchWriteRequest.Builder getNewWriteBuilder() { + String clientId = + writeStub.getClientId(GetClientIdRequest.newBuilder().build()).getClientId(); + return BatchWriteRequest.newBuilder().setClientId(clientId); } /** @@ -93,104 +77,213 @@ public long commit() { * @param snapshotId the snapshot id to be flushed */ public void remoteFlush(long snapshotId) { - this.writeStub.remoteFlush( - RemoteFlushRequest.newBuilder().setSnapshotId(snapshotId).build()); + if (snapshotId != 0) { + this.writeStub.remoteFlush( + RemoteFlushRequest.newBuilder().setSnapshotId(snapshotId).build()); + } + } + + private long modifyVertex(Vertex vertex, WriteTypePb writeType) { + WriteRequestPb request = vertex.toWriteRequest(writeType); + return submit(request); + } + + private long modifyVertex(List vertices, WriteTypePb writeType) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + return submit(requests); + } + + private void modifyVertex( + Vertex vertex, StreamObserver callback, WriteTypePb writeType) { + WriteRequestPb request = vertex.toWriteRequest(writeType); + submit(request, callback); + } + + private void modifyVertex( + List vertices, + StreamObserver callback, + WriteTypePb writeType) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + submit(requests, callback); + } + + private long modifyEdge(Edge edge, WriteTypePb writeType) { + WriteRequestPb request = edge.toWriteRequest(writeType); + return submit(request); + } + + private long modifyEdge(List edges, WriteTypePb writeType) { + List requests = getEdgeWriteRequestPbs(edges, writeType); + return submit(requests); + } + + private void modifyEdge( + Edge edge, StreamObserver callback, WriteTypePb writeType) { + WriteRequestPb request = edge.toWriteRequest(writeType); + submit(request, callback); + } + + private void modifyEdge( + List edges, StreamObserver callback, WriteTypePb writeType) { + List requests = getEdgeWriteRequestPbs(edges, writeType); + submit(requests, callback); } /** * Add vertex by realtime write - * @param label vertex label - * @param properties properties, including the primary key + * @param vertex vertex that contains label and pk properties and other properties */ - public void addVertex(String label, Map properties) { - DataRecordPb record = getVertexDataRecord(label, properties); - WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT); - this.batchWriteBuilder.addWriteRequests(request); + public long addVertex(Vertex vertex) { + return modifyVertex(vertex, WriteTypePb.INSERT); + } + + public long addVertices(List vertices) { + return modifyVertex(vertices, WriteTypePb.INSERT); + } + + public void addVertex(Vertex vertex, StreamObserver callback) { + modifyVertex(vertex, callback, WriteTypePb.INSERT); + } + + public void addVertices(List vertices, StreamObserver callback) { + modifyVertex(vertices, callback, WriteTypePb.INSERT); } /** * Update existed vertex by realtime write - * @param label vertex label - * @param properties properties, including the primary key + * @param vertex vertex that contains label and pk properties and other properties */ - public void updateVertex(String label, Map properties) { - DataRecordPb record = getVertexDataRecord(label, properties); - WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.UPDATE); - this.batchWriteBuilder.addWriteRequests(request); + public long updateVertex(Vertex vertex) { + return modifyVertex(vertex, WriteTypePb.UPDATE); + } + + public long updateVertices(List vertices) { + return modifyVertex(vertices, WriteTypePb.UPDATE); + } + + public void updateVertex(Vertex vertex, StreamObserver callback) { + modifyVertex(vertex, callback, WriteTypePb.UPDATE); + } + + public void updateVertices(List vertices, StreamObserver callback) { + modifyVertex(vertices, callback, WriteTypePb.UPDATE); } /** * Delete vertex by its primary key - * @param label vertex label - * @param properties properties, contains only the primary key + * @param vertex vertex that contains label and primary key properties */ - public void deleteVertex(String label, Map properties) { - DataRecordPb record = getVertexDataRecord(label, properties); - WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.DELETE); - this.batchWriteBuilder.addWriteRequests(request); + public long deleteVertex(Vertex vertex) { + WriteRequestPb request = vertex.toWriteRequest(WriteTypePb.DELETE); + return submit(request); + } + + public long deleteVertices(List vertices) { + List requests = getVertexWriteRequestPbs(vertices, WriteTypePb.DELETE); + return submit(requests); + } + + public void deleteVertex(Vertex vertex, StreamObserver callback) { + modifyVertex(vertex, callback, WriteTypePb.DELETE); + } + + public void deleteVertices(List vertices, StreamObserver callback) { + modifyVertex(vertices, callback, WriteTypePb.DELETE); } /** * Add edge by realtime write - * @param label edge label - * @param srcLabel source vertex label - * @param dstLabel destination vertex label - * @param srcPk source primary keys - * @param dstPk destination primary keys - * @param properties edge properties + * @param edge edge that contains label, src vertex label and pk, dst label and pk, and properties */ - public void addEdge( - String label, - String srcLabel, - String dstLabel, - Map srcPk, - Map dstPk, - Map properties) { - DataRecordPb record = - getEdgeDataRecord(label, srcLabel, dstLabel, srcPk, dstPk, properties); - WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT); - this.batchWriteBuilder.addWriteRequests(request); + public long addEdge(Edge edge) { + return modifyEdge(edge, WriteTypePb.INSERT); + } + + public long addEdges(List edges) { + return modifyEdge(edges, WriteTypePb.INSERT); + } + + public void addEdge(Edge edge, StreamObserver callback) { + modifyEdge(edge, callback, WriteTypePb.INSERT); + } + + public void addEdges(List edges, StreamObserver callback) { + modifyEdge(edges, callback, WriteTypePb.INSERT); } /** * Update existed edge by realtime write - * @param label edge label - * @param srcLabel source vertex label - * @param dstLabel destination vertex label - * @param srcPk source primary keys - * @param dstPk destination primary keys - * @param properties edge properties + * @param edge edge that contains label, src vertex label and pk, dst label and pk, and properties */ - public void updateEdge( - String label, - String srcLabel, - String dstLabel, - Map srcPk, - Map dstPk, - Map properties) { - DataRecordPb record = - getEdgeDataRecord(label, srcLabel, dstLabel, srcPk, dstPk, properties); - WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT); - this.batchWriteBuilder.addWriteRequests(request); + public long updateEdge(Edge edge) { + return modifyEdge(edge, WriteTypePb.UPDATE); + } + + public long updateEdges(List edges) { + return modifyEdge(edges, WriteTypePb.UPDATE); + } + + public void updateEdge(Edge edge, StreamObserver callback) { + modifyEdge(edge, callback, WriteTypePb.UPDATE); + } + + public void updateEdges(List edges, StreamObserver callback) { + modifyEdge(edges, callback, WriteTypePb.UPDATE); } /** * Delete an edge by realtime write - * @param label edge label - * @param srcLabel source vertex label - * @param dstLabel destination vertex label - * @param srcPk source primary keys - * @param dstPk destination primary keys + * @param edge edge that contains label, src vertex label and pk, dst label and pk, no properties required + */ + public long deleteEdge(Edge edge) { + return modifyEdge(edge, WriteTypePb.DELETE); + } + + public long deleteEdges(List edges) { + return modifyEdge(edges, WriteTypePb.DELETE); + } + + public void deleteEdge(Edge edge, StreamObserver callback) { + modifyEdge(edge, callback, WriteTypePb.DELETE); + } + + public void deleteEdges(List edges, StreamObserver callback) { + modifyEdge(edges, callback, WriteTypePb.DELETE); + } + + /** + * Commit the realtime write transaction. + * @return The snapshot_id. The data committed would be available after a while, or you could remoteFlush(snapshot_id) + * and wait for its return. */ - public void deleteEdge( - String label, - String srcLabel, - String dstLabel, - Map srcPk, - Map dstPk) { - DataRecordPb record = getEdgeDataRecord(label, srcLabel, dstLabel, srcPk, dstPk, null); - WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT); - this.batchWriteBuilder.addWriteRequests(request); + private long submit(WriteRequestPb request) { + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addWriteRequests(request); + return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId(); + } + + private long submit(List requests) { + if (requests.isEmpty()) { + return 0; + } + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addAllWriteRequests(requests); + return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId(); + } + + private void submit(WriteRequestPb request, StreamObserver callback) { + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addWriteRequests(request); + asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback); + } + + private void submit( + List requests, StreamObserver callback) { + if (!requests.isEmpty()) { + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addAllWriteRequests(requests); + asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback); + } } public GraphDefPb getSchema() { @@ -364,6 +457,7 @@ public GrootClient build() { ClientGrpc.ClientBlockingStub clientBlockingStub = ClientGrpc.newBlockingStub(channel); ClientWriteGrpc.ClientWriteBlockingStub clientWriteBlockingStub = ClientWriteGrpc.newBlockingStub(channel); + ClientWriteGrpc.ClientWriteStub clientWriteStub = ClientWriteGrpc.newStub(channel); ClientBackupGrpc.ClientBackupBlockingStub clientBackupBlockingStub = ClientBackupGrpc.newBlockingStub(channel); GrootDdlServiceGrpc.GrootDdlServiceBlockingStub ddlServiceBlockingStub = @@ -372,69 +466,29 @@ public GrootClient build() { BasicAuth basicAuth = new BasicAuth(username, password); clientBlockingStub = clientBlockingStub.withCallCredentials(basicAuth); clientWriteBlockingStub = clientWriteBlockingStub.withCallCredentials(basicAuth); + clientWriteStub = clientWriteStub.withCallCredentials(basicAuth); clientBackupBlockingStub = clientBackupBlockingStub.withCallCredentials(basicAuth); ddlServiceBlockingStub = ddlServiceBlockingStub.withCallCredentials(basicAuth); } return new GrootClient( clientBlockingStub, clientWriteBlockingStub, + clientWriteStub, clientBackupBlockingStub, ddlServiceBlockingStub); } } - private VertexRecordKeyPb getVertexRecordKeyPb(String label, Map properties) { - VertexRecordKeyPb.Builder builder = VertexRecordKeyPb.newBuilder().setLabel(label); - if (properties != null) { - builder.putAllPkProperties(properties); - } - return builder.build(); - } - - private EdgeRecordKeyPb getEdgeRecordKeyPb( - String label, VertexRecordKeyPb src, VertexRecordKeyPb dst) { - return EdgeRecordKeyPb.newBuilder() - .setLabel(label) - .setSrcVertexKey(src) - .setDstVertexKey(dst) - .build(); - } - - private DataRecordPb getDataRecordPb(VertexRecordKeyPb key, Map properties) { - DataRecordPb.Builder builder = DataRecordPb.newBuilder().setVertexRecordKey(key); - if (properties != null) { - builder.putAllProperties(properties); - } - return builder.build(); - } - - private DataRecordPb getDataRecordPb(EdgeRecordKeyPb key, Map properties) { - DataRecordPb.Builder builder = DataRecordPb.newBuilder().setEdgeRecordKey(key); - if (properties != null) { - builder.putAllProperties(properties); - } - return builder.build(); - } - - private DataRecordPb getVertexDataRecord(String label, Map properties) { - VertexRecordKeyPb vertexRecordKey = getVertexRecordKeyPb(label, null); - return getDataRecordPb(vertexRecordKey, properties); - } - - private DataRecordPb getEdgeDataRecord( - String label, - String srcLabel, - String dstLabel, - Map srcPk, - Map dstPk, - Map properties) { - VertexRecordKeyPb src = getVertexRecordKeyPb(srcLabel, srcPk); - VertexRecordKeyPb dst = getVertexRecordKeyPb(dstLabel, dstPk); - EdgeRecordKeyPb edgeRecordKeyPb = getEdgeRecordKeyPb(label, src, dst); - return getDataRecordPb(edgeRecordKeyPb, properties); + private List getVertexWriteRequestPbs( + List vertices, WriteTypePb writeType) { + return vertices.stream() + .map(element -> element.toWriteRequest(writeType)) + .collect(Collectors.toList()); } - private WriteRequestPb getWriteRequestPb(DataRecordPb record, WriteTypePb writeType) { - return WriteRequestPb.newBuilder().setWriteType(writeType).setDataRecord(record).build(); + private List getEdgeWriteRequestPbs(List edges, WriteTypePb writeType) { + return edges.stream() + .map(element -> element.toWriteRequest(writeType)) + .collect(Collectors.toList()); } } diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java index fc791d69dbfd..906b638981c9 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java @@ -14,6 +14,7 @@ package com.alibaba.graphscope.groot.sdk.example; import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.groot.sdk.schema.Vertex; import java.io.BufferedReader; import java.io.File; @@ -56,24 +57,23 @@ public static void main(String[] args) throws IOException { propertyNames.add(item); } } else { + List vertices = new ArrayList<>(); Map properties = new HashMap<>(); String[] items = line.split("\\|"); for (int i = 0; i < items.length; i++) { properties.put(propertyNames.get(i), items[i]); } - client.addVertex(label, properties); + vertices.add(new Vertex(label, properties)); count++; if (count == batchSize) { - snapshotId = client.commit(); + snapshotId = client.addVertices(vertices); count = 0; } } } } - long maybeSnapshotId = client.commit(); - long flushSnapshotId = maybeSnapshotId == 0 ? snapshotId : maybeSnapshotId; - System.out.println("flush snapshotId [" + flushSnapshotId + "]"); - client.remoteFlush(flushSnapshotId); + System.out.println("flush snapshotId [" + snapshotId + "]"); + client.remoteFlush(snapshotId); System.out.println("done"); } } diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java index 9b372722e59b..4fa33294c200 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java @@ -14,6 +14,8 @@ package com.alibaba.graphscope.groot.sdk.example; import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.groot.sdk.schema.Edge; +import com.alibaba.graphscope.groot.sdk.schema.Vertex; import java.io.BufferedReader; import java.io.FileReader; @@ -91,7 +93,7 @@ public static String capitalize(String origin) { } private static void processVertex(GrootClient client, String label, Path path, int batchSize) - throws IOException, ParseException { + throws IOException { List propertyNames = new ArrayList<>(); int count = 0; long snapshotId = 0; @@ -103,6 +105,7 @@ private static void processVertex(GrootClient client, String label, Path path, i propertyNames.add(item.split(":")[0]); } } else { + List vertices = new ArrayList<>(); Map properties = new HashMap<>(); String[] items = line.split("\\|"); for (int i = 0; i < items.length; i++) { @@ -117,29 +120,27 @@ private static void processVertex(GrootClient client, String label, Path path, i // } properties.put(propertyName, propertyVal); } - try { - client.addVertex(label, properties); - } catch (Exception e) { - System.err.println( - "add vertex label [" - + label - + "], properties [" - + properties - + "] failed. Reason: " - + e); - } + vertices.add(new Vertex(label, properties)); count++; if (count == batchSize) { - snapshotId = client.commit(); + try { + snapshotId = client.addVertices(vertices); + } catch (Exception e) { + System.err.println( + "add vertex label [" + + label + + "], properties [" + + properties + + "] failed. Reason: " + + e); + } count = 0; } } } } - long maybeSnapshotId = client.commit(); - long flushSnapshotId = maybeSnapshotId == 0 ? snapshotId : maybeSnapshotId; - System.out.println("flush snapshotId [" + flushSnapshotId + "]"); - client.remoteFlush(flushSnapshotId); + System.out.println("flush snapshotId [" + snapshotId + "]"); + client.remoteFlush(snapshotId); System.out.println("done"); } @@ -150,7 +151,7 @@ private static void processEdge( String dstLabel, Path path, int batchSize) - throws IOException, ParseException { + throws IOException { List propertyNames = new ArrayList<>(); int count = 0; long snapshotId = 0; @@ -162,6 +163,7 @@ private static void processEdge( propertyNames.add(item.split(":")[0]); } } else { + List edges = new ArrayList<>(); Map properties = new HashMap<>(); String[] items = line.split("\\|"); for (int i = 2; i < items.length; i++) { @@ -180,25 +182,24 @@ private static void processEdge( // } properties.put(propertyName, propertyVal); } - client.addEdge( - label, - srcLabel, - dstLabel, - Collections.singletonMap("id", items[0]), - Collections.singletonMap("id", items[1]), - properties); + edges.add( + new Edge( + label, + srcLabel, + dstLabel, + Collections.singletonMap("id", items[0]), + Collections.singletonMap("id", items[1]), + properties)); count++; if (count == batchSize) { - snapshotId = client.commit(); + snapshotId = client.addEdges(edges); count = 0; } } } } - long maybeSnapshotId = client.commit(); - long flushSnapshotId = maybeSnapshotId == 0 ? snapshotId : maybeSnapshotId; - System.out.println("flush snapshotId [" + flushSnapshotId + "]"); - client.remoteFlush(flushSnapshotId); + System.out.println("flush snapshotId [" + snapshotId + "]"); + client.remoteFlush(snapshotId); System.out.println("done"); } } diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java index 1d9fae1e7ae5..eb276c928855 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java @@ -1,27 +1,80 @@ package com.alibaba.graphscope.groot.sdk.example; import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.groot.sdk.schema.*; +import com.alibaba.graphscope.proto.groot.BatchWriteResponse; +import com.alibaba.graphscope.proto.groot.DataTypePb; +import io.grpc.stub.StreamObserver; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class RealtimeWrite { + private static int startId = 0; + private static int recordNum = 10000 + startId; + + public void initSchema(GrootClient client) { + VertexLabel.Builder person = VertexLabel.newBuilder(); + person.setLabel("person"); + Property id = + Property.newBuilder() + .setName("id") + .setDataType(DataTypePb.LONG) + .setPrimaryKey() + .build(); + Property.Builder name = + Property.newBuilder().setName("name").setDataType(DataTypePb.STRING); + Property.Builder age = Property.newBuilder().setName("age").setDataType(DataTypePb.INT); + person.addProperty(id); + person.addProperty(name); + person.addProperty(age); + + VertexLabel.Builder software = VertexLabel.newBuilder(); + Property.Builder lang = + Property.newBuilder().setName("lang").setDataType(DataTypePb.STRING); + + software.setLabel("software"); + software.addProperty(id); + software.addProperty(name); + software.addProperty(lang); + + EdgeLabel.Builder created = EdgeLabel.newBuilder(); + created.setLabel("created"); + created.addRelation("person", "software"); + Property.Builder weight = + Property.newBuilder().setName("weight").setDataType(DataTypePb.LONG); + created.addProperty(weight); + + Schema.Builder schema = Schema.newBuilder(); + schema.addVertexLabel(person); + schema.addVertexLabel(software); + schema.addEdgeLabel(created); + + System.out.println(client.submitSchema(schema)); + System.out.println("testAddLabel succeed"); + } + private static void testAddVerticesEdges(GrootClient client) { - client.initWriteSession(); for (int i = 0; i < 10; ++i) { Map properties = new HashMap<>(); properties.put("id", String.valueOf(i)); properties.put("name", "person-" + i); properties.put("age", String.valueOf(i + 20)); - client.addVertex("person", properties); + client.addVertex(new Vertex("person", properties)); properties.clear(); properties.put("id", String.valueOf(i)); properties.put("name", "software-" + i); properties.put("lang", String.valueOf(i + 200)); - client.addVertex("software", properties); + client.addVertex(new Vertex("software", properties)); } - + long snapshotId = 0; for (int i = 0; i < 10; ++i) { Map srcPk = new HashMap<>(); Map dstPk = new HashMap<>(); @@ -30,15 +83,15 @@ private static void testAddVerticesEdges(GrootClient client) { srcPk.put("id", String.valueOf(i)); dstPk.put("id", String.valueOf(i)); properties.put("weight", String.valueOf(i * 100)); - client.addEdge("created", "person", "software", srcPk, dstPk, properties); + snapshotId = + client.addEdge( + new Edge("created", "person", "software", srcPk, dstPk, properties)); } - long snapshotId = client.commit(); client.remoteFlush(snapshotId); System.out.println("Finished adding vertices and edges"); } private static void testUpdateDeleteEdge(GrootClient client) { - client.initWriteSession(); Map srcPk = new HashMap<>(); Map dstPk = new HashMap<>(); Map properties = new HashMap<>(); @@ -46,42 +99,275 @@ private static void testUpdateDeleteEdge(GrootClient client) { srcPk.put("id", String.valueOf(0)); dstPk.put("id", String.valueOf(0)); properties.put("weight", String.valueOf(10000)); - client.updateEdge("created", "person", "software", srcPk, dstPk, properties); - long snapshotId = client.commit(); + long snapshotId = + client.updateEdge( + new Edge("created", "person", "software", srcPk, dstPk, properties)); client.remoteFlush(snapshotId); System.out.println("Finished update edge person-0 -> software-0"); - client.initWriteSession(); - client.deleteEdge("created", "person", "software", srcPk, dstPk); - snapshotId = client.commit(); + client.deleteEdge(new Edge("created", "person", "software", srcPk, dstPk)); client.remoteFlush(snapshotId); System.out.println("Finished delete edge person-0 -> software-0"); } private static void testUpdateDeleteVertex(GrootClient client) { - client.initWriteSession(); Map properties = new HashMap<>(); properties.put("id", String.valueOf(0)); properties.put("name", "marko-0-updated"); - client.updateVertex("person", properties); - long snapshotId = client.commit(); + long snapshotId = client.updateVertex(new Vertex("person", properties)); client.remoteFlush(snapshotId); System.out.println("Finished update vertex person-0"); - client.initWriteSession(); Map pk_properties = new HashMap<>(); - client.deleteVertex("person", pk_properties); - snapshotId = client.commit(); - client.remoteFlush(snapshotId); + client.deleteVertex(new Vertex("person", pk_properties)); System.out.println("Finished delete vertex person-0"); } - public static void main(String[] args) { + private static List getVerticesA() { + List vertices = new ArrayList<>(); + for (int i = startId; i < recordNum; ++i) { + Map properties = new HashMap<>(); + properties.put("id", String.valueOf(i)); + properties.put("name", "person-" + i); + properties.put("age", String.valueOf(i + 20)); + vertices.add(new Vertex("person", properties)); + } + return vertices; + } + + private static List getVerticesB() { + List vertices = new ArrayList<>(); + for (int i = startId; i < recordNum; ++i) { + Map properties = new HashMap<>(); + properties.put("id", String.valueOf(i)); + properties.put("name", "software-" + i); + properties.put("lang", String.valueOf(i + 200)); + vertices.add(new Vertex("software", properties)); + } + return vertices; + } + + private static List getEdges() { + List edges = new ArrayList<>(); + for (int i = startId; i < recordNum; ++i) { + Map srcPk = new HashMap<>(); + Map dstPk = new HashMap<>(); + Map properties = new HashMap<>(); + + srcPk.put("id", String.valueOf(i)); + dstPk.put("id", String.valueOf(i)); + properties.put("weight", String.valueOf(i * 100)); + edges.add(new Edge("created", "person", "software", srcPk, dstPk, properties)); + } + return edges; + } + + class ClientTask implements Runnable { + + private GrootClient client; + private List vertices; + private List edges; + + private int type; + + ClientTask(GrootClient client, int type, List vertices, List edges) { + this.client = client; + this.type = type; + this.vertices = vertices; + this.edges = edges; + } + + @Override + public void run() { + if (type == 0) { + for (int i = 0; i < vertices.size(); ++i) { + client.addVertex(vertices.get(i)); + } + } else { + for (int i = 0; i < edges.size(); ++i) { + client.addEdge(edges.get(i)); + } + } + } + } + + public void sequential( + GrootClient client, List verticesA, List verticesB, List edges) { + long snapshotId = 0; + TimeWatch watch = TimeWatch.start(); + { + watch.reset(); + for (Vertex vertex : verticesA) { + snapshotId = client.addVertex(vertex); + } + watch.status("VerticesA"); + } + { + watch.reset(); + for (Vertex vertex : verticesB) { + snapshotId = client.addVertex(vertex); + } + watch.status("VerticesB"); + } + { + watch.reset(); + client.remoteFlush(snapshotId); + watch.status("Flush Vertices"); + System.out.println("Finished add vertices"); + } + { + watch.reset(); + for (Edge edge : edges) { + snapshotId = client.addEdge(edge); + } + watch.status("Edges"); + } + { + watch.reset(); + client.remoteFlush(snapshotId); + watch.status("Flush Edges"); + System.out.println("Finished add edges"); + } + } + + public void sequentialBatch( + GrootClient client, List verticesA, List verticesB, List edges) { + long snapshotId = 0; + + TimeWatch watch = TimeWatch.start(); + + { + watch.reset(); + // snapshotId = client.addVertices(vertices); + for (int i = 0; i < verticesA.size(); i += 1000) { + snapshotId = client.addVertices(verticesA.subList(i, i + 1000)); + } + watch.status("VerticesA"); + } + { + watch.reset(); + // snapshotId = client.addVertices(vertices); + for (int i = 0; i < verticesB.size(); i += 1000) { + snapshotId = client.addVertices(verticesB.subList(i, i + 1000)); + } + watch.status("VerticesB"); + } + { + watch.reset(); + client.remoteFlush(snapshotId); + watch.status("Flush Vertices"); + System.out.println("Finished add vertices"); + } + { + watch.reset(); + // snapshotId = client.addEdges(edges); + for (int i = 0; i < edges.size(); i += 1000) { + snapshotId = client.addEdges(edges.subList(i, i + 1000)); + } + watch.status("Edges"); + } + { + watch.reset(); + client.remoteFlush(snapshotId); + watch.status("Flush Edges"); + System.out.println("Finished add edges"); + } + } + + public void parallel( + GrootClient client, List verticesA, List verticesB, List edges) + throws InterruptedException { + // Create thread pool with 10 threads + int taskNum = 30; + int offset = 10000 / taskNum; + TimeWatch watch = TimeWatch.start(); + { + ExecutorService executor = Executors.newFixedThreadPool(10); + // Submit 10 tasks to call submit() + + for (int i = 0; i < taskNum * offset; i += offset) { + int start = i; + int end = start + offset; + List subVerticesA = verticesA.subList(start, end); + List subVerticesB = verticesB.subList(start, end); + executor.submit(new ClientTask(client, 0, subVerticesA, null)); + executor.submit(new ClientTask(client, 0, subVerticesB, null)); + } + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + watch.status("Vertices"); + } + { + Thread.sleep(2000); + ExecutorService executor = Executors.newFixedThreadPool(10); + watch.reset(); + for (int i = 0; i < taskNum * offset; i += offset) { + int start = i; + int end = start + offset; + List subEdges = edges.subList(start, end); + executor.submit(new ClientTask(client, 1, null, subEdges)); + } + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + watch.status("Edges"); + } + } + + public void sequentialAsync( + GrootClient client, List verticesA, List verticesB, List edges) + throws InterruptedException { + TimeWatch watch = TimeWatch.start(); + class VertexCallBack implements StreamObserver { + @Override + public void onNext(BatchWriteResponse value) { + // System.out.println("on next"); + } + + @Override + public void onError(Throwable t) { + // System.out.println("on next"); + } + + @Override + public void onCompleted() { + // System.out.println("completed"); + } + } + { + watch.reset(); + for (Vertex vertex : verticesA) { + client.addVertex(vertex, new VertexCallBack()); + } + watch.status("VerticesA"); + } + { + watch.reset(); + for (Vertex vertex : verticesB) { + client.addVertex(vertex, new VertexCallBack()); + } + watch.status("VerticesB"); + } + } + + public static void main(String[] args) throws InterruptedException { String hosts = "localhost"; int port = 55556; GrootClient client = GrootClient.newBuilder().addHost(hosts, port).build(); - testAddVerticesEdges(client); - testUpdateDeleteEdge(client); - testUpdateDeleteVertex(client); + + RealtimeWrite writer = new RealtimeWrite(); + + client.dropSchema(); + writer.initSchema(client); + + List verticesA = RealtimeWrite.getVerticesA(); + List verticesB = RealtimeWrite.getVerticesB(); + List edges = RealtimeWrite.getEdges(); + + TimeWatch watch = TimeWatch.start(); + // writer.sequential(client, verticesA, verticesB, edges); + // writer.parallel(client, verticesA, verticesB, edges); + // writer.sequentialBatch(client, verticesA, verticesB, edges); + writer.sequentialAsync(client, verticesA, verticesB, edges); + watch.status("Total"); } } diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java new file mode 100644 index 000000000000..d92db897773b --- /dev/null +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java @@ -0,0 +1,37 @@ +package com.alibaba.graphscope.groot.sdk.example; + +import java.util.concurrent.TimeUnit; + +class TimeWatch { + long starts; + + public static TimeWatch start() { + return new TimeWatch(); + } + + private TimeWatch() { + reset(); + } + + public TimeWatch reset() { + starts = System.currentTimeMillis(); + return this; + } + + public long time() { + long ends = System.currentTimeMillis(); + return ends - starts; + } + + public long time(TimeUnit unit) { + return unit.convert(time(), TimeUnit.MILLISECONDS); + } + + public void status(String prefix) { + System.out.println(prefix + ": " + time() + " ms"); + } + + public void status() { + status("Duration"); + } +} diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java new file mode 100644 index 000000000000..933d292c09b0 --- /dev/null +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java @@ -0,0 +1,115 @@ +package com.alibaba.graphscope.groot.sdk.schema; + +import com.alibaba.graphscope.proto.groot.*; + +import java.util.Map; + +public class Edge { + public String label; + public String srcLabel; + public String dstLabel; + public Map srcPk; + public Map dstPk; + public Map properties; + + /** + * Construct an edge + * @param label edge label + * @param srcLabel source vertex label + * @param dstLabel destination vertex label + * @param srcPk source primary keys + * @param dstPk destination primary keys + * @param properties edge properties + */ + public Edge( + String label, + String srcLabel, + String dstLabel, + Map srcPk, + Map dstPk, + Map properties) { + this.label = label; + this.srcLabel = srcLabel; + this.dstLabel = dstLabel; + this.srcPk = srcPk; + this.dstPk = dstPk; + this.properties = properties; + } + + public Edge( + String label, + String srcLabel, + String dstLabel, + Map srcPk, + Map dstPk) { + this(label, srcLabel, dstLabel, srcPk, dstPk, null); + } + + public Edge(String label, Vertex src, Vertex dst, Map properties) { + this( + label, + src.getLabel(), + dst.getLabel(), + src.getProperties(), + dst.getProperties(), + properties); + } + + public Edge(String label, Vertex src, Vertex dst) { + this(label, src, dst, null); + } + + public String getLabel() { + return label; + } + + public String getSrcLabel() { + return srcLabel; + } + + public String getDstLabel() { + return dstLabel; + } + + public Map getSrcPk() { + return srcPk; + } + + public Map getDstPk() { + return dstPk; + } + + public Map getProperties() { + return properties; + } + + public EdgeRecordKeyPb toEdgeRecordKey() { + return EdgeRecordKeyPb.newBuilder() + .setLabel(label) + .setSrcVertexKey(toVertexRecordKey(srcLabel, srcPk)) + .setDstVertexKey(toVertexRecordKey(dstLabel, dstPk)) + .build(); + } + + public DataRecordPb toDataRecord() { + DataRecordPb.Builder builder = + DataRecordPb.newBuilder().setEdgeRecordKey(toEdgeRecordKey()); + if (properties != null) { + builder.putAllProperties(properties); + } + return builder.build(); + } + + public WriteRequestPb toWriteRequest(WriteTypePb writeType) { + return WriteRequestPb.newBuilder() + .setWriteType(writeType) + .setDataRecord(toDataRecord()) + .build(); + } + + private VertexRecordKeyPb toVertexRecordKey(String label, Map properties) { + VertexRecordKeyPb.Builder builder = VertexRecordKeyPb.newBuilder().setLabel(label); + builder.putAllPkProperties(properties); + return builder.build(); + } +} diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java new file mode 100644 index 000000000000..71b66a074ded --- /dev/null +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java @@ -0,0 +1,52 @@ +package com.alibaba.graphscope.groot.sdk.schema; + +import com.alibaba.graphscope.proto.groot.DataRecordPb; +import com.alibaba.graphscope.proto.groot.VertexRecordKeyPb; +import com.alibaba.graphscope.proto.groot.WriteRequestPb; +import com.alibaba.graphscope.proto.groot.WriteTypePb; + +import java.util.Map; + +public class Vertex { + public String label; + public Map properties; + + public Vertex(String label) { + this(label, null); + } + + public Vertex(String label, Map properties) { + this.label = label; + this.properties = properties; + } + + public String getLabel() { + return label; + } + + public Map getProperties() { + return properties; + } + + public VertexRecordKeyPb toVertexRecordKey(String label) { + VertexRecordKeyPb.Builder builder = VertexRecordKeyPb.newBuilder().setLabel(label); + // builder.putAllPkProperties(properties); + return builder.build(); + } + + public DataRecordPb toDataRecord() { + DataRecordPb.Builder builder = + DataRecordPb.newBuilder().setVertexRecordKey(toVertexRecordKey(label)); + if (properties != null) { + builder.putAllProperties(properties); + } + return builder.build(); + } + + public WriteRequestPb toWriteRequest(WriteTypePb writeType) { + return WriteRequestPb.newBuilder() + .setWriteType(writeType) + .setDataRecord(toDataRecord()) + .build(); + } +} diff --git a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java index ddd9181179a7..3091722b2de7 100644 --- a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java +++ b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java @@ -13,6 +13,7 @@ */ package com.alibaba.graphscope.groot.sdk; +import com.alibaba.graphscope.groot.sdk.schema.Vertex; import com.alibaba.graphscope.proto.groot.BackupInfoPb; import org.junit.jupiter.api.Assertions; @@ -50,9 +51,8 @@ public void testBackup() throws InterruptedException, IOException, URISyntaxExce properties.put("id", "" + i); properties.put("name", "young_" + i); properties.put("age", "18"); - client.addVertex("person", properties); + client.addVertex(new Vertex("person", properties)); } - client.commit(); Thread.sleep(3000L); List backupInfoList; @@ -68,9 +68,8 @@ public void testBackup() throws InterruptedException, IOException, URISyntaxExce properties.put("id", "" + i); properties.put("name", "lop_" + i); properties.put("lang", "java"); - client.addVertex("software", properties); + client.addVertex(new Vertex("software", properties)); } - client.commit(); Thread.sleep(3000L); int backupId2 = client.createNewGraphBackup(); diff --git a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java index b8baa3122c6f..7a57ca22a558 100644 --- a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java +++ b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java @@ -13,6 +13,8 @@ */ package com.alibaba.graphscope.groot.sdk; +import com.alibaba.graphscope.groot.sdk.schema.Edge; +import com.alibaba.graphscope.groot.sdk.schema.Vertex; import com.alibaba.graphscope.proto.groot.GraphDefPb; import org.junit.jupiter.api.Test; @@ -70,30 +72,29 @@ void testGetSchema() { @Test void testAddData() { - client.initWriteSession(); Map properties = new HashMap<>(); properties.put("name", "alice"); properties.put("id", "12345"); - client.addVertex("person", properties); + client.addVertex(new Vertex("person", properties)); properties = new HashMap<>(); properties.put("name", "bob"); properties.put("id", "88888"); - client.addVertex("person", properties); + client.addVertex(new Vertex("person", properties)); for (int i = 0; i < 100; i++) { properties = new HashMap<>(); properties.put("name", "test" + i); properties.put("id", "" + i); - client.addVertex("person", properties); + client.addVertex(new Vertex("person", properties)); } client.addEdge( - "knows", - "person", - "person", - Collections.singletonMap("id", "12345"), - Collections.singletonMap("id", "88888"), - Collections.singletonMap("weight", "20201111")); - client.commit(); + new Edge( + "knows", + "person", + "person", + Collections.singletonMap("id", "12345"), + Collections.singletonMap("id", "88888"), + Collections.singletonMap("weight", "20201111"))); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java index 91ab1840bb16..e8f9eda06231 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java @@ -44,12 +44,9 @@ public void start() { for (int i = 0; i < CommonConfig.STORE_NODE_COUNT.get(configs); i++) { CoordinatorSnapshotClient client = clients.getClient(i); client.synchronizeSnapshot(offlineVersion); - logger.info( - "Offline version of store [" - + i - + "] updated to [" - + offlineVersion - + "]"); + if (i == 0 && offlineVersion % 100 == 0) { + logger.info("Offline version updated to {}", offlineVersion); + } } } } catch (Exception e) { diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java index e03b812a490a..b03317ce38d8 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java @@ -20,6 +20,8 @@ import com.alibaba.graphscope.groot.common.config.GremlinConfig; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.groot.sdk.schema.Edge; +import com.alibaba.graphscope.groot.sdk.schema.Vertex; import com.alibaba.graphscope.groot.servers.MaxNode; import com.alibaba.graphscope.groot.servers.NodeBase; import com.alibaba.graphscope.groot.tests.common.GrootIORegistry; @@ -36,9 +38,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy; -import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Transaction; -import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +51,6 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; public class GrootGraph extends RemoteTestGraph { @@ -134,96 +133,100 @@ public void loadData(LoadGraphWith.GraphData graphData) throws InterruptedExcept Thread.sleep(5000); String graphName = graphData.name().toLowerCase(); if (graphName.equals("modern")) { - ddlClient.initWriteSession(); - + long snapshotId; Map v1 = new HashMap<>(); v1.put("id", "1"); v1.put("name", "marko"); v1.put("age", "29"); - ddlClient.addVertex("person", v1); + ddlClient.addVertex(new Vertex("person", v1)); Map v2 = new HashMap<>(); v2.put("id", "2"); v2.put("name", "vadas"); v2.put("age", "27"); - ddlClient.addVertex("person", v2); + ddlClient.addVertex(new Vertex("person", v2)); Map v4 = new HashMap<>(); v4.put("id", "4"); v4.put("name", "josh"); v4.put("age", "32"); - ddlClient.addVertex("person", v4); + ddlClient.addVertex(new Vertex("person", v4)); Map v6 = new HashMap<>(); v6.put("id", "6"); v6.put("name", "peter"); v6.put("age", "35"); - ddlClient.addVertex("person", v6); + ddlClient.addVertex(new Vertex("person", v6)); Map v3 = new HashMap<>(); v3.put("id", "3"); v3.put("name", "lop"); v3.put("lang", "java"); - ddlClient.addVertex("software", v3); + ddlClient.addVertex(new Vertex("software", v3)); Map v5 = new HashMap<>(); v5.put("id", "5"); v5.put("name", "ripple"); v5.put("lang", "java"); - ddlClient.addVertex("software", v5); - - ddlClient.commit(); - Thread.sleep(5000); + snapshotId = ddlClient.addVertex(new Vertex("software", v5)); - ddlClient.addEdge( - "knows", - "person", - "person", - Collections.singletonMap("id", "1"), - Collections.singletonMap("id", "2"), - Collections.singletonMap("weight", "0.5")); + ddlClient.remoteFlush(snapshotId); ddlClient.addEdge( - "created", - "person", - "software", - Collections.singletonMap("id", "1"), - Collections.singletonMap("id", "3"), - Collections.singletonMap("weight", "0.4")); + new Edge( + "knows", + "person", + "person", + Collections.singletonMap("id", "1"), + Collections.singletonMap("id", "2"), + Collections.singletonMap("weight", "0.5"))); ddlClient.addEdge( - "knows", - "person", - "person", - Collections.singletonMap("id", "1"), - Collections.singletonMap("id", "4"), - Collections.singletonMap("weight", "1.0")); + new Edge( + "created", + "person", + "software", + Collections.singletonMap("id", "1"), + Collections.singletonMap("id", "3"), + Collections.singletonMap("weight", "0.4"))); ddlClient.addEdge( - "created", - "person", - "software", - Collections.singletonMap("id", "4"), - Collections.singletonMap("id", "3"), - Collections.singletonMap("weight", "0.4")); + new Edge( + "knows", + "person", + "person", + Collections.singletonMap("id", "1"), + Collections.singletonMap("id", "4"), + Collections.singletonMap("weight", "1.0"))); ddlClient.addEdge( - "created", - "person", - "software", - Collections.singletonMap("id", "4"), - Collections.singletonMap("id", "5"), - Collections.singletonMap("weight", "1.0")); + new Edge( + "created", + "person", + "software", + Collections.singletonMap("id", "4"), + Collections.singletonMap("id", "3"), + Collections.singletonMap("weight", "0.4"))); ddlClient.addEdge( - "created", - "person", - "software", - Collections.singletonMap("id", "6"), - Collections.singletonMap("id", "3"), - Collections.singletonMap("weight", "0.2")); - - ddlClient.commit(); + new Edge( + "created", + "person", + "software", + Collections.singletonMap("id", "4"), + Collections.singletonMap("id", "5"), + Collections.singletonMap("weight", "1.0"))); + + snapshotId = + ddlClient.addEdge( + new Edge( + "created", + "person", + "software", + Collections.singletonMap("id", "6"), + Collections.singletonMap("id", "3"), + Collections.singletonMap("weight", "0.2"))); + ddlClient.remoteFlush(snapshotId); Thread.sleep(5000); } else { throw new UnsupportedOperationException("graph " + graphName + " is unsupported yet"); @@ -239,11 +242,6 @@ public GraphTraversalSource traversal() { return source; } - @Override - public Vertex addVertex(Object... keyValues) { - throw new UnsupportedOperationException(); - } - @Override public C compute(Class graphComputerClass) throws IllegalArgumentException { @@ -255,16 +253,6 @@ public GraphComputer compute() throws IllegalArgumentException { throw new UnsupportedOperationException(); } - @Override - public Iterator vertices(Object... vertexIds) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator edges(Object... edgeIds) { - throw new UnsupportedOperationException(); - } - @Override public Transaction tx() { throw new UnsupportedOperationException();