14
14
import psycopg2
15
15
import pyarrow .flight as flight
16
16
import re
17
+ import requests
17
18
import threading
18
19
from typing import Any , Dict , List
19
20
@@ -27,6 +28,7 @@ class Constants:
27
28
I_COLUMN_NAME = 0
28
29
PASSWORD = "password"
29
30
PORT = "port"
31
+ RESULT = "result"
30
32
USERNAME = "username"
31
33
32
34
@@ -780,6 +782,89 @@ def cleanup_migrate_memgraph():
780
782
mgp .add_batch_read_proc (memgraph , init_migrate_memgraph , cleanup_migrate_memgraph )
781
783
782
784
785
+ servicenow_dict = {}
786
+
787
+
788
+ def init_migrate_servicenow (
789
+ endpoint : str ,
790
+ config : mgp .Map ,
791
+ config_path : str = "" ,
792
+ params : mgp .Nullable [mgp .Any ] = None ,
793
+ ):
794
+ """
795
+ Initialize the connection to the ServiceNow REST API and fetch the JSON data.
796
+
797
+ :param endpoint: ServiceNow API endpoint (full URL)
798
+ :param config: Configuration map containing authentication details (username, password, instance URL, etc.)
799
+ :param config_path: Optional path to a JSON file containing authentication details
800
+ :param params: Optional query parameters for filtering results
801
+ """
802
+ global servicenow_dict
803
+
804
+ if len (config_path ) > 0 :
805
+ config = _combine_config (config = config , config_path = config_path )
806
+
807
+ auth = (config .get (Constants .USERNAME ), config .get (Constants .PASSWORD ))
808
+ headers = {"Accept" : "application/json" }
809
+
810
+ response = requests .get (endpoint , auth = auth , headers = headers , params = params )
811
+ response .raise_for_status ()
812
+
813
+ data = response .json ().get (Constants .RESULT , [])
814
+ if not data :
815
+ raise ValueError ("No data found in ServiceNow response" )
816
+
817
+ thread_id = threading .get_native_id ()
818
+ if thread_id not in servicenow_dict :
819
+ servicenow_dict [thread_id ] = {}
820
+
821
+ servicenow_dict [thread_id ][Constants .CURSOR ] = iter (data )
822
+
823
+
824
+ def servicenow (
825
+ endpoint : str ,
826
+ config : mgp .Map ,
827
+ config_path : str = "" ,
828
+ params : mgp .Nullable [mgp .Any ] = None ,
829
+ ) -> mgp .Record (row = mgp .Map ):
830
+ """
831
+ Fetch rows from the ServiceNow REST API in batches.
832
+
833
+ :param endpoint: ServiceNow API endpoint (full URL)
834
+ :param config: Authentication details (username, password, instance URL, etc.)
835
+ :param config_path: Optional path to a JSON file containing authentication details
836
+ :param params: Optional query parameters for filtering results
837
+ :return: The result data as a stream of rows
838
+ """
839
+ global servicenow_dict
840
+
841
+ thread_id = threading .get_native_id ()
842
+ data_iter = servicenow_dict [thread_id ][Constants .CURSOR ]
843
+
844
+ batch_rows = []
845
+ for _ in range (Constants .BATCH_SIZE ):
846
+ try :
847
+ row = next (data_iter )
848
+ batch_rows .append (mgp .Record (row = row ))
849
+ except StopIteration :
850
+ break
851
+
852
+ return batch_rows
853
+
854
+
855
+ def cleanup_migrate_servicenow ():
856
+ """
857
+ Clean up ServiceNow dictionary references per-thread.
858
+ """
859
+ global servicenow_dict
860
+
861
+ thread_id = threading .get_native_id ()
862
+ servicenow_dict .pop (thread_id , None )
863
+
864
+
865
+ mgp .add_batch_read_proc (servicenow , init_migrate_servicenow , cleanup_migrate_servicenow )
866
+
867
+
783
868
def _formulate_cypher_query (label_or_rel_or_query : str ) -> str :
784
869
words = label_or_rel_or_query .split ()
785
870
if len (words ) > 1 :
0 commit comments