diff --git a/.doc_gen/metadata/neptune_metadata.yaml b/.doc_gen/metadata/neptune_metadata.yaml index 6a616a3e6cc..821bd4a020c 100644 --- a/.doc_gen/metadata/neptune_metadata.yaml +++ b/.doc_gen/metadata/neptune_metadata.yaml @@ -5,6 +5,14 @@ neptune_Hello: synopsis: get started using &neptune;. category: Hello languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.hello.main Java: versions: - sdk_version: 2 @@ -18,6 +26,14 @@ neptune_Hello: neptune: {DescribeDBClustersPaginator} neptune_ExecuteQuery: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.graph.execute.main Java: versions: - sdk_version: 2 @@ -31,6 +47,14 @@ neptune_ExecuteQuery: neptune: {ExecuteQuery} neptune_CreateGraph: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.graph.create.main Java: versions: - sdk_version: 2 @@ -44,6 +68,14 @@ neptune_CreateGraph: neptune: {CreateGraph} neptune_ExecuteOpenCypherExplainQuery: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.data.query.opencypher.main Java: versions: - sdk_version: 2 @@ -57,6 +89,14 @@ neptune_ExecuteOpenCypherExplainQuery: neptune: {ExecuteOpenCypherExplainQuery} neptune_ExecuteGremlinProfileQuery: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.data.query.gremlin.main Java: versions: - sdk_version: 2 @@ -70,6 +110,14 @@ neptune_ExecuteGremlinProfileQuery: neptune: {ExecuteGremlinProfileQuery} neptune_ExecuteGremlinQuery: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.data.query.gremlin.profile.main Java: versions: - sdk_version: 2 @@ -83,6 +131,14 @@ neptune_ExecuteGremlinQuery: neptune: {ExecuteGremlinQuery} neptune_DeleteDBSubnetGroup: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.delete.subnet.group.main Java: versions: - sdk_version: 2 @@ -96,6 +152,14 @@ neptune_DeleteDBSubnetGroup: neptune: {DeleteDBSubnetGroup} neptune_DeleteDBCluster: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.delete.cluster.main Java: versions: - sdk_version: 2 @@ -109,6 +173,14 @@ neptune_DeleteDBCluster: neptune: {DeleteDBCluster} neptune_DeleteDBInstance: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.delete.instance.main Java: versions: - sdk_version: 2 @@ -122,6 +194,14 @@ neptune_DeleteDBInstance: neptune: {DeleteDBInstance} neptune_StartDBCluster: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.start.cluster.main Java: versions: - sdk_version: 2 @@ -135,6 +215,14 @@ neptune_StartDBCluster: neptune: {StartDBCluster} neptune_StopDBCluster: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.stop.cluster.main Java: versions: - sdk_version: 2 @@ -148,6 +236,14 @@ neptune_StopDBCluster: neptune: {StopDBCluster} neptune_DescribeDBClusters: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.describe.cluster.main Java: versions: - sdk_version: 2 @@ -161,6 +257,14 @@ neptune_DescribeDBClusters: neptune: {DescribeDBClusters} neptune_DescribeDBInstances: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.describe.dbinstance.main Java: versions: - sdk_version: 2 @@ -174,6 +278,14 @@ neptune_DescribeDBInstances: neptune: {DescribeDBInstances} neptune_CreateDBInstance: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.create.dbinstance.main Java: versions: - sdk_version: 2 @@ -187,6 +299,14 @@ neptune_CreateDBInstance: neptune: {CreateDBInstance} neptune_CreateDBCluster: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.create.cluster.main Java: versions: - sdk_version: 2 @@ -200,6 +320,14 @@ neptune_CreateDBCluster: neptune: {CreateDBCluster} neptune_CreateDBSubnetGroup: languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.create.subnet.main Java: versions: - sdk_version: 2 @@ -223,6 +351,14 @@ neptune_Scenario: - Delete the &neptune; Assets. category: Basics languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/neptune + excerpts: + - description: + snippet_tags: + - neptune.python.scenario.main Java: versions: - sdk_version: 2 diff --git a/python/example_code/neptune/README.md b/python/example_code/neptune/README.md new file mode 100644 index 00000000000..df881f4d6bc --- /dev/null +++ b/python/example_code/neptune/README.md @@ -0,0 +1,142 @@ +# Neptune code examples for the SDK for Python + +## Overview + +Shows how to use the AWS SDK for Python (Boto3) to work with Amazon Neptune. + + + + +_Neptune is a serverless graph database designed for superior scalability and availability._ + +## ⚠ Important + +* Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/). +* Running the tests might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + + + +## Code examples + +### Prerequisites + +For prerequisites, see the [README](../../README.md#Prerequisites) in the `python` folder. + +Install the packages required by these examples by running the following in a virtual environment: + +``` +python -m pip install -r requirements.txt +``` + + + + +### Get started + +- [Hello Neptune](HelloNeptune.py#L4) (`DescribeDBClustersPaginator`) + + +### Basics + +Code examples that show you how to perform the essential operations within a service. + +- [Learn the basics](NeptuneScenario.py) + + +### Single actions + +Code excerpts that show you how to call individual service functions. + +- [CreateDBCluster](NeptuneScenario.py#L288) +- [CreateDBInstance](NeptuneScenario.py#L269) +- [CreateDBSubnetGroup](NeptuneScenario.py#L335) +- [CreateGraph](analytics/CreateNeptuneGraphExample.py#L7) +- [DeleteDBCluster](NeptuneScenario.py#L14) +- [DeleteDBInstance](NeptuneScenario.py#L77) +- [DeleteDBSubnetGroup](NeptuneScenario.py#L95) +- [DescribeDBClusters](NeptuneScenario.py#L203) +- [DescribeDBInstances](NeptuneScenario.py#L242) +- [ExecuteGremlinProfileQuery](database/NeptuneGremlinQueryExample.py#L22) +- [ExecuteGremlinQuery](database/NeptuneGremlinExplainAndProfileExample.py#L8) +- [ExecuteOpenCypherExplainQuery](database/OpenCypherExplainExample.py#L22) +- [ExecuteQuery](analytics/NeptuneAnalyticsQueryExample.py#L7) +- [StartDBCluster](NeptuneScenario.py#L161) +- [StopDBCluster](NeptuneScenario.py#L183) + + + + + +## Run the examples + +### Instructions + + + + + +#### Hello Neptune + +This example shows you how to get started using Neptune. + +``` +python HelloNeptune.py +``` + +#### Learn the basics + +This example shows you how to do the following: + +- Create an Amazon Neptune Subnet Group. +- Create an Neptune Cluster. +- Create an Neptune Instance. +- Check the status of the Neptune Instance. +- Show Neptune cluster details. +- Stop the Neptune cluster. +- Start the Neptune cluster. +- Delete the Neptune Assets. + + + + +Start the example by running the following at a command prompt: + +``` +python NeptuneScenario.py +``` + + + + + + +### Tests + +⚠ Running tests might result in charges to your AWS account. + + +To find instructions for running these tests, see the [README](../../README.md#Tests) +in the `python` folder. + + + + + + +## Additional resources + +- [Neptune User Guide](https://docs.aws.amazon.com/neptune/latest/userguide/intro.html) +- [Neptune API Reference](https://docs.aws.amazon.com/neptune/latest/apiref/Welcome.html) +- [SDK for Python Neptune reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html) + + + + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 diff --git a/python/example_code/neptune/analytics/create_neptune_graph_example.py b/python/example_code/neptune/analytics/create_neptune_graph_example.py new file mode 100644 index 00000000000..12975467903 --- /dev/null +++ b/python/example_code/neptune/analytics/create_neptune_graph_example.py @@ -0,0 +1,70 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import boto3 +from botocore.exceptions import ClientError, BotoCoreError + +# snippet-start:[neptune.python.graph.create.main] +""" +Running this example. + +---------------------------------------------------------------------------------- +VPC Networking Requirement: +---------------------------------------------------------------------------------- +Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. +It does not expose a public endpoint, so this code must be executed from: + + - An **AWS Lambda function** configured to run inside the same VPC + - An **EC2 instance** or **ECS task** running in the same VPC + - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** + +""" + +GRAPH_NAME = "sample-analytics-graph" + +def main(): + """ + Main entry point: create NeptuneGraph client and call graph creation. + """ + # Hypothetical client - boto3 currently doesn't have NeptuneGraph client, so replace with actual client if available + neptune_graph_client = boto3.client("neptune") + + execute_create_graph(neptune_graph_client, GRAPH_NAME) + + +def execute_create_graph(client, graph_name): + """ + Creates a new Neptune graph. + + :param client: Boto3 Neptune graph client (hypothetical) + :param graph_name: Name of the graph to create + """ + try: + print("Creating Neptune graph...") + + # Hypothetical method for create_graph, adjust accordingly if you use HTTP API or SDK extensions + response = client.create_graph( + GraphName=graph_name, + ProvisionedMemory=16 # Example parameter, adjust if API differs + ) + + created_graph_name = response.get("Name") + graph_arn = response.get("Arn") + graph_endpoint = response.get("Endpoint") + + print("Graph created successfully!") + print(f"Graph Name: {created_graph_name}") + print(f"Graph ARN: {graph_arn}") + print(f"Graph Endpoint: {graph_endpoint}") + + except ClientError as e: + print(f"Failed to create graph: {e.response['Error']['Message']}") + except BotoCoreError as e: + print(f"Failed to create graph: {str(e)}") + except Exception as e: + print(f"Unexpected error: {str(e)}") + + +if __name__ == "__main__": + main() +# snippet-end:[neptune.python.graph.create.main] \ No newline at end of file diff --git a/python/example_code/neptune/analytics/neptune_analytics_query_example.py b/python/example_code/neptune/analytics/neptune_analytics_query_example.py new file mode 100644 index 00000000000..c205511f9bb --- /dev/null +++ b/python/example_code/neptune/analytics/neptune_analytics_query_example.py @@ -0,0 +1,71 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import boto3 +from botocore.exceptions import ClientError + +# snippet-start:[neptune.python.graph.execute.main] +""" +Running this example. + +---------------------------------------------------------------------------------- +VPC Networking Requirement: +---------------------------------------------------------------------------------- +Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. +It does not expose a public endpoint, so this code must be executed from: + + - An **AWS Lambda function** configured to run inside the same VPC + - An **EC2 instance** or **ECS task** running in the same VPC + - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** + +""" + +NEPTUNE_ANALYTICS_ENDPOINT = "https://:8182" +GRAPH_ID = "" + +def main(): + # Build the boto3 client for neptune-graph with endpoint override + client = boto3.client( + "neptune-graph", + endpoint_url=NEPTUNE_ANALYTICS_ENDPOINT + ) + + try: + execute_gremlin_profile_query(client, GRAPH_ID) + except Exception as e: + print(f"Unexpected error in main: {e}") + +def execute_gremlin_profile_query(client, graph_id): + """ + Executes a Gremlin or OpenCypher query on Neptune Analytics graph. + + Args: + client (boto3.client): The NeptuneGraph client. + graph_id (str): The graph identifier. + """ + print("Running openCypher query on Neptune Analytics...") + + try: + response = client.execute_query( + GraphIdentifier=graph_id, + QueryString="MATCH (n {code: 'ANC'}) RETURN n", + Language="OPEN_CYPHER" + ) + + # The response 'Payload' may contain the query results as a streaming bytes object + # Convert to string and print + if 'Payload' in response: + result = response['Payload'].read().decode('utf-8') + print("Query Result:") + print(result) + else: + print("No query result returned.") + + except ClientError as e: + print(f"NeptuneGraph error: {e.response['Error']['Message']}") + except Exception as e: + print(f"Unexpected error: {e}") + +if __name__ == "__main__": + main() +# snippet-end:[neptune.python.graph.execute.main] \ No newline at end of file diff --git a/python/example_code/neptune/database/gremlin_profile_query_example.py b/python/example_code/neptune/database/gremlin_profile_query_example.py new file mode 100644 index 00000000000..dd2c459b691 --- /dev/null +++ b/python/example_code/neptune/database/gremlin_profile_query_example.py @@ -0,0 +1,75 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# snippet-start: [neptune.python.data.query.gremlin.profile.main] +import boto3 +import json +from botocore.config import Config +from botocore.exceptions import BotoCoreError, ClientError + +""" +Running this example. + +---------------------------------------------------------------------------------- +VPC Networking Requirement: +---------------------------------------------------------------------------------- +Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. +It does not expose a public endpoint, so this code must be executed from: + + - An **AWS Lambda function** configured to run inside the same VPC + - An **EC2 instance** or **ECS task** running in the same VPC + - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** + +""" + + +# Customize this with your Neptune endpoint +NEPTUNE_ENDPOINT = "https://:8182" + +def execute_gremlin_profile_query(client): + """ + Executes a Gremlin PROFILE query using the provided Neptune client. + """ + print("Executing Gremlin PROFILE query...") + + try: + response = client.execute_gremlin_profile_query( + gremlinQuery="g.V().has('code', 'ANC')" + ) + output = response.get("output") + + if output: + print("Query Profile Output:") + print(json.dumps(output, indent=2)) + else: + print("No output returned from the profile query.") + + except ClientError as e: + print(f"Neptune error: {e.response['Error']['Message']}") + except BotoCoreError as e: + print(f"Unexpected Boto3 error: {str(e)}") + except Exception as e: + print(f"Unexpected error: {str(e)}") + +def main(): + """ + Main entry point: creates the Neptune client and runs the profile query. + """ + + # * To prevent unneccesary retries please set the total_max_attempts to 1 + # * To prevent a read timeout on the client when a query runs longer than 60 seconds set the read_timeout to None + config = Config(retries={"total_max_attempts": 1, "mode": "standard"}, read_timeout=None) + + neptune_client = boto3.client( + "neptunedata", + endpoint_url=NEPTUNE_ENDPOINT, + config=config + ) + + execute_gremlin_profile_query(neptune_client) + + +if __name__ == "__main__": + main() + +# snippet-end: [neptune.python.data.query.gremlin.profile.main] diff --git a/python/example_code/neptune/database/neptune_gremlin_explain_and_profile_example.py b/python/example_code/neptune/database/neptune_gremlin_explain_and_profile_example.py new file mode 100644 index 00000000000..25779fd76c6 --- /dev/null +++ b/python/example_code/neptune/database/neptune_gremlin_explain_and_profile_example.py @@ -0,0 +1,84 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import boto3 +from botocore.config import Config +from botocore.exceptions import BotoCoreError, ClientError + +# snippet-start:[neptune.python.data.query.gremlin.profile.main] +""" +Running this example. + +---------------------------------------------------------------------------------- +VPC Networking Requirement: +---------------------------------------------------------------------------------- +Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. +It does not expose a public endpoint, so this code must be executed from: + + - An **AWS Lambda function** configured to run inside the same VPC + - An **EC2 instance** or **ECS task** running in the same VPC + - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** + +""" + +# Replace with your actual Neptune endpoint +NEPTUNE_ENDPOINT = "https://[Specify-Your-Endpoint]:8182" + +def main(): + """ + Entry point of the program. Initializes the Neptune client and runs both EXPLAIN and PROFILE queries. + """ + config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3}) + + neptune_client = boto3.client( + "neptunedata", + endpoint_url=NEPTUNE_ENDPOINT, + config=config + ) + + try: + run_explain_query(neptune_client) + run_profile_query(neptune_client) + except ClientError as e: + print(f"Neptune error: {e.response['Error']['Message']}") + except BotoCoreError as e: + print(f"BotoCore error: {str(e)}") + except Exception as e: + print(f"Unexpected error: {str(e)}") + + +def run_explain_query(neptune_client): + """ + Runs an EXPLAIN query on the Neptune graph database. + """ + print("Running Gremlin EXPLAIN query...") + + try: + response = neptune_client.execute_gremlin_explain_query( + gremlinQuery="g.V().has('code', 'ANC')" + ) + print("Explain Query Result:") + print(response.get("output", "No explain output returned.")) + except Exception as e: + print(f"Failed to execute EXPLAIN query: {str(e)}") + + +def run_profile_query(neptune_client): + """ + Runs a PROFILE query on the Neptune graph database. + """ + print("Running Gremlin PROFILE query...") + + try: + response = neptune_client.execute_gremlin_profile_query( + gremlinQuery="g.V().has('code', 'ANC')" + ) + print("Profile Query Result:") + print(response.get("output", "No profile output returned.")) + except Exception as e: + print(f"Failed to execute PROFILE query: {str(e)}") + + +if __name__ == "__main__": + main() +# snippet-end:[neptune.python.data.query.gremlin.profile.main] \ No newline at end of file diff --git a/python/example_code/neptune/database/neptune_gremlin_query_example.py b/python/example_code/neptune/database/neptune_gremlin_query_example.py new file mode 100644 index 00000000000..ffcccce41db --- /dev/null +++ b/python/example_code/neptune/database/neptune_gremlin_query_example.py @@ -0,0 +1,72 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import boto3 +from botocore.config import Config +from botocore.exceptions import BotoCoreError, ClientError + +""" +Running this example. + +---------------------------------------------------------------------------------- +VPC Networking Requirement: +---------------------------------------------------------------------------------- +Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. +It does not expose a public endpoint, so this code must be executed from: + + - An **AWS Lambda function** configured to run inside the same VPC + - An **EC2 instance** or **ECS task** running in the same VPC + - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** + +""" +# snippet-start:[neptune.python.data.query.gremlin.main] +# Replace this with your actual Neptune endpoint +NEPTUNE_ENDPOINT = "https://[Specify Endpoint]:8182" + +def main(): + """ + Entry point of the program. Initializes the Neptune client and executes the Gremlin query. + """ + config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3}) + + neptune_client = boto3.client( + "neptunedata", + endpoint_url=NEPTUNE_ENDPOINT, + config=config + ) + + execute_gremlin_query(neptune_client) + + +def execute_gremlin_query(neptune_client): + """ + Executes a Gremlin query against an Amazon Neptune database. + + :param neptune_client: Boto3 Neptunedata client + """ + try: + print("Querying Neptune...") + + response = neptune_client.execute_gremlin_query( + gremlinQuery="g.V().has('code', 'ANC')" + ) + + print("Full Response:") + print(response) + + result = response.get("result") + if result: + print("Query Result:") + print(result) + else: + print("No result returned from the query.") + except ClientError as e: + print(f"Error calling Neptune: {e.response['Error']['Message']}") + except BotoCoreError as e: + print(f"BotoCore error: {str(e)}") + except Exception as e: + print(f"Unexpected error: {str(e)}") + + +if __name__ == "__main__": + main() +# snippet-end:[neptune.python.data.query.gremlin.main] \ No newline at end of file diff --git a/python/example_code/neptune/database/open_cypher_explain_example.py b/python/example_code/neptune/database/open_cypher_explain_example.py new file mode 100644 index 00000000000..245338b69ea --- /dev/null +++ b/python/example_code/neptune/database/open_cypher_explain_example.py @@ -0,0 +1,76 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError, BotoCoreError + +""" +Running this example. + +---------------------------------------------------------------------------------- +VPC Networking Requirement: +---------------------------------------------------------------------------------- +Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. +It does not expose a public endpoint, so this code must be executed from: + + - An **AWS Lambda function** configured to run inside the same VPC + - An **EC2 instance** or **ECS task** running in the same VPC + - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** + +""" +# snippet-start:[neptune.python.data.query.opencypher.main] +# Replace with your actual Neptune endpoint URL +NEPTUNE_ENDPOINT = "https://:8182" + +def main(): + """ + Entry point: Create Neptune client and execute the OpenCypher EXPLAIN query. + """ + config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3}) + + neptune_client = boto3.client( + "neptunedata", + endpoint_url=NEPTUNE_ENDPOINT, + config=config + ) + + execute_opencypher_explain_query(neptune_client) + + +def execute_opencypher_explain_query(neptune_client): + """ + Executes an OpenCypher EXPLAIN query on Amazon Neptune. + + :param neptune_client: Boto3 Neptunedata client + """ + try: + print("Executing OpenCypher EXPLAIN query...") + + response = neptune_client.execute_open_cypher_explain_query( + openCypherQuery="MATCH (n {code: 'ANC'}) RETURN n", + explainMode="debug" + ) + + results = response.get("results") + if results: + # `results` might be bytes or string, decode if necessary + if isinstance(results, bytes): + print("Explain Results:") + print(results.decode("utf-8")) + else: + print("Explain Results:") + print(results) + else: + print("No explain results returned.") + except ClientError as e: + print(f"Neptune error: {e.response['Error']['Message']}") + except BotoCoreError as e: + print(f"BotoCore error: {str(e)}") + except Exception as e: + print(f"Unexpected error: {str(e)}") + + +if __name__ == "__main__": + main() +# snippet-end:[neptune.python.data.query.opencypher.main] \ No newline at end of file diff --git a/python/example_code/neptune/hello_neptune.py b/python/example_code/neptune/hello_neptune.py new file mode 100644 index 00000000000..82d332efdc1 --- /dev/null +++ b/python/example_code/neptune/hello_neptune.py @@ -0,0 +1,53 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# snippet-start:[neptune.python.hello.main] +import boto3 +from botocore.exceptions import ClientError + + +def describe_db_clusters(neptune_client): + """ + Describes the Amazon Neptune DB clusters using a paginator to handle multiple pages. + Raises ClientError with 'ResourceNotFoundException' if no clusters are found. + """ + paginator = neptune_client.get_paginator("describe_db_clusters") + clusters_found = False + + for page in paginator.paginate(): + for cluster in page.get("DBClusters", []): + clusters_found = True + print(f"Cluster Identifier: {cluster['DBClusterIdentifier']}") + print(f"Status: {cluster['Status']}") + + if not clusters_found: + raise ClientError( + { + "Error": { + "Code": "ResourceNotFoundException", + "Message": "No Neptune DB clusters found." + } + }, + operation_name="DescribeDBClusters" + ) + +def main(): + """ + Main entry point: creates the Neptune client and calls the describe operation. + """ + neptune_client = boto3.client("neptune") + try: + describe_db_clusters(neptune_client) + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ResourceNotFoundException": + print(f"Resource not found: {e.response['Error']['Message']}") + else: + print(f"Unexpected ClientError: {e.response['Error']['Message']}") + except Exception as e: + print(f"Unexpected error: {str(e)}") + +if __name__ == "__main__": + main() + +# snippet-end:[neptune.python.hello.main] \ No newline at end of file diff --git a/python/example_code/neptune/neptune_scenario.py b/python/example_code/neptune/neptune_scenario.py new file mode 100644 index 00000000000..c3e3c77d2b6 --- /dev/null +++ b/python/example_code/neptune/neptune_scenario.py @@ -0,0 +1,729 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + + +# snippet-start:[neptune.python.scenario.main] +import boto3 +import time +import botocore.exceptions + +# Constants used in this scenario +POLL_INTERVAL_SECONDS = 10 +TIMEOUT_SECONDS = 1200 # 20 minutes + +# snippet-start:[neptune.python.delete.cluster.main] +from botocore.exceptions import ClientError + +def delete_db_cluster(neptune_client, cluster_id: str): + """ + Deletes a Neptune DB cluster and throws exceptions to the caller. + + Args: + neptune_client (boto3.client): The Neptune client object. + cluster_id (str): The ID of the Neptune DB cluster to be deleted. + + Raises: + ClientError: If the delete operation fails. + """ + request = { + 'DBClusterIdentifier': cluster_id, + 'SkipFinalSnapshot': True + } + + try: + print(f"Deleting DB Cluster: {cluster_id}") + neptune_client.delete_db_cluster(**request) + except ClientError as e: + raise + + +# snippet-end:[neptune.python.delete.cluster.main] + +def format_elapsed_time(seconds: int) -> str: + mins, secs = divmod(seconds, 60) + hours, mins = divmod(mins, 60) + return f"{hours:02}:{mins:02}:{secs:02}" + + +# snippet-start:[neptune.python.delete.instance.main] +def delete_db_instance(neptune_client, instance_id: str): + """ + Deletes a Neptune DB instance and waits for its deletion to complete. + Raises exception to be handled by calling code. + """ + print(f"Initiating deletion of DB Instance: {instance_id}") + try: + neptune_client.delete_db_instance( + DBInstanceIdentifier=instance_id, + SkipFinalSnapshot=True + ) + + print(f"Waiting for DB Instance '{instance_id}' to be deleted...") + waiter = neptune_client.get_waiter('db_instance_deleted') + waiter.wait( + DBInstanceIdentifier=instance_id, + WaiterConfig={ + 'Delay': 30, + 'MaxAttempts': 40 + } + ) + + print(f"DB Instance '{instance_id}' successfully deleted.") + except ClientError as e: + raise + + +# snippet-end:[neptune.python.delete.instance.main] + +# snippet-start:[neptune.python.delete.subnet.group.main] +def delete_db_subnet_group(neptune_client, subnet_group_name): + """ + Deletes a Neptune DB subnet group synchronously using Boto3. + + :param subnet_group_name: The name of the DB subnet group to delete. + """ + delete_group_request = { + 'DBSubnetGroupName': subnet_group_name + } + try: + neptune_client.delete_db_subnet_group(**delete_group_request) + print(f"ļø Deleting Subnet Group: {subnet_group_name}") + except ClientError as e: + raise + + +# snippet-end:[neptune.python.delete.subnet.group.main] + +def wait_for_cluster_status( + neptune_client, + cluster_id: str, + desired_status: str, + timeout_seconds: int = TIMEOUT_SECONDS, + poll_interval_seconds: int = POLL_INTERVAL_SECONDS +): + """ + Waits for a Neptune DB cluster to reach a desired status. + + Args: + neptune_client (boto3.client): The Amazon Neptune client. + cluster_id (str): The identifier of the Neptune DB cluster. + desired_status (str): The target status (e.g., "available", "stopped"). + timeout_seconds (int): Max time to wait in seconds (default: 1200). + poll_interval_seconds (int): Polling interval in seconds (default: 10). + + Raises: + RuntimeError: If the desired status is not reached before timeout. + """ + print(f"Waiting for cluster '{cluster_id}' to reach status '{desired_status}'...") + start_time = time.time() + + while True: + # Prepare request object + describe_cluster_request = { + 'DBClusterIdentifier': cluster_id + } + + # Call the Neptune API + response = neptune_client.describe_db_clusters(**describe_cluster_request) + clusters = response.get('DBClusters', []) + current_status = clusters[0].get('Status') if clusters else None + elapsed_seconds = int(time.time() - start_time) + + status_str = current_status if current_status else "Unknown" + print( + f"\r Elapsed: {format_elapsed_time(elapsed_seconds):<20} Cluster status: {status_str:<20}", + end="", flush=True + ) + + if current_status and current_status.lower() == desired_status.lower(): + print( + f"\nNeptune cluster reached desired status '{desired_status}' after {format_elapsed_time(elapsed_seconds)}." + ) + return + + if elapsed_seconds > timeout_seconds: + raise RuntimeError(f"Timeout waiting for Neptune cluster to reach status: {desired_status}") + + time.sleep(poll_interval_seconds) + + +# snippet-start:[neptune.python.start.cluster.main] +def start_db_cluster(neptune_client, cluster_identifier: str): + """ + Starts an Amazon Neptune DB cluster and waits until it reaches 'available'. + + Args: + neptune_client (boto3.client): The Neptune client. + cluster_identifier (str): The DB cluster identifier. + + Raises: + ClientError: Propagates AWS API issues like resource not found. + RuntimeError: If cluster doesn't reach 'available' within timeout. + """ + try: + # Initial wait in case the cluster was just stopped + time.sleep(30) + neptune_client.start_db_cluster(DBClusterIdentifier=cluster_identifier) + except ClientError: + # Immediately propagate any AWS API error + raise + + # Poll until cluster status is 'available' + start_time = time.time() + paginator = neptune_client.get_paginator('describe_db_clusters') + + while True: + try: + pages = paginator.paginate(DBClusterIdentifier=cluster_identifier) + clusters = [] + for page in pages: + clusters.extend(page.get('DBClusters', [])) + except ClientError: + raise + + status = clusters[0].get('Status') if clusters else None + elapsed = time.time() - start_time + + print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True) + + if status and status.lower() == 'available': + print(f"\nšŸŽ‰ Cluster '{cluster_identifier}' is available.") + return + + if elapsed > TIMEOUT_SECONDS: + raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to become available.") + + time.sleep(POLL_INTERVAL_SECONDS) + + +# snippet-end:[neptune.python.start.cluster.main] + +# snippet-start:[neptune.python.stop.cluster.main] +def stop_db_cluster(neptune_client, cluster_identifier: str): + """ + Stops an Amazon Neptune DB cluster and waits until it's fully stopped. + + Args: + neptune_client (boto3.client): The Neptune client. + cluster_identifier (str): The DB cluster identifier. + + Raises: + ClientError: For AWS API errors (e.g., resource not found). + RuntimeError: If the cluster doesn't stop within the timeout. + """ + try: + neptune_client.stop_db_cluster(DBClusterIdentifier=cluster_identifier) + except ClientError: + # Propagate AWS-level exceptions immediately + raise + + start_time = time.time() + paginator = neptune_client.get_paginator('describe_db_clusters') + + while True: + try: + pages = paginator.paginate(DBClusterIdentifier=cluster_identifier) + clusters = [] + for page in pages: + clusters.extend(page.get('DBClusters', [])) + except ClientError: + # For example, cluster might be already deleted/not found + raise + + status = clusters[0].get('Status') if clusters else None + elapsed = time.time() - start_time + + print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True) + + if status and status.lower() == 'stopped': + print(f"\n Cluster '{cluster_identifier}' is now stopped.") + return + + if elapsed > TIMEOUT_SECONDS: + raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to stop.") + + time.sleep(POLL_INTERVAL_SECONDS) + + +# snippet-end:[neptune.python.stop.cluster.main] + +# snippet-start:[neptune.python.describe.cluster.main] +def describe_db_clusters(neptune_client, cluster_id: str): + """ + Describes details of a Neptune DB cluster, paginating if needed. + + Args: + neptune_client (boto3.client): The Neptune client. + cluster_id (str): The ID of the cluster to describe. + + Raises: + ClientError: If there's an AWS API error (e.g., cluster not found). + """ + paginator = neptune_client.get_paginator('describe_db_clusters') + try: + pages = paginator.paginate(DBClusterIdentifier=cluster_id) + except ClientError: + raise + + found = False + for page in pages: + for cluster in page.get('DBClusters', []): + found = True + print(f"Cluster Identifier: {cluster.get('DBClusterIdentifier')}") + print(f"Status: {cluster.get('Status')}") + print(f"Engine: {cluster.get('Engine')}") + print(f"Engine Version: {cluster.get('EngineVersion')}") + print(f"Endpoint: {cluster.get('Endpoint')}") + print(f"Reader Endpoint: {cluster.get('ReaderEndpoint')}") + print(f"Availability Zones: {cluster.get('AvailabilityZones')}") + print(f"Subnet Group: {cluster.get('DBSubnetGroup')}") + print("VPC Security Groups:") + for vpc_group in cluster.get('VpcSecurityGroups', []): + print(f" - {vpc_group.get('VpcSecurityGroupId')}") + print(f"Storage Encrypted: {cluster.get('StorageEncrypted')}") + print(f"IAM Auth Enabled: {cluster.get('IAMDatabaseAuthenticationEnabled')}") + print(f"Backup Retention Period: {cluster.get('BackupRetentionPeriod')} days") + print(f"Preferred Backup Window: {cluster.get('PreferredBackupWindow')}") + print(f"Preferred Maintenance Window: {cluster.get('PreferredMaintenanceWindow')}") + print("------") + + if not found: + # Handle empty result set as not found + raise ClientError( + {"Error": {"Code": "DBClusterNotFound", "Message": f"No cluster found with ID '{cluster_id}'"}}, + "DescribeDBClusters" + ) + + +# snippet-end:[neptune.python.describe.cluster.main] + +# snippet-start:[neptune.python.describe.dbinstance.main] +def check_instance_status(neptune_client, instance_id: str, desired_status: str): + """ + Polls the status of a Neptune DB instance until it reaches desired_status. + Uses pagination via describe_db_instances — even for a single instance. + + Raises: + ClientError: If describe_db_instances fails (e.g., instance not found). + RuntimeError: If timeout expires before reaching desired status. + """ + paginator = neptune_client.get_paginator('describe_db_instances') + start_time = time.time() + + while True: + try: + # Paginate responses for the specified instance ID + pages = paginator.paginate(DBInstanceIdentifier=instance_id) + instances = [] + for page in pages: + instances.extend(page.get('DBInstances', [])) + except ClientError: + # Let the calling code handle errors such as ResourceNotFound + raise + + current_status = instances[0].get('DBInstanceStatus') if instances else None + elapsed = int(time.time() - start_time) + + print(f"\rElapsed: {format_elapsed_time(elapsed)} Status: {current_status}", end="", flush=True) + + if current_status and current_status.lower() == desired_status.lower(): + print(f"\nInstance '{instance_id}' reached '{desired_status}' in {format_elapsed_time(elapsed)}.") + return + + if elapsed > TIMEOUT_SECONDS: + raise RuntimeError(f"Timeout waiting for '{instance_id}' to reach '{desired_status}'") + + time.sleep(POLL_INTERVAL_SECONDS) + + +# snippet-end:[neptune.python.describe.dbinstance.main] + +# snippet-start:[neptune.python.create.dbinstance.main] +def create_db_instance(neptune_client, db_instance_id: str, db_cluster_id: str) -> str: + try: + request = { + 'DBInstanceIdentifier': db_instance_id, + 'DBInstanceClass': 'db.r5.large', + 'Engine': 'neptune', + 'DBClusterIdentifier': db_cluster_id + } + + print(f"Creating Neptune DB Instance: {db_instance_id}") + response = neptune_client.create_db_instance(**request) + + instance = response.get('DBInstance') + if not instance or 'DBInstanceIdentifier' not in instance: + raise RuntimeError("Instance creation succeeded but no ID returned.") + + # Wait for it to become available + print(f"Waiting for DB Instance '{db_instance_id}' to become available...") + waiter = neptune_client.get_waiter('db_instance_available') + waiter.wait( + DBInstanceIdentifier=db_instance_id, + WaiterConfig={'Delay': 30, 'MaxAttempts': 40} + ) + + print(f"DB Instance '{db_instance_id}' is now available.") + return instance['DBInstanceIdentifier'] + + except ClientError as e: + raise ClientError( + { + "Error": { + "Code": e.response["Error"]["Code"], + "Message": f"Failed to create DB instance '{db_instance_id}': {e.response['Error']['Message']}" + } + }, + e.operation_name + ) from e + + except Exception as e: + raise RuntimeError(f"Unexpected error creating DB instance '{db_instance_id}': {e}") from e + + +# snippet-end:[neptune.python.create.dbinstance.main] + +# snippet-start:[neptune.python.create.cluster.main] +def create_db_cluster(neptune_client, db_name: str) -> str: + """ + Creates a Neptune DB cluster and returns its identifier. + + Args: + neptune_client (boto3.client): The Neptune client object. + db_name (str): The desired cluster identifier. + + Returns: + str: The DB cluster identifier. + + Raises: + ClientError: Wraps any AWS-side error for the calling code to handle. + RuntimeError: If the call succeeds but no identifier is returned. + """ + request = { + 'DBClusterIdentifier': db_name, + 'Engine': 'neptune', + 'DeletionProtection': False, + 'BackupRetentionPeriod': 1 + } + + try: + response = neptune_client.create_db_cluster(**request) + cluster = response.get('DBCluster') or {} + + cluster_id = cluster.get('DBClusterIdentifier') + if not cluster_id: + raise RuntimeError("Cluster created but no ID returned.") + + print(f"DB Cluster created: {cluster_id}") + return cluster_id + + except ClientError as e: + # enrich the message, + # keep the AWS error code for downstream handling + raise ClientError( + { + "Error": { + "Code": e.response["Error"]["Code"], + "Message": f"Failed to create DB cluster '{db_name}': {e.response['Error']['Message']}" + } + }, + e.operation_name + ) from e + + except Exception as e: + raise RuntimeError(f"Unexpected error creating DB cluster '{db_name}': {e}") from e + + +# snippet-end:[neptune.python.create.cluster.main] + +def get_subnet_ids(vpc_id: str) -> list[str]: + ec2_client = boto3.client('ec2') + + describe_subnets_request = { + 'Filters': [{'Name': 'vpc-id', 'Values': [vpc_id]}] + } + + response = ec2_client.describe_subnets(**describe_subnets_request) + subnets = response.get('Subnets', []) + subnet_ids = [subnet['SubnetId'] for subnet in subnets if 'SubnetId' in subnet] + return subnet_ids + + +def get_default_vpc_id() -> str: + ec2_client = boto3.client('ec2') + describe_vpcs_request = { + 'Filters': [{'Name': 'isDefault', 'Values': ['true']}] + } + + response = ec2_client.describe_vpcs(**describe_vpcs_request) + vpcs = response.get('Vpcs', []) + if not vpcs: + raise RuntimeError("No default VPC found in this region.") + + default_vpc_id = vpcs[0]['VpcId'] + print(f"Default VPC ID: {default_vpc_id}") + return default_vpc_id + + +# snippet-start:[neptune.python.create.subnet.main] +def create_subnet_group(neptune_client, group_name: str): + """ + Creates a Neptune DB subnet group and returns its name and ARN. + + Args: + neptune_client (boto3.client): The Neptune client object. + group_name (str): The desired name of the subnet group. + + Returns: + tuple(str, str): (subnet_group_name, subnet_group_arn) + + Raises: + ClientError: If AWS returns an error. + RuntimeError: For unexpected internal errors. + """ + vpc_id = get_default_vpc_id() + subnet_ids = get_subnet_ids(vpc_id) + + request = { + 'DBSubnetGroupName': group_name, + 'DBSubnetGroupDescription': 'My Neptune subnet group', + 'SubnetIds': subnet_ids, + 'Tags': [{'Key': 'Environment', 'Value': 'Dev'}] + } + + try: + response = neptune_client.create_db_subnet_group(**request) + sg = response.get("DBSubnetGroup", {}) + + name = sg.get("DBSubnetGroupName") + arn = sg.get("DBSubnetGroupArn") + + if not name or not arn: + raise RuntimeError("Response missing subnet group name or ARN.") + + print(f"Subnet group created: {name}") + print(f"ARN: {arn}") + + return name, arn + + except ClientError as e: + # Repackage with context, then throw + raise ClientError( + { + "Error": { + "Code": e.response["Error"]["Code"], + "Message": f"Failed to create subnet group '{group_name}': {e.response['Error']['Message']}" + } + }, + e.operation_name + ) from e + + except Exception as e: + raise RuntimeError(f"Unexpected error creating subnet group '{group_name}': {e}") from e + + +# snippet-end:[neptune.python.create.subnet.main] + +def wait_for_input_to_continue(): + input("\nPress to continue...") + print("Continuing with the program...\n") + + +def run_scenario(neptune_client, subnet_group_name: str, db_instance_id: str, cluster_name: str): + print("-" * 88) + print("1. Create a Neptune DB Subnet Group") + wait_for_input_to_continue() + + try: + name, arn = create_subnet_group(neptune_client, subnet_group_name) + print(f"Subnet group successfully created: {name}") + + except ClientError as ce: + code = ce.response["Error"]["Code"] + if code == "ServiceQuotaExceededException": + print("You've hit the subnet group quota.") + else: + msg = ce.response["Error"]["Message"] + print(f"AWS error [{code}]: {msg}") + raise + + except RuntimeError as re: + print(f"Runtime issue: {re}") + + print("-" * 88) + + print("2. Create a Neptune Cluster") + wait_for_input_to_continue() + try: + db_cluster_id = create_db_cluster(neptune_client, cluster_name) + except ClientError as ce: + code = ce.response["Error"]["Code"] + if code in ("ServiceQuotaExceededException", "DBClusterQuotaExceededFault"): + print("You have exceeded the quota for Neptune DB clusters.") + else: + msg = ce.response["Error"]["Message"] + print(f"AWS error [{code}]: {msg}") + + except RuntimeError as re: + print(f"Runtime issue: {re}") + + except Exception as e: + print(f" Unexpected error: {e}") + print("-" * 88) + + print("-" * 88) + print("3. Create a Neptune DB Instance") + wait_for_input_to_continue() + try: + create_db_instance(neptune_client, db_instance_id, cluster_name) + + except ClientError as ce: + error_code = ce.response["Error"]["Code"] + if error_code == "ServiceQuotaExceededException": + print("You have exceeded the quota for Neptune DB instances.") + else: + print(f"AWS error [{error_code}]: {ce.response['Error']['Message']}") + raise # Optionally rethrow + + except RuntimeError as re: + print(f"Runtime error: {str(re)}") + print("-" * 88) + + print("-" * 88) + print("4. Check the status of the Neptune DB Instance") + print(""" + Even though you're targeting a single DB instance, + describe_db_instances supports pagination and can return multiple pages. + + Handling paginated responses ensures your method continues to work reliably + even if AWS returns large or paged results. + """) + wait_for_input_to_continue() + + try: + check_instance_status(neptune_client, db_instance_id, "available") + except ClientError as ce: + code = ce.response['Error']['Code'] + if code in ('DBInstanceNotFound', 'DBInstanceNotFoundFault', 'ResourceNotFound'): + print(f"Instance '{db_instance_id}' not found.") + else: + print(f"AWS error [{code}]: {ce.response['Error']['Message']}") + raise + except RuntimeError as re: + print(f" Timeout: {re}") + print("-" * 88) + + print("-" * 88) + print("5. Show Neptune Cluster details") + wait_for_input_to_continue() + + try: + describe_db_clusters(neptune_client, db_cluster_id) + except ClientError as ce: + code = ce.response["Error"]["Code"] + if code in ("DBClusterNotFound", "DBClusterNotFoundFault", "ResourceNotFound"): + print(f"Cluster '{db_cluster_id}' not found.") + else: + print(f"AWS error [{code}]: {ce.response['Error']['Message']}") + raise + print("-" * 88) + + print("-" * 88) + print("6. Stop the Amazon Neptune cluster") + print(""" + Boto3 doesn't currently offer a + built-in waiter for stop_db_cluster, + This example implements a custom polling + strategy until the cluster is in a stopped state. + + """) + wait_for_input_to_continue() + try: + stop_db_cluster(neptune_client, db_cluster_id) + check_instance_status(neptune_client, db_instance_id, "stopped") + except ClientError as ce: + code = ce.response["Error"]["Code"] + if code in ("DBClusterNotFoundFault", "DBClusterNotFound", "ResourceNotFoundFault"): + print(f"Cluster '{db_cluster_id}' not found.") + else: + print(f"AWS error [{code}]: {ce.response['Error']['Message']}") + raise + print("-" * 88) + + print("-" * 88) + print("7. Start the Amazon Neptune cluster") + print(""" + Boto3 doesn't currently offer a + built-in waiter for start_db_cluster, + This example implements a custom polling + strategy until the cluster is in an available state. + """) + wait_for_input_to_continue() + try: + start_db_cluster(neptune_client, db_cluster_id) + wait_for_cluster_status(neptune_client, db_cluster_id, "available") + check_instance_status(neptune_client, db_instance_id, "available") + + except ClientError as ce: + code = ce.response["Error"]["Code"] + if code in ("DBClusterNotFoundFault", "DBClusterNotFound", "ResourceNotFoundFault"): + print(f"Cluster '{db_cluster_id}' not found.") + else: + print(f"AWS error [{code}]: {ce.response['Error']['Message']}") + raise + + except RuntimeError as re: + # Handles timeout or other runtime issues + print(f"Timeout or runtime error: {re}") + + else: + # No exceptions occurred + print("All Neptune resources are now available.") + print("-" * 88) + + print("-" * 88) + print("8. Delete the Neptune Assets") + print("Would you like to delete the Neptune Assets? (y/n)") + del_ans = input().strip() + if del_ans == "y": + print("You selected to delete the Neptune assets.") + try: + delete_db_instance(neptune_client, db_instance_id) + delete_db_cluster(neptune_client, db_cluster_id) + delete_db_subnet_group(neptune_client, subnet_group_name) + print("Neptune resources deleted successfully") + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code == "DBInstanceNotFound": + print(f"Instance '{db_instance_id}' already deleted or doesn't exist.") + else: + print(f"Error during Neptune cleanup: {e}") + print("-" * 88) + + +def main(): + neptune_client = boto3.client('neptune') + + # Customize the following names to match your Neptune setup + # (You must change these to unique values for your environment) + subnet_group_name = "neptuneSubnetGroup105" + cluster_name = "neptuneCluster105" + db_instance_id = "neptuneDB105" + + print(""" + Amazon Neptune is a fully managed graph database service by AWS... + Let's get started! + """) + wait_for_input_to_continue() + run_scenario(neptune_client, subnet_group_name, db_instance_id, cluster_name) + + print(""" + Thank you for checking out the Amazon Neptune Service Use demo. + For more AWS code examples, visit: + https://docs.aws.amazon.com/code-library/latest/ug/what-is-code-library.html + """) + + +if __name__ == "__main__": + main() +# snippet-end:[neptune.python.scenario.main] diff --git a/python/example_code/neptune/tests/analytics_tests/test_create_graph.py b/python/example_code/neptune/tests/analytics_tests/test_create_graph.py new file mode 100644 index 00000000000..f934d2027d5 --- /dev/null +++ b/python/example_code/neptune/tests/analytics_tests/test_create_graph.py @@ -0,0 +1,43 @@ +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError, BotoCoreError +from analytics.create_neptune_graph_example import execute_create_graph # Adjust import based on your file structure + + +def test_execute_create_graph(capfd): + mock_client = MagicMock() + + # --- Success case --- + mock_client.create_graph.return_value = { + "Name": "test-graph", + "Arn": "arn:aws:neptune:region:123456789012:graph/test-graph", + "Endpoint": "https://test-graph.endpoint" + } + + execute_create_graph(mock_client, "test-graph") + out, _ = capfd.readouterr() + assert "Creating Neptune graph..." in out + assert "Graph created successfully!" in out + assert "Graph Name: test-graph" in out + assert "Graph ARN: arn:aws:neptune:region:123456789012:graph/test-graph" in out + assert "Graph Endpoint: https://test-graph.endpoint" in out + + # --- ClientError case --- + mock_client.create_graph.side_effect = ClientError( + {"Error": {"Message": "Client error occurred"}}, "CreateGraph" + ) + execute_create_graph(mock_client, "test-graph") + out, _ = capfd.readouterr() + assert "Failed to create graph: Client error occurred" in out + + # --- BotoCoreError case --- + mock_client.create_graph.side_effect = BotoCoreError() + execute_create_graph(mock_client, "test-graph") + out, _ = capfd.readouterr() + assert "Failed to create graph:" in out # Just check the prefix because message varies + + # --- Generic Exception case --- + mock_client.create_graph.side_effect = Exception("Generic failure") + execute_create_graph(mock_client, "test-graph") + out, _ = capfd.readouterr() + assert "Unexpected error: Generic failure" in out diff --git a/python/example_code/neptune/tests/analytics_tests/test_execute_gremlin_profile_query.py b/python/example_code/neptune/tests/analytics_tests/test_execute_gremlin_profile_query.py new file mode 100644 index 00000000000..170dd37defc --- /dev/null +++ b/python/example_code/neptune/tests/analytics_tests/test_execute_gremlin_profile_query.py @@ -0,0 +1,46 @@ +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError +from analytics.neptune_analytics_query_example import execute_gremlin_profile_query # adjust this import + + +class FakePayload: + def __init__(self, data: bytes): + self._data = data + def read(self): + return self._data + + +def test_execute_gremlin_profile_query(capfd): + mock_client = MagicMock() + graph_id = "test-graph-id" + + # --- Success case with Payload --- + mock_client.execute_query.return_value = { + "Payload": FakePayload(b'{"results": "some data"}') + } + execute_gremlin_profile_query(mock_client, graph_id) + out, _ = capfd.readouterr() + assert "Running openCypher query on Neptune Analytics..." in out + assert "Query Result:" in out + assert '{"results": "some data"}' in out + + # --- Success case with no Payload --- + mock_client.execute_query.return_value = {} + execute_gremlin_profile_query(mock_client, graph_id) + out, _ = capfd.readouterr() + assert "No query result returned." in out + + # --- ClientError case --- + mock_client.execute_query.side_effect = ClientError( + {"Error": {"Message": "Client error occurred"}}, "ExecuteQuery" + ) + execute_gremlin_profile_query(mock_client, graph_id) + out, _ = capfd.readouterr() + assert "NeptuneGraph error: Client error occurred" in out + + # --- Generic exception case --- + mock_client.execute_query.side_effect = Exception("Generic failure") + execute_gremlin_profile_query(mock_client, graph_id) + out, _ = capfd.readouterr() + assert "Unexpected error: Generic failure" in out diff --git a/python/example_code/neptune/tests/database_tests/execute_gremlin_profile_query.py b/python/example_code/neptune/tests/database_tests/execute_gremlin_profile_query.py new file mode 100644 index 00000000000..4b2f89fdb00 --- /dev/null +++ b/python/example_code/neptune/tests/database_tests/execute_gremlin_profile_query.py @@ -0,0 +1,52 @@ +import json +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError, EndpointConnectionError + +from database.gremlin_profile_query_example import execute_gremlin_profile_query # Adjust path as needed + + +def test_execute_gremlin_profile_query(capfd): + """ + Unit test for execute_gremlin_profile_query(). + Tests success, no output, ClientError, BotoCoreError, and general Exception handling. + """ + # --- Success case with valid output --- + mock_client = MagicMock() + mock_client.execute_gremlin_profile_query.return_value = { + "output": {"metrics": {"dur": 500, "steps": 3}} + } + + execute_gremlin_profile_query(mock_client) + out, _ = capfd.readouterr() + assert "Query Profile Output:" in out + assert '"dur": 500' in out + + # --- Success case with no output --- + mock_client.execute_gremlin_profile_query.return_value = {"output": None} + execute_gremlin_profile_query(mock_client) + out, _ = capfd.readouterr() + assert "No output returned from the profile query." in out + + # --- ClientError case --- + mock_client.execute_gremlin_profile_query.side_effect = ClientError( + {"Error": {"Code": "BadRequest", "Message": "Invalid query"}}, + operation_name="ExecuteGremlinProfileQuery" + ) + execute_gremlin_profile_query(mock_client) + out, _ = capfd.readouterr() + assert "Neptune error: Invalid query" in out + + # --- BotoCoreError case --- + mock_client.execute_gremlin_profile_query.side_effect = EndpointConnectionError( + endpoint_url="https://neptune.amazonaws.com" + ) + execute_gremlin_profile_query(mock_client) + out, _ = capfd.readouterr() + assert "Unexpected Boto3 error" in out + + # --- Unexpected exception case --- + mock_client.execute_gremlin_profile_query.side_effect = Exception("Boom") + execute_gremlin_profile_query(mock_client) + out, _ = capfd.readouterr() + assert "Unexpected error: Boom" in out diff --git a/python/example_code/neptune/tests/database_tests/test_gremlin_queries.py b/python/example_code/neptune/tests/database_tests/test_gremlin_queries.py new file mode 100644 index 00000000000..41224593b2d --- /dev/null +++ b/python/example_code/neptune/tests/database_tests/test_gremlin_queries.py @@ -0,0 +1,44 @@ +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError, BotoCoreError +from database.neptune_gremlin_query_example import execute_gremlin_query + +def test_execute_gremlin_query(capfd): + # Mock the client + mock_client = MagicMock() + + # --- Case 1: Success with result --- + mock_client.execute_gremlin_query.return_value = { + "result": {"data": ["some", "nodes"]} + } + execute_gremlin_query(mock_client) + out, _ = capfd.readouterr() + assert "Querying Neptune..." in out + assert "Query Result:" in out + assert "some" in out + + # --- Case 2: Success with no result --- + mock_client.execute_gremlin_query.return_value = {"result": None} + execute_gremlin_query(mock_client) + out, _ = capfd.readouterr() + assert "No result returned from the query." in out + + # --- Case 3: ClientError --- + mock_client.execute_gremlin_query.side_effect = ClientError( + {"Error": {"Message": "BadRequest"}}, operation_name="ExecuteGremlinQuery" + ) + execute_gremlin_query(mock_client) + out, _ = capfd.readouterr() + assert "Error calling Neptune: BadRequest" in out + + # --- Case 4: BotoCoreError --- + mock_client.execute_gremlin_query.side_effect = BotoCoreError() + execute_gremlin_query(mock_client) + out, _ = capfd.readouterr() + assert "BotoCore error:" in out + + # --- Case 5: Generic exception --- + mock_client.execute_gremlin_query.side_effect = Exception("Unexpected failure") + execute_gremlin_query(mock_client) + out, _ = capfd.readouterr() + assert "Unexpected error: Unexpected failure" in out diff --git a/python/example_code/neptune/tests/database_tests/test_opencypher_explain_query.py b/python/example_code/neptune/tests/database_tests/test_opencypher_explain_query.py new file mode 100644 index 00000000000..c117618f756 --- /dev/null +++ b/python/example_code/neptune/tests/database_tests/test_opencypher_explain_query.py @@ -0,0 +1,54 @@ +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError, BotoCoreError +from database.open_cypher_explain_example import execute_opencypher_explain_query + + +def test_execute_opencypher_explain_query(capfd): + mock_client = MagicMock() + + # --- Case 1: Successful result (bytes) --- + mock_client.execute_open_cypher_explain_query.return_value = { + "results": b"mocked byte explain output" + } + execute_opencypher_explain_query(mock_client) + out, _ = capfd.readouterr() + assert "Explain Results:" in out + assert "mocked byte explain output" in out + + # --- Case 2: Successful result (str) --- + mock_client.execute_open_cypher_explain_query.return_value = { + "results": "mocked string explain output" + } + execute_opencypher_explain_query(mock_client) + out, _ = capfd.readouterr() + assert "Explain Results:" in out + assert "mocked string explain output" in out + + # --- Case 3: No results --- + mock_client.execute_open_cypher_explain_query.return_value = { + "results": None + } + execute_opencypher_explain_query(mock_client) + out, _ = capfd.readouterr() + assert "No explain results returned." in out + + # --- Case 4: ClientError --- + mock_client.execute_open_cypher_explain_query.side_effect = ClientError( + {"Error": {"Message": "Invalid OpenCypher query"}}, "ExecuteOpenCypherExplainQuery" + ) + execute_opencypher_explain_query(mock_client) + out, _ = capfd.readouterr() + assert "Neptune error: Invalid OpenCypher query" in out + + # --- Case 5: BotoCoreError --- + mock_client.execute_open_cypher_explain_query.side_effect = BotoCoreError() + execute_opencypher_explain_query(mock_client) + out, _ = capfd.readouterr() + assert "BotoCore error:" in out + + # --- Case 6: Generic Exception --- + mock_client.execute_open_cypher_explain_query.side_effect = Exception("Some generic error") + execute_opencypher_explain_query(mock_client) + out, _ = capfd.readouterr() + assert "Unexpected error: Some generic error" in out diff --git a/python/example_code/neptune/tests/test_check_instance_status.py b/python/example_code/neptune/tests/test_check_instance_status.py new file mode 100644 index 00000000000..3f4738d5840 --- /dev/null +++ b/python/example_code/neptune/tests/test_check_instance_status.py @@ -0,0 +1,65 @@ +import pytest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError +from neptune_scenario import check_instance_status + + +@patch("NeptuneScenario.time.sleep", return_value=None) +@patch("NeptuneScenario.time.time") +@patch("NeptuneScenario.format_elapsed_time", side_effect=lambda x: f"{x}s") +def test_check_instance_status(mock_format_time, mock_time, mock_sleep): + """ + Fast unit test for check_instance_status(). + Covers: success, timeout, ClientError. + """ + # --- Setup Neptune mock client --- + mock_client = MagicMock() + mock_paginator = MagicMock() + mock_client.get_paginator.return_value = mock_paginator + + # --- Success scenario --- + # Simulate time progressing quickly + mock_time.side_effect = [0, 1, 2, 3, 4, 5] # enough for 2 loops + + # Simulate: starting -> available + mock_paginator.paginate.side_effect = [ + [{"DBInstances": [{"DBInstanceStatus": "starting"}]}], + [{"DBInstances": [{"DBInstanceStatus": "available"}]}] + ] + + check_instance_status(mock_client, "instance-1", "available") + assert mock_client.get_paginator.called + assert mock_paginator.paginate.called + + # --- Timeout scenario --- + # Reset mocks + mock_client.reset_mock() + mock_paginator = MagicMock() + mock_client.get_paginator.return_value = mock_paginator + + # Provide enough time values to loop 4–5 times + mock_time.side_effect = list(range(20)) # 0 to 19 + + # Always returns 'starting' + mock_paginator.paginate.side_effect = lambda **kwargs: [ + {"DBInstances": [{"DBInstanceStatus": "starting"}]} + ] + + # Shrink TIMEOUT to 3s inside test scope + with patch("NeptuneScenario.TIMEOUT_SECONDS", 3), patch("NeptuneScenario.POLL_INTERVAL_SECONDS", 1): + with pytest.raises(RuntimeError, match="Timeout waiting for 'instance-timeout'"): + check_instance_status(mock_client, "instance-timeout", "available") + + # --- ClientError scenario --- + mock_paginator.paginate.side_effect = ClientError( + { + "Error": { + "Code": "DBInstanceNotFound", + "Message": "Instance not found" + } + }, + operation_name="DescribeDBInstances" + ) + + with pytest.raises(ClientError, match="Instance not found"): + check_instance_status(mock_client, "not-there", "available") diff --git a/python/example_code/neptune/tests/test_create_db_cluster.py b/python/example_code/neptune/tests/test_create_db_cluster.py new file mode 100644 index 00000000000..2864bc45b96 --- /dev/null +++ b/python/example_code/neptune/tests/test_create_db_cluster.py @@ -0,0 +1,49 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError + +from neptune_scenario import create_db_cluster # Replace with your actual module path + +def test_create_db_cluster(): + """ + Unit test for create_db_cluster(). + Tests success, missing cluster ID, ClientError, and unexpected exceptions, + all in one test to follow the single-method test style. + """ + # --- Success case --- + mock_neptune = MagicMock() + mock_neptune.create_db_cluster.return_value = { + "DBCluster": { + "DBClusterIdentifier": "test-cluster" + } + } + cluster_id = create_db_cluster(mock_neptune, "test-cluster") + assert cluster_id == "test-cluster" + mock_neptune.create_db_cluster.assert_called_once() + + # --- Missing cluster ID raises RuntimeError --- + mock_neptune.create_db_cluster.return_value = {"DBCluster": {}} + with pytest.raises(RuntimeError, match="Cluster created but no ID returned"): + create_db_cluster(mock_neptune, "missing-id-cluster") + + # --- ClientError is wrapped and re-raised --- + mock_neptune.create_db_cluster.side_effect = ClientError( + { + "Error": { + "Code": "AccessDenied", + "Message": "You do not have permission." + } + }, + operation_name="CreateDBCluster" + ) + with pytest.raises(ClientError) as exc_info: + create_db_cluster(mock_neptune, "denied-cluster") + assert "Failed to create DB cluster 'denied-cluster'" in str(exc_info.value) + + # --- Unexpected exception raises RuntimeError --- + mock_neptune.create_db_cluster.side_effect = Exception("Unexpected failure") + with pytest.raises(RuntimeError, match="Unexpected error creating DB cluster"): + create_db_cluster(mock_neptune, "fail-cluster") diff --git a/python/example_code/neptune/tests/test_create_db_instance.py b/python/example_code/neptune/tests/test_create_db_instance.py new file mode 100644 index 00000000000..2864bc45b96 --- /dev/null +++ b/python/example_code/neptune/tests/test_create_db_instance.py @@ -0,0 +1,49 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError + +from neptune_scenario import create_db_cluster # Replace with your actual module path + +def test_create_db_cluster(): + """ + Unit test for create_db_cluster(). + Tests success, missing cluster ID, ClientError, and unexpected exceptions, + all in one test to follow the single-method test style. + """ + # --- Success case --- + mock_neptune = MagicMock() + mock_neptune.create_db_cluster.return_value = { + "DBCluster": { + "DBClusterIdentifier": "test-cluster" + } + } + cluster_id = create_db_cluster(mock_neptune, "test-cluster") + assert cluster_id == "test-cluster" + mock_neptune.create_db_cluster.assert_called_once() + + # --- Missing cluster ID raises RuntimeError --- + mock_neptune.create_db_cluster.return_value = {"DBCluster": {}} + with pytest.raises(RuntimeError, match="Cluster created but no ID returned"): + create_db_cluster(mock_neptune, "missing-id-cluster") + + # --- ClientError is wrapped and re-raised --- + mock_neptune.create_db_cluster.side_effect = ClientError( + { + "Error": { + "Code": "AccessDenied", + "Message": "You do not have permission." + } + }, + operation_name="CreateDBCluster" + ) + with pytest.raises(ClientError) as exc_info: + create_db_cluster(mock_neptune, "denied-cluster") + assert "Failed to create DB cluster 'denied-cluster'" in str(exc_info.value) + + # --- Unexpected exception raises RuntimeError --- + mock_neptune.create_db_cluster.side_effect = Exception("Unexpected failure") + with pytest.raises(RuntimeError, match="Unexpected error creating DB cluster"): + create_db_cluster(mock_neptune, "fail-cluster") diff --git a/python/example_code/neptune/tests/test_create_subnet_group.py b/python/example_code/neptune/tests/test_create_subnet_group.py new file mode 100644 index 00000000000..ba38d17224f --- /dev/null +++ b/python/example_code/neptune/tests/test_create_subnet_group.py @@ -0,0 +1,56 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError +from neptune_scenario import create_subnet_group # Adjust the import path as necessary + +# Mocking external functions to isolate the unit test +@patch("NeptuneScenario.get_subnet_ids") +@patch("NeptuneScenario.get_default_vpc_id") +def test_create_subnet_group(mock_get_vpc, mock_get_subnets): + """ + Unit test for create_subnet_group(). + Verifies successful creation and correct parsing of name and ARN. + """ + # --- Setup Mocks --- + mock_get_vpc.return_value = "vpc-1234" + mock_get_subnets.return_value = ["subnet-1", "subnet-2"] + + mock_neptune = MagicMock() + mock_neptune.create_db_subnet_group.return_value = { + "DBSubnetGroup": { + "DBSubnetGroupName": "test-group", + "DBSubnetGroupArn": "arn:aws:neptune:us-east-1:123456789012:subnet-group:test-group" + } + } + + # --- Success Case --- + name, arn = create_subnet_group(mock_neptune, "test-group") + assert name == "test-group" + assert arn == "arn:aws:neptune:us-east-1:123456789012:subnet-group:test-group" + mock_neptune.create_db_subnet_group.assert_called_once_with( + DBSubnetGroupName="test-group", + DBSubnetGroupDescription="My Neptune subnet group", + SubnetIds=["subnet-1", "subnet-2"], + Tags=[{"Key": "Environment", "Value": "Dev"}] + ) + + # --- Missing Name or ARN --- + mock_neptune.create_db_subnet_group.return_value = {"DBSubnetGroup": {}} + with pytest.raises(RuntimeError, match="Response missing subnet group name or ARN"): + create_subnet_group(mock_neptune, "missing-id-group") + + # --- ClientError Handling --- + mock_neptune.create_db_subnet_group.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Permission denied"}}, + operation_name="CreateDBSubnetGroup" + ) + with pytest.raises(ClientError, match="Failed to create subnet group 'denied-group'"): + create_subnet_group(mock_neptune, "denied-group") + + # --- Unexpected Exception --- + mock_neptune.create_db_subnet_group.side_effect = Exception("Unexpected failure") + with pytest.raises(RuntimeError, match="Unexpected error creating subnet group 'fail-group'"): + create_subnet_group(mock_neptune, "fail-group") diff --git a/python/example_code/neptune/tests/test_delete_db_cluster.py b/python/example_code/neptune/tests/test_delete_db_cluster.py new file mode 100644 index 00000000000..d1b9eb132c2 --- /dev/null +++ b/python/example_code/neptune/tests/test_delete_db_cluster.py @@ -0,0 +1,46 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError + +from neptune_scenario import delete_db_cluster # Update with actual module name + +def test_delete_db_cluster(): + """ + Unit test for delete_db_cluster(). + Tests success, AWS ClientError, and unexpected exception scenarios. + """ + # --- Success case --- + mock_neptune = MagicMock() + mock_neptune.delete_db_cluster.return_value = {} + + delete_db_cluster(mock_neptune, "test-cluster") + mock_neptune.delete_db_cluster.assert_called_once_with( + DBClusterIdentifier="test-cluster", + SkipFinalSnapshot=True + ) + + # --- AWS ClientError is raised --- + mock_neptune = MagicMock() + mock_neptune.delete_db_cluster.side_effect = ClientError( + { + "Error": { + "Code": "AccessDenied", + "Message": "You are not authorized to delete this cluster" + } + }, + operation_name="DeleteDBCluster" + ) + + with pytest.raises(ClientError) as exc_info: + delete_db_cluster(mock_neptune, "unauthorized-cluster") + assert "AccessDenied" in str(exc_info.value) + + # --- Unexpected Exception raises as-is --- + mock_neptune = MagicMock() + mock_neptune.delete_db_cluster.side_effect = Exception("Unexpected error") + + with pytest.raises(Exception, match="Unexpected error"): + delete_db_cluster(mock_neptune, "error-cluster") diff --git a/python/example_code/neptune/tests/test_delete_db_instance.py b/python/example_code/neptune/tests/test_delete_db_instance.py new file mode 100644 index 00000000000..bf485af96be --- /dev/null +++ b/python/example_code/neptune/tests/test_delete_db_instance.py @@ -0,0 +1,47 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError +from neptune_scenario import delete_db_instance + + +@patch("NeptuneScenario.time.sleep", return_value=None) # Not needed here, but safe if waiter is mocked differently later +def test_delete_db_instance(mock_sleep): + """ + Unit test for delete_db_instance(). + Covers: successful deletion and ClientError case. + """ + # --- Setup mock Neptune client --- + mock_client = MagicMock() + mock_waiter = MagicMock() + mock_client.get_waiter.return_value = mock_waiter + + # --- Success scenario --- + delete_db_instance(mock_client, "instance-1") + + mock_client.delete_db_instance.assert_called_once_with( + DBInstanceIdentifier="instance-1", + SkipFinalSnapshot=True + ) + mock_client.get_waiter.assert_called_once_with("db_instance_deleted") + mock_waiter.wait.assert_called_once_with( + DBInstanceIdentifier="instance-1", + WaiterConfig={"Delay": 30, "MaxAttempts": 40} + ) + + # --- ClientError scenario --- + mock_client.reset_mock() + mock_client.delete_db_instance.side_effect = ClientError( + { + "Error": { + "Code": "InvalidDBInstanceState", + "Message": "Instance is not in a deletable state" + } + }, + operation_name="DeleteDBInstance" + ) + + with pytest.raises(ClientError, match="Instance is not in a deletable state"): + delete_db_instance(mock_client, "bad-instance") diff --git a/python/example_code/neptune/tests/test_delete_db_subnet_group.py b/python/example_code/neptune/tests/test_delete_db_subnet_group.py new file mode 100644 index 00000000000..8dbe211bf79 --- /dev/null +++ b/python/example_code/neptune/tests/test_delete_db_subnet_group.py @@ -0,0 +1,39 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError + +from neptune_scenario import delete_db_subnet_group # Adjust if module name differs + + +def test_delete_db_subnet_group(): + """ + Unit test for delete_db_subnet_group(). + Covers success and ClientError cases. + """ + mock_neptune = MagicMock() + + # --- Success case --- + mock_neptune.delete_db_subnet_group.return_value = {} + delete_db_subnet_group(mock_neptune, "my-subnet-group") + mock_neptune.delete_db_subnet_group.assert_called_once_with( + DBSubnetGroupName="my-subnet-group" + ) + + # --- ClientError case --- + mock_neptune.delete_db_subnet_group.side_effect = ClientError( + { + "Error": { + "Code": "AccessDenied", + "Message": "You are not authorized to delete this subnet group" + } + }, + operation_name="DeleteDBSubnetGroup" + ) + + with pytest.raises(ClientError) as exc_info: + delete_db_subnet_group(mock_neptune, "unauthorized-subnet") + + assert "You are not authorized" in str(exc_info.value) diff --git a/python/example_code/neptune/tests/test_describe_db_clusters.py b/python/example_code/neptune/tests/test_describe_db_clusters.py new file mode 100644 index 00000000000..2443f3deff6 --- /dev/null +++ b/python/example_code/neptune/tests/test_describe_db_clusters.py @@ -0,0 +1,75 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + + +import unittest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError +from neptune_scenario import describe_db_clusters + +class TestDescribeDbClusters(unittest.TestCase): + + def setUp(self): + self.cluster_id = "test-cluster" + self.mock_client = MagicMock() + + def test_cluster_found_and_prints_info(self): + # Simulate successful describe with one DBCluster + mock_response = [{ + 'DBClusters': [{ + 'DBClusterIdentifier': 'test-cluster', + 'Status': 'available', + 'Engine': 'neptune', + 'EngineVersion': '1.2.0.0', + 'Endpoint': 'test-endpoint', + 'ReaderEndpoint': 'reader-endpoint', + 'AvailabilityZones': ['us-east-1a'], + 'DBSubnetGroup': 'default', + 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-12345'}], + 'StorageEncrypted': True, + 'IAMDatabaseAuthenticationEnabled': True, + 'BackupRetentionPeriod': 7, + 'PreferredBackupWindow': '07:00-09:00', + 'PreferredMaintenanceWindow': 'sun:05:00-sun:09:00' + }] + }] + paginator_mock = MagicMock() + paginator_mock.paginate.return_value = mock_response + self.mock_client.get_paginator.return_value = paginator_mock + + # Just run the function and ensure no exception + describe_db_clusters(self.mock_client, self.cluster_id) + + self.mock_client.get_paginator.assert_called_with('describe_db_clusters') + paginator_mock.paginate.assert_called_with(DBClusterIdentifier=self.cluster_id) + + def test_cluster_not_found_raises_client_error(self): + # Simulate paginator returning empty DBClusters + mock_response = [{'DBClusters': []}] + paginator_mock = MagicMock() + paginator_mock.paginate.return_value = mock_response + self.mock_client.get_paginator.return_value = paginator_mock + + with self.assertRaises(ClientError) as cm: + describe_db_clusters(self.mock_client, self.cluster_id) + + err = cm.exception.response['Error'] + self.assertEqual(err['Code'], 'DBClusterNotFound') + + def test_client_error_from_paginate_is_propagated(self): + # Simulate paginator throwing ClientError + paginator_mock = MagicMock() + paginator_mock.paginate.side_effect = ClientError( + {"Error": {"Code": "AccessDeniedException", "Message": "Denied"}}, + "DescribeDBClusters" + ) + self.mock_client.get_paginator.return_value = paginator_mock + + with self.assertRaises(ClientError) as cm: + describe_db_clusters(self.mock_client, self.cluster_id) + + self.assertEqual(cm.exception.response['Error']['Code'], 'AccessDeniedException') + + +if __name__ == "__main__": + unittest.main() diff --git a/python/example_code/neptune/tests/test_hello.py b/python/example_code/neptune/tests/test_hello.py new file mode 100644 index 00000000000..44845376768 --- /dev/null +++ b/python/example_code/neptune/tests/test_hello.py @@ -0,0 +1,69 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError + +from hello_neptune import describe_db_clusters # replace with actual import + + +@pytest.fixture +def mock_neptune_client(): + """Return a mocked boto3 Neptune client.""" + return MagicMock() + + +def test_describe_db_clusters_unit(mock_neptune_client, capsys): + """ + Unit test for describe_db_clusters with paginator. + Mocks the Neptune client's paginator and verifies expected output is printed. + """ + + # Create a mock paginator + mock_paginator = MagicMock() + mock_neptune_client.get_paginator.return_value = mock_paginator + + # Mock pages returned by paginate() + mock_paginator.paginate.return_value = [ + { + "DBClusters": [ + { + "DBClusterIdentifier": "my-test-cluster", + "Status": "available" + } + ] + }, + { + "DBClusters": [ + { + "DBClusterIdentifier": "my-second-cluster", + "Status": "modifying" + } + ] + } + ] + + try: + # Call the function with the mocked client + describe_db_clusters(mock_neptune_client) + + # Capture stdout + captured = capsys.readouterr() + + # Check that expected outputs from both pages were printed + assert "my-test-cluster" in captured.out + assert "available" in captured.out + assert "my-second-cluster" in captured.out + assert "modifying" in captured.out + + # Ensure get_paginator was called with correct operation + mock_neptune_client.get_paginator.assert_called_once_with("describe_db_clusters") + + # Ensure paginate method was called + mock_paginator.paginate.assert_called_once() + + except ClientError as e: + pytest.fail(f"AWS ClientError occurred: {e.response['Error']['Message']}") + except Exception as e: + pytest.fail(f"Unexpected error: {str(e)}") diff --git a/python/example_code/neptune/tests/test_neptune_scenario_integration.py b/python/example_code/neptune/tests/test_neptune_scenario_integration.py new file mode 100644 index 00000000000..5d16d899de4 --- /dev/null +++ b/python/example_code/neptune/tests/test_neptune_scenario_integration.py @@ -0,0 +1,43 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import boto3 +import builtins +import pytest +from unittest.mock import patch + +# Import your scenario file; ensure Python can locate it. +# If your file is named `neptune_scenario.py`, this import will work: +import NeptuneScenario + + +@pytest.mark.integration +def test_neptune_run_scenario(monkeypatch): + # Patch input() to simulate all required inputs + input_sequence = iter([ + "c", # Step 1: create subnet group + "c", # Step 2: create cluster + "c", # Step 3: create instance + "c", # Step 4: wait for instance + "c", # Step 5: describe cluster + "c", # Step 6: stop cluster + "c", # Step 7: start cluster + "y" # Step 8: delete resources + ]) + + monkeypatch.setattr(builtins, "input", lambda: next(input_sequence)) + + # You can override these to make test-friendly unique names + subnet_group_name = "test-subnet-group-inte112" + cluster_name = "test-cluster-integ11" + db_instance_id = "test-db-instance-integ11" + + neptune_client = boto3.client("neptune", region_name="us-east-1") + + # Run the full scenario + NeptuneScenario.run_scenario( + neptune_client, + subnet_group_name=subnet_group_name, + db_instance_id=db_instance_id, + cluster_name=cluster_name, + ) diff --git a/python/example_code/neptune/tests/test_start_db_cluster.py b/python/example_code/neptune/tests/test_start_db_cluster.py new file mode 100644 index 00000000000..9deefc5fda8 --- /dev/null +++ b/python/example_code/neptune/tests/test_start_db_cluster.py @@ -0,0 +1,90 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + + +import pytest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError + +from neptune_scenario import start_db_cluster # Update import if needed + +# Speed up test runs +POLL_INTERVAL_SECONDS = 0.1 +TIMEOUT_SECONDS = 0.3 + + +@patch("NeptuneScenario.time.sleep", return_value=None) # mock sleep +@patch("NeptuneScenario.POLL_INTERVAL_SECONDS", POLL_INTERVAL_SECONDS) +@patch("NeptuneScenario.TIMEOUT_SECONDS", TIMEOUT_SECONDS) +def test_start_db_cluster(mock_sleep): + """ + Unit test for start_db_cluster(). + Covers success, timeout, start failure, and paginator failure. + """ + # --- Success case --- + mock_neptune = MagicMock() + paginator_mock = MagicMock() + mock_neptune.get_paginator.return_value = paginator_mock + + # start_db_cluster returns nothing + mock_neptune.start_db_cluster.return_value = {} + + # First call returns "starting", second returns "available" + paginator_mock.paginate.side_effect = [ + [{'DBClusters': [{'Status': 'starting'}]}], + [{'DBClusters': [{'Status': 'available'}]}] + ] + + start_db_cluster(mock_neptune, "my-cluster") + + mock_neptune.start_db_cluster.assert_called_once_with(DBClusterIdentifier="my-cluster") + mock_neptune.get_paginator.assert_called_once_with("describe_db_clusters") + assert paginator_mock.paginate.call_count == 2 + + # --- Timeout case --- + mock_neptune.reset_mock() + paginator_mock = MagicMock() + mock_neptune.get_paginator.return_value = paginator_mock + + def always_starting(*args, **kwargs): + return [{'DBClusters': [{'Status': 'starting'}]}] + + paginator_mock.paginate.side_effect = always_starting + mock_neptune.start_db_cluster.return_value = {} + + with pytest.raises(RuntimeError, match="Timeout waiting for cluster 'timeout-cluster' to become available."): + start_db_cluster(mock_neptune, "timeout-cluster") + + # --- start_db_cluster throws ClientError --- + mock_neptune.start_db_cluster.side_effect = ClientError( + { + "Error": { + "Code": "AccessDenied", + "Message": "Permission denied" + } + }, + operation_name="StartDBCluster" + ) + + with pytest.raises(ClientError) as exc_info: + start_db_cluster(mock_neptune, "fail-cluster") + assert exc_info.value.response["Error"]["Code"] == "AccessDenied" + + # --- Paginator throws ClientError --- + mock_neptune.start_db_cluster.side_effect = None # reset + paginator_mock = MagicMock() + mock_neptune.get_paginator.return_value = paginator_mock + + paginator_mock.paginate.side_effect = ClientError( + { + "Error": { + "Code": "Throttling", + "Message": "Too many requests" + } + }, + operation_name="DescribeDBClusters" + ) + + with pytest.raises(ClientError) as exc_info: + start_db_cluster(mock_neptune, "paginator-error") + assert exc_info.value.response["Error"]["Code"] == "Throttling" diff --git a/python/example_code/neptune/tests/test_stop_db_cluster.py b/python/example_code/neptune/tests/test_stop_db_cluster.py new file mode 100644 index 00000000000..c35635617fd --- /dev/null +++ b/python/example_code/neptune/tests/test_stop_db_cluster.py @@ -0,0 +1,88 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + + +import pytest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError + +from neptune_scenario import stop_db_cluster # Update as needed + +# Use small values to speed up the test +POLL_INTERVAL_SECONDS = 0.1 +TIMEOUT_SECONDS = 0.3 + + +@patch("NeptuneScenario.time.sleep", return_value=None) # avoid actual delay +@patch("NeptuneScenario.POLL_INTERVAL_SECONDS", POLL_INTERVAL_SECONDS) +@patch("NeptuneScenario.TIMEOUT_SECONDS", TIMEOUT_SECONDS) +def test_stop_db_cluster(mock_sleep): + """ + Unit test for stop_db_cluster(). + Covers: success, timeout, stop call failure, and paginator failure. + """ + # --- Success case --- + mock_neptune = MagicMock() + paginator_mock = MagicMock() + mock_neptune.get_paginator.return_value = paginator_mock + + # First response: stopping, then stopped + paginator_mock.paginate.side_effect = [ + [{'DBClusters': [{'Status': 'stopping'}]}], + [{'DBClusters': [{'Status': 'stopped'}]}] + ] + mock_neptune.stop_db_cluster.return_value = {} + + stop_db_cluster(mock_neptune, "my-cluster") + + mock_neptune.stop_db_cluster.assert_called_once_with(DBClusterIdentifier="my-cluster") + mock_neptune.get_paginator.assert_called_once_with("describe_db_clusters") + assert paginator_mock.paginate.call_count == 2 + + # --- Timeout case --- + mock_neptune.reset_mock() + paginator_mock = MagicMock() + mock_neptune.get_paginator.return_value = paginator_mock + + def always_stopping(*args, **kwargs): + return [{'DBClusters': [{'Status': 'stopping'}]}] + + paginator_mock.paginate.side_effect = always_stopping + mock_neptune.stop_db_cluster.return_value = {} + + with pytest.raises(RuntimeError, match="Timeout waiting for cluster 'timeout-cluster' to stop."): + stop_db_cluster(mock_neptune, "timeout-cluster") + + # --- stop_db_cluster raises ClientError --- + mock_neptune.stop_db_cluster.side_effect = ClientError( + { + "Error": { + "Code": "AccessDenied", + "Message": "Not authorized" + } + }, + operation_name="StopDBCluster" + ) + + with pytest.raises(ClientError) as exc_info: + stop_db_cluster(mock_neptune, "fail-cluster") + assert exc_info.value.response["Error"]["Code"] == "AccessDenied" + + # --- Paginator throws ClientError --- + mock_neptune.stop_db_cluster.side_effect = None # clear previous error + paginator_mock = MagicMock() + mock_neptune.get_paginator.return_value = paginator_mock + + paginator_mock.paginate.side_effect = ClientError( + { + "Error": { + "Code": "Throttling", + "Message": "Too many requests" + } + }, + operation_name="DescribeDBClusters" + ) + + with pytest.raises(ClientError) as exc_info: + stop_db_cluster(mock_neptune, "paginator-error") + assert exc_info.value.response["Error"]["Code"] == "Throttling" diff --git a/scenarios/basics/neptune/SPECIFICATION.md b/scenarios/basics/neptune/SPECIFICATION.md index 7575ebc8be1..12220f08a9d 100644 --- a/scenarios/basics/neptune/SPECIFICATION.md +++ b/scenarios/basics/neptune/SPECIFICATION.md @@ -41,7 +41,9 @@ The key advantage of the `NeptuneAsyncClient` is its ability to provide fine-gra This Basics scenario does not require any additional AWS resources. ## Hello Amazon Neptune -This program is intended for users not familiar with Amazon Neptune to easily get up and running. The program invokes `describeDBClustersPaginator`to iterate through subnet groups. +This program is intended for users not familiar with Amazon Neptune to easily get up and running. The program invokes `describeDBClustersPaginator`to iterate through subnet groups. ' + +Exception Handling: Check to see if a `ResourceNotFoundException` is thrown. ## Basics Scenario Program Flow The Amazon Neptune Basics scenario executes the following operations. @@ -61,7 +63,7 @@ The Amazon Neptune Basics scenario executes the following operations. 4. **Check the status of the Neptune DB Instance**: - Description: Check the status of the DB instance by invoking `describeDBInstances`. Poll the instance until it reaches an `availbale`state. - - Exception Handling: This operatioin handles a `CompletionException`. If thrown, display the message and end the program. + - Exception Handling: This operatioin handles a `ResourceNotFoundException`. If thrown, display the message and end the program. 5. **Show Neptune Cluster details**: - Description: Shows the details of the cluster by invoking `describeDBClusters`. @@ -275,15 +277,30 @@ The following table describes the metadata used in this Basics Scenario. The met |`createDBSubnetGroup` | neptune_CreateDBSubnetGroup | |`createDBCluster` | neptune_CreateDBCluster | |`createDBInstance` | neptune_CreateDBInstance | -|`describeDBInstances ` | neptune_DescribeDBInstances | +|`describeDBInstances` | neptune_DescribeDBInstances | |`describeDBClusters` | neptune_DescribeDBClusters | | `stopDBCluster` | neptune_StopDBCluster | -|`startDBCluster ` | neptune_StartDBCluster | -|`deleteDBInstance ` | neptune_DeleteDBInstance | -| `deleteDBCluster` | neptune_DeleteDBCluster | -| `deleteDBSubnetGroup `| neptune_DeleteDBSubnetGroup | -| `scenario` | neptune_Scenario | -| `hello` | neptune_Hello | +|`startDBCluster` | neptune_StartDBCluster | +|`deleteDBInstance` | neptune_DeleteDBInstance | +|`deleteDBCluster` | neptune_DeleteDBCluster | +|`deleteDBSubnetGroup` | neptune_DeleteDBSubnetGroup | +|`scenario` | neptune_Scenario | +|`hello` | neptune_Hello | + +### Additional SOS Tags +We will add additional code examples to the AWS Code Library. These code examples were created by the SME. These APIs cannot be used in the main scenario because you must run them from within the same VPC as the cluster. There is no console access. However, we will still add them to the AWS Code Library. + +This table decribes the SOS tags for NeptunedataClient and NeptuneGraphClient. + +| action | metadata key | +|-------------------------------|------------------------------------- | +|`executeGremlinProfileQuery` | neptune_ExecuteGremlinProfileQuery | +|`executeGremlinQuery` | neptune_ExecuteGremlinQuery | +|`executeOpenCypherExplainQuery`| | +|`createGraph ` | neptune_CreateGraph: | +|`executeQuery` | neptune_ExecuteQuery | +NOTE +As there is limited room in aboce table, the metadata key for `executeOpenCypherExplainQuery`is neptune_ExecuteOpenCypherExplainQuery. \ No newline at end of file