There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repository.
Neptune examples using SDK for Python (Boto3)
The following code examples show how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Neptune.
Basics are code examples that show how to perform the essential operations within a service.
Actions are code excerpts from larger programs and must be run in context. While actions show how to call individual service functions, you can see actions in context in their related scenarios.
Each example includes a link to the complete source code, with instructions on how to set up and run the code in context.
Get started
The following code examples show how to get started using Neptune.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import boto3
from botocore.exceptions import ClientError


def describe_db_clusters(neptune_client):
    """
    Describes the Amazon Neptune DB clusters using a paginator to handle multiple pages.
    Raises ClientError with 'ResourceNotFoundException' if no clusters are found.
    """
    paginator = neptune_client.get_paginator("describe_db_clusters")
    clusters_found = False

    for page in paginator.paginate():
        for cluster in page.get("DBClusters", []):
            clusters_found = True
            print(f"Cluster Identifier: {cluster['DBClusterIdentifier']}")
            print(f"Status: {cluster['Status']}")

    if not clusters_found:
        raise ClientError(
            {
                "Error": {
                    "Code": "ResourceNotFoundException",
                    "Message": "No Neptune DB clusters found."
                }
            },
            operation_name="DescribeDBClusters"
        )


def main():
    """
    Main entry point: creates the Neptune client and calls the describe operation.
    """
    neptune_client = boto3.client("neptune")
    try:
        describe_db_clusters(neptune_client)
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        if error_code == "ResourceNotFoundException":
            print(f"Resource not found: {e.response['Error']['Message']}")
        else:
            print(f"Unexpected ClientError: {e.response['Error']['Message']}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")


if __name__ == "__main__":
    main()

For API details, see the DescribeDBClusters paginator in AWS SDK for Python (Boto3) API Reference.
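The example above prints each cluster as it pages through the results. As a minimal sketch of the same pagination pattern, the helper below collects identifier and status pairs into a list instead of printing them. Both `collect_cluster_statuses` and `FakePaginator` are hypothetical names introduced here for illustration; the fake replays canned pages so the snippet runs without AWS credentials. In real use you would pass `neptune_client.get_paginator("describe_db_clusters")` instead.

```python
from typing import Iterable, Iterator


class FakePaginator:
    """Stand-in for a Boto3 paginator (hypothetical, for offline illustration only)."""

    def __init__(self, pages: Iterable[dict]):
        self._pages = list(pages)

    def paginate(self, **kwargs) -> Iterator[dict]:
        # A real paginator issues one API call per page; here we replay canned pages.
        return iter(self._pages)


def collect_cluster_statuses(paginator) -> list[tuple[str, str]]:
    """Return (DBClusterIdentifier, Status) pairs from every page."""
    results = []
    for page in paginator.paginate():
        for cluster in page.get("DBClusters", []):
            results.append((cluster["DBClusterIdentifier"], cluster["Status"]))
    return results


pages = [
    {"DBClusters": [{"DBClusterIdentifier": "cluster-a", "Status": "available"}]},
    {"DBClusters": [{"DBClusterIdentifier": "cluster-b", "Status": "stopped"}]},
    {},  # a page without a DBClusters key contributes nothing
]
statuses = collect_cluster_statuses(FakePaginator(pages))
print(statuses)
```

Returning data rather than printing it keeps the paging logic easy to test and lets the caller decide what an empty result means.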
Basics
The following code example shows how to:
Create an Amazon Neptune subnet group.
Create a Neptune cluster.
Create a Neptune instance.
Check the status of the Neptune instance.
Show the Neptune cluster details.
Stop the Neptune cluster.
Start the Neptune cluster.
Delete the Neptune assets.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import boto3
import time
from botocore.exceptions import ClientError

# Constants used in this scenario
POLL_INTERVAL_SECONDS = 10
TIMEOUT_SECONDS = 1200  # 20 minutes


def delete_db_cluster(neptune_client, cluster_id: str):
    """
    Deletes a Neptune DB cluster and throws exceptions to the caller.

    Args:
        neptune_client (boto3.client): The Neptune client object.
        cluster_id (str): The ID of the Neptune DB cluster to be deleted.

    Raises:
        ClientError: If the delete operation fails.
    """
    request = {
        'DBClusterIdentifier': cluster_id,
        'SkipFinalSnapshot': True
    }

    try:
        print(f"Deleting DB Cluster: {cluster_id}")
        neptune_client.delete_db_cluster(**request)

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "DBClusterNotFoundFault":
            print(f"Cluster '{cluster_id}' not found or already deleted.")
        elif code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't delete DB cluster. {code}: {message}")
        raise


def format_elapsed_time(seconds: int) -> str:
    """Formats a number of seconds as HH:MM:SS."""
    mins, secs = divmod(seconds, 60)
    hours, mins = divmod(mins, 60)
    return f"{hours:02}:{mins:02}:{secs:02}"


def delete_db_instance(neptune_client, instance_id: str):
    """
    Deletes a Neptune DB instance and waits for its deletion to complete.
    Raises exception to be handled by calling code.
    """
    print(f"Initiating deletion of DB Instance: {instance_id}")
    try:
        neptune_client.delete_db_instance(
            DBInstanceIdentifier=instance_id,
            SkipFinalSnapshot=True
        )

        print(f"Waiting for DB Instance '{instance_id}' to be deleted...")
        waiter = neptune_client.get_waiter('db_instance_deleted')
        waiter.wait(
            DBInstanceIdentifier=instance_id,
            WaiterConfig={
                'Delay': 30,
                'MaxAttempts': 40
            }
        )

        print(f"DB Instance '{instance_id}' successfully deleted.")

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "DBInstanceNotFoundFault":
            print(f"Instance '{instance_id}' not found or already deleted.")
        elif code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't delete DB instance. {code}: {message}")
        raise


def delete_db_subnet_group(neptune_client, subnet_group_name):
    """
    Deletes a Neptune DB subnet group synchronously using Boto3.

    Args:
        neptune_client (boto3.client): The Neptune client.
        subnet_group_name (str): The name of the DB subnet group to delete.

    Raises:
        ClientError: If the delete operation fails.
    """
    delete_group_request = {
        'DBSubnetGroupName': subnet_group_name
    }

    try:
        neptune_client.delete_db_subnet_group(**delete_group_request)
        print(f"Deleting Subnet Group: {subnet_group_name}")

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "DBSubnetGroupNotFoundFault":
            print(f"Subnet group '{subnet_group_name}' not found or already deleted.")
        elif code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't delete subnet group. {code}: {message}")
        raise


def wait_for_cluster_status(
        neptune_client,
        cluster_id: str,
        desired_status: str,
        timeout_seconds: int = TIMEOUT_SECONDS,
        poll_interval_seconds: int = POLL_INTERVAL_SECONDS
):
    """
    Waits for a Neptune DB cluster to reach a desired status.

    Args:
        neptune_client (boto3.client): The Amazon Neptune client.
        cluster_id (str): The identifier of the Neptune DB cluster.
        desired_status (str): The target status (e.g., "available", "stopped").
        timeout_seconds (int): Max time to wait in seconds (default: 1200).
        poll_interval_seconds (int): Polling interval in seconds (default: 10).

    Raises:
        RuntimeError: If the desired status is not reached before timeout.
    """
    print(f"Waiting for cluster '{cluster_id}' to reach status '{desired_status}'...")
    start_time = time.time()

    while True:
        # Prepare request object
        describe_cluster_request = {
            'DBClusterIdentifier': cluster_id
        }

        # Call the Neptune API
        response = neptune_client.describe_db_clusters(**describe_cluster_request)
        clusters = response.get('DBClusters', [])
        current_status = clusters[0].get('Status') if clusters else None

        elapsed_seconds = int(time.time() - start_time)

        status_str = current_status if current_status else "Unknown"
        print(
            f"\r Elapsed: {format_elapsed_time(elapsed_seconds):<20} Cluster status: {status_str:<20}",
            end="", flush=True
        )

        if current_status and current_status.lower() == desired_status.lower():
            print(
                f"\nNeptune cluster reached desired status '{desired_status}' after {format_elapsed_time(elapsed_seconds)}."
            )
            return

        if elapsed_seconds > timeout_seconds:
            raise RuntimeError(f"Timeout waiting for Neptune cluster to reach status: {desired_status}")

        time.sleep(poll_interval_seconds)


def start_db_cluster(neptune_client, cluster_identifier: str):
    """
    Starts an Amazon Neptune DB cluster and waits until it reaches 'available'.

    Args:
        neptune_client (boto3.client): The Neptune client.
        cluster_identifier (str): The DB cluster identifier.

    Raises:
        ClientError: Propagates AWS API issues like resource not found.
        RuntimeError: If cluster doesn't reach 'available' within timeout.
    """
    try:
        # Initial wait in case the cluster was just stopped
        time.sleep(30)
        neptune_client.start_db_cluster(DBClusterIdentifier=cluster_identifier)
    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't start DB cluster. Here's why: {code}: {message}")
        raise

    start_time = time.time()
    paginator = neptune_client.get_paginator('describe_db_clusters')

    while True:
        try:
            pages = paginator.paginate(DBClusterIdentifier=cluster_identifier)
            clusters = []
            for page in pages:
                clusters.extend(page.get('DBClusters', []))
        except ClientError as err:
            code = err.response["Error"]["Code"]
            message = err.response["Error"]["Message"]

            if code == "DBClusterNotFound":
                print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.")
            else:
                print(f"Couldn't describe DB cluster. Here's why: {code}: {message}")
            raise

        status = clusters[0].get('Status') if clusters else None
        elapsed = time.time() - start_time

        print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True)

        if status and status.lower() == 'available':
            print(f"\n🎉 Cluster '{cluster_identifier}' is available.")
            return

        if elapsed > TIMEOUT_SECONDS:
            raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to become available.")

        time.sleep(POLL_INTERVAL_SECONDS)


def stop_db_cluster(neptune_client, cluster_identifier: str):
    """
    Stops an Amazon Neptune DB cluster and waits until it's fully stopped.

    Args:
        neptune_client (boto3.client): The Neptune client.
        cluster_identifier (str): The DB cluster identifier.

    Raises:
        ClientError: For AWS API errors (e.g., resource not found).
        RuntimeError: If the cluster doesn't stop within the timeout.
    """
    try:
        neptune_client.stop_db_cluster(DBClusterIdentifier=cluster_identifier)
    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't stop DB cluster. Here's why: {code}: {message}")
        raise

    start_time = time.time()
    paginator = neptune_client.get_paginator('describe_db_clusters')

    while True:
        try:
            pages = paginator.paginate(DBClusterIdentifier=cluster_identifier)
            clusters = []
            for page in pages:
                clusters.extend(page.get('DBClusters', []))
        except ClientError as err:
            code = err.response["Error"]["Code"]
            message = err.response["Error"]["Message"]

            if code == "DBClusterNotFound":
                print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.")
            else:
                print(f"Couldn't describe DB cluster. Here's why: {code}: {message}")
            raise

        status = clusters[0].get('Status') if clusters else None
        elapsed = time.time() - start_time

        print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True)

        if status and status.lower() == 'stopped':
            print(f"\nCluster '{cluster_identifier}' is now stopped.")
            return

        if elapsed > TIMEOUT_SECONDS:
            raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to stop.")

        time.sleep(POLL_INTERVAL_SECONDS)


def describe_db_clusters(neptune_client, cluster_id: str):
    """
    Describes details of a Neptune DB cluster, paginating if needed.

    Args:
        neptune_client (boto3.client): The Neptune client.
        cluster_id (str): The ID of the cluster to describe.

    Raises:
        ClientError: If there's an AWS API error (e.g., cluster not found).
    """
    paginator = neptune_client.get_paginator('describe_db_clusters')

    try:
        pages = paginator.paginate(DBClusterIdentifier=cluster_id)

        found = False
        for page in pages:
            for cluster in page.get('DBClusters', []):
                found = True
                print(f"Cluster Identifier: {cluster.get('DBClusterIdentifier')}")
                print(f"Status: {cluster.get('Status')}")
                print(f"Engine: {cluster.get('Engine')}")
                print(f"Engine Version: {cluster.get('EngineVersion')}")
                print(f"Endpoint: {cluster.get('Endpoint')}")
                print(f"Reader Endpoint: {cluster.get('ReaderEndpoint')}")
                print(f"Availability Zones: {cluster.get('AvailabilityZones')}")
                print(f"Subnet Group: {cluster.get('DBSubnetGroup')}")
                print("VPC Security Groups:")
                for vpc_group in cluster.get('VpcSecurityGroups', []):
                    print(f"  - {vpc_group.get('VpcSecurityGroupId')}")
                print(f"Storage Encrypted: {cluster.get('StorageEncrypted')}")
                print(f"IAM Auth Enabled: {cluster.get('IAMDatabaseAuthenticationEnabled')}")
                print(f"Backup Retention Period: {cluster.get('BackupRetentionPeriod')} days")
                print(f"Preferred Backup Window: {cluster.get('PreferredBackupWindow')}")
                print(f"Preferred Maintenance Window: {cluster.get('PreferredMaintenanceWindow')}")
                print("------")

        if not found:
            # Treat empty response as cluster not found
            raise ClientError(
                {"Error": {"Code": "DBClusterNotFound", "Message": f"No cluster found with ID '{cluster_id}'"}},
                "DescribeDBClusters"
            )

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        elif code == "DBClusterNotFound":
            print(f"Cluster '{cluster_id}' not found. Please verify the cluster ID.")
        else:
            print(f"Couldn't describe DB cluster. Here's why: {code}: {message}")
        raise


def check_instance_status(neptune_client, instance_id: str, desired_status: str):
    """
    Polls the status of a Neptune DB instance until it reaches desired_status.
    Uses pagination via describe_db_instances, even for a single instance.

    Raises:
        ClientError: If describe_db_instances fails (e.g., instance not found).
        RuntimeError: If timeout expires before reaching desired status.
    """
    paginator = neptune_client.get_paginator('describe_db_instances')
    start_time = time.time()

    while True:
        try:
            pages = paginator.paginate(DBInstanceIdentifier=instance_id)
            instances = []
            for page in pages:
                instances.extend(page.get('DBInstances', []))

        except ClientError as err:
            code = err.response["Error"]["Code"]
            message = err.response["Error"]["Message"]

            if code == "DBInstanceNotFound":
                print(f"Instance '{instance_id}' not found. Please verify the instance ID.")
            else:
                print(f"Failed to describe DB instance. {code}: {message}")
            raise

        current_status = instances[0].get('DBInstanceStatus') if instances else None
        elapsed = int(time.time() - start_time)

        print(f"\rElapsed: {format_elapsed_time(elapsed)} Status: {current_status}", end="", flush=True)

        if current_status and current_status.lower() == desired_status.lower():
            print(f"\nInstance '{instance_id}' reached '{desired_status}' in {format_elapsed_time(elapsed)}.")
            return

        if elapsed > TIMEOUT_SECONDS:
            raise RuntimeError(f"Timeout waiting for '{instance_id}' to reach '{desired_status}'")

        time.sleep(POLL_INTERVAL_SECONDS)


def create_db_instance(neptune_client, db_instance_id: str, db_cluster_id: str) -> str:
    """Creates a Neptune DB instance in the given cluster and waits until it is available."""
    try:
        request = {
            'DBInstanceIdentifier': db_instance_id,
            'DBInstanceClass': 'db.r5.large',
            'Engine': 'neptune',
            'DBClusterIdentifier': db_cluster_id
        }

        print(f"Creating Neptune DB Instance: {db_instance_id}")
        response = neptune_client.create_db_instance(**request)

        instance = response.get('DBInstance')
        if not instance or 'DBInstanceIdentifier' not in instance:
            raise RuntimeError("Instance creation succeeded but no ID returned.")

        print(f"Waiting for DB Instance '{db_instance_id}' to become available...")
        waiter = neptune_client.get_waiter('db_instance_available')
        waiter.wait(
            DBInstanceIdentifier=db_instance_id,
            WaiterConfig={'Delay': 30, 'MaxAttempts': 40}
        )

        print(f"DB Instance '{db_instance_id}' is now available.")
        return instance['DBInstanceIdentifier']

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't create DB instance. Here's why: {code}: {message}")
        raise

    except Exception as e:
        print(f"Unexpected error creating DB instance '{db_instance_id}': {e}")
        raise RuntimeError(f"Unexpected error creating DB instance '{db_instance_id}': {e}") from e


def create_db_cluster(neptune_client, db_name: str) -> str:
    """
    Creates a Neptune DB cluster and returns its identifier.

    Args:
        neptune_client (boto3.client): The Neptune client object.
        db_name (str): The desired cluster identifier.

    Returns:
        str: The DB cluster identifier.

    Raises:
        RuntimeError: For any failure or AWS error, with a user-friendly message.
    """
    request = {
        'DBClusterIdentifier': db_name,
        'Engine': 'neptune',
        'DeletionProtection': False,
        'BackupRetentionPeriod': 1
    }

    try:
        response = neptune_client.create_db_cluster(**request)
        cluster = response.get('DBCluster') or {}

        cluster_id = cluster.get('DBClusterIdentifier')
        if not cluster_id:
            raise RuntimeError("Cluster created but no ID returned.")

        print(f"DB Cluster created: {cluster_id}")
        return cluster_id

    except ClientError as e:
        code = e.response["Error"]["Code"]
        message = e.response["Error"]["Message"]

        if code in ("ServiceQuotaExceededException", "DBClusterQuotaExceededFault"):
            raise RuntimeError("You have exceeded the quota for Neptune DB clusters.") from e
        else:
            raise RuntimeError(f"AWS error [{code}]: {message}") from e

    except Exception as e:
        raise RuntimeError(f"Unexpected error creating DB cluster '{db_name}': {e}") from e


def get_subnet_ids(vpc_id: str) -> list[str]:
    """Returns the IDs of all subnets in the given VPC."""
    ec2_client = boto3.client('ec2')

    describe_subnets_request = {
        'Filters': [{'Name': 'vpc-id', 'Values': [vpc_id]}]
    }

    response = ec2_client.describe_subnets(**describe_subnets_request)
    subnets = response.get('Subnets', [])

    subnet_ids = [subnet['SubnetId'] for subnet in subnets if 'SubnetId' in subnet]
    return subnet_ids


def get_default_vpc_id() -> str:
    """Returns the ID of the default VPC in the current Region."""
    ec2_client = boto3.client('ec2')
    describe_vpcs_request = {
        'Filters': [{'Name': 'isDefault', 'Values': ['true']}]
    }

    response = ec2_client.describe_vpcs(**describe_vpcs_request)
    vpcs = response.get('Vpcs', [])
    if not vpcs:
        raise RuntimeError("No default VPC found in this region.")

    default_vpc_id = vpcs[0]['VpcId']
    print(f"Default VPC ID: {default_vpc_id}")
    return default_vpc_id


def create_subnet_group(neptune_client, group_name: str):
    """
    Creates a Neptune DB subnet group and returns its name and ARN.

    Args:
        neptune_client (boto3.client): The Neptune client object.
        group_name (str): The desired name of the subnet group.

    Returns:
        tuple(str, str): (subnet_group_name, subnet_group_arn)

    Raises:
        RuntimeError: For quota errors or other AWS-related failures.
    """
    vpc_id = get_default_vpc_id()
    subnet_ids = get_subnet_ids(vpc_id)

    request = {
        'DBSubnetGroupName': group_name,
        'DBSubnetGroupDescription': 'My Neptune subnet group',
        'SubnetIds': subnet_ids,
        'Tags': [{'Key': 'Environment', 'Value': 'Dev'}]
    }

    try:
        response = neptune_client.create_db_subnet_group(**request)
        sg = response.get("DBSubnetGroup", {})
        name = sg.get("DBSubnetGroupName")
        arn = sg.get("DBSubnetGroupArn")

        if not name or not arn:
            raise RuntimeError("Response missing subnet group name or ARN.")

        print(f"Subnet group created: {name}")
        print(f"ARN: {arn}")
        return name, arn

    except ClientError as e:
        code = e.response["Error"]["Code"]
        msg = e.response["Error"]["Message"]

        if code == "ServiceQuotaExceededException":
            print("Subnet group quota exceeded.")
            raise RuntimeError("Subnet group quota exceeded.") from e
        else:
            print(f"AWS error [{code}]: {msg}")
            raise RuntimeError(f"AWS error [{code}]: {msg}") from e

    except Exception as e:
        print(f"Unexpected error creating subnet group '{group_name}': {e}")
        raise RuntimeError(f"Unexpected error creating subnet group '{group_name}': {e}") from e


def wait_for_input_to_continue():
    input("\nPress <ENTER> to continue...")
    print("Continuing with the program...\n")


def run_scenario(neptune_client, subnet_group_name: str, db_instance_id: str, cluster_name: str):
    print("-" * 88)
    print("1. Create a Neptune DB Subnet Group")
    wait_for_input_to_continue()

    try:
        name, arn = create_subnet_group(neptune_client, subnet_group_name)
        print(f"Subnet group successfully created: {name}")

        print("-" * 88)
        print("2. Create a Neptune Cluster")
        wait_for_input_to_continue()
        db_cluster_id = create_db_cluster(neptune_client, cluster_name)

        print("-" * 88)
        print("3. Create a Neptune DB Instance")
        wait_for_input_to_continue()
        create_db_instance(neptune_client, db_instance_id, cluster_name)

        print("-" * 88)
        print("4. Check the status of the Neptune DB Instance")
        print("""
        Even though you're targeting a single DB instance,
        describe_db_instances supports pagination and can return multiple pages.

        Handling paginated responses ensures your method continues to work reliably
        even if AWS returns large or paged results.
        """)
        wait_for_input_to_continue()
        check_instance_status(neptune_client, db_instance_id, "available")

        print("-" * 88)
        print("5. Show Neptune Cluster details")
        wait_for_input_to_continue()
        describe_db_clusters(neptune_client, db_cluster_id)

        print("-" * 88)
        print("6. Stop the Amazon Neptune cluster")
        print("""
        Boto3 doesn't currently offer a built-in waiter for stop_db_cluster,
        This example implements a custom polling strategy until the cluster is in a stopped state.
        """)
        wait_for_input_to_continue()
        stop_db_cluster(neptune_client, db_cluster_id)
        check_instance_status(neptune_client, db_instance_id, "stopped")

        print("-" * 88)
        print("7. Start the Amazon Neptune cluster")
        print("""
        Boto3 doesn't currently offer a built-in waiter for start_db_cluster,
        This example implements a custom polling strategy until the cluster is in an available state.
        """)
        wait_for_input_to_continue()
        start_db_cluster(neptune_client, db_cluster_id)
        wait_for_cluster_status(neptune_client, db_cluster_id, "available")
        check_instance_status(neptune_client, db_instance_id, "available")

        print("All Neptune resources are now available.")
        print("-" * 88)

        print("-" * 88)
        print("8. Delete the Neptune Assets")
        print("Would you like to delete the Neptune Assets? (y/n)")
        del_ans = input().strip().lower()
        if del_ans == "y":
            print("You selected to delete the Neptune assets.")
            delete_db_instance(neptune_client, db_instance_id)
            delete_db_cluster(neptune_client, db_cluster_id)
            delete_db_subnet_group(neptune_client, subnet_group_name)
            print("Neptune resources deleted successfully")

    except ClientError as ce:
        code = ce.response["Error"]["Code"]

        if code in ("DBInstanceNotFound", "DBInstanceNotFoundFault", "ResourceNotFound"):
            print(f"Instance '{db_instance_id}' not found.")
        elif code in ("DBClusterNotFound", "DBClusterNotFoundFault", "ResourceNotFoundFault"):
            print(f"Cluster '{cluster_name}' not found.")
        elif code == "DBSubnetGroupNotFoundFault":
            print(f"Subnet group '{subnet_group_name}' not found.")
        elif code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"AWS error [{code}]: {ce.response['Error']['Message']}")
            raise  # re-raise unexpected errors

    except RuntimeError as re:
        print(f"Runtime error or timeout: {re}")


def main():
    neptune_client = boto3.client('neptune')

    # Customize the following names to match your Neptune setup
    # (You must change these to unique values for your environment)
    subnet_group_name = "neptuneSubnetGroup111"
    cluster_name = "neptuneCluster111"
    db_instance_id = "neptuneDB111"

    print("""
    Amazon Neptune is a fully managed graph database service by AWS...
    Let's get started!
    """)
    wait_for_input_to_continue()

    run_scenario(neptune_client, subnet_group_name, db_instance_id, cluster_name)

    print("""
    Thank you for checking out the Amazon Neptune Service Use demo.
    For more AWS code examples, visit:
    https://docs.aws.amazon.com/code-library/latest/ug/what-is-code-library.html
    """)


if __name__ == "__main__":
    main()

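The scenario repeats the same poll-compare-sleep loop in several functions. As a sketch only, and not part of the AWS example, that loop could be factored into one helper that takes a status-fetching callable. The name `poll_until` and its `sleep` and `clock` parameters are assumptions introduced here so that the loop can be exercised without real waiting or AWS calls.

```python
import time
from typing import Callable, Optional


def poll_until(
    get_status: Callable[[], Optional[str]],
    desired_status: str,
    timeout_seconds: float = 1200,
    poll_interval_seconds: float = 10,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call get_status() until it matches desired_status (case-insensitive).

    Returns the number of polls performed. Raises RuntimeError on timeout.
    """
    start = clock()
    polls = 0
    while True:
        status = get_status()
        polls += 1
        if status and status.lower() == desired_status.lower():
            return polls
        if clock() - start > timeout_seconds:
            raise RuntimeError(f"Timeout waiting for status: {desired_status}")
        sleep(poll_interval_seconds)


# Offline demonstration with canned statuses and a no-op sleep.
canned = iter(["stopping", "stopping", "stopped"])
polls_needed = poll_until(lambda: next(canned), "stopped", sleep=lambda _: None)
print(polls_needed)
```

With a real client, `get_status` would wrap a `describe_db_clusters` or `describe_db_instances` call and return the `Status` or `DBInstanceStatus` field.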
Actions
The following code example shows how to use CreateDBCluster.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

from botocore.exceptions import ClientError


def create_db_cluster(neptune_client, db_name: str) -> str:
    """
    Creates a Neptune DB cluster and returns its identifier.

    Args:
        neptune_client (boto3.client): The Neptune client object.
        db_name (str): The desired cluster identifier.

    Returns:
        str: The DB cluster identifier.

    Raises:
        RuntimeError: For any failure or AWS error, with a user-friendly message.
    """
    request = {
        'DBClusterIdentifier': db_name,
        'Engine': 'neptune',
        'DeletionProtection': False,
        'BackupRetentionPeriod': 1
    }

    try:
        response = neptune_client.create_db_cluster(**request)
        cluster = response.get('DBCluster') or {}

        cluster_id = cluster.get('DBClusterIdentifier')
        if not cluster_id:
            raise RuntimeError("Cluster created but no ID returned.")

        print(f"DB Cluster created: {cluster_id}")
        return cluster_id

    except ClientError as e:
        code = e.response["Error"]["Code"]
        message = e.response["Error"]["Message"]

        if code in ("ServiceQuotaExceededException", "DBClusterQuotaExceededFault"):
            raise RuntimeError("You have exceeded the quota for Neptune DB clusters.") from e
        else:
            raise RuntimeError(f"AWS error [{code}]: {message}") from e

    except Exception as e:
        raise RuntimeError(f"Unexpected error creating DB cluster '{db_name}': {e}") from e

For API details, see CreateDBCluster in AWS SDK for Python (Boto3) API Reference.
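The `except ClientError` branch above turns an error code into a user-facing message. A minimal sketch of that mapping as a pure function follows; it works on the response dictionary that `ClientError` carries in `e.response`, so it can be tried without calling AWS. The name `friendly_error_message` is hypothetical, and the two quota codes are the ones the example itself checks for.

```python
QUOTA_CODES = ("ServiceQuotaExceededException", "DBClusterQuotaExceededFault")


def friendly_error_message(error_response: dict) -> str:
    """Turn a ClientError-style response dict into a user-facing message."""
    error = error_response.get("Error", {})
    code = error.get("Code", "Unknown")
    message = error.get("Message", "")
    if code in QUOTA_CODES:
        return "You have exceeded the quota for Neptune DB clusters."
    return f"AWS error [{code}]: {message}"


# Hand-written sample responses; the codes and messages are illustrative.
quota = {"Error": {"Code": "DBClusterQuotaExceededFault", "Message": "limit hit"}}
other = {"Error": {"Code": "InvalidParameterValue", "Message": "bad engine"}}
print(friendly_error_message(quota))
print(friendly_error_message(other))
```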
The following code example shows how to use CreateDBInstance.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

from botocore.exceptions import ClientError


def create_db_instance(neptune_client, db_instance_id: str, db_cluster_id: str) -> str:
    """Creates a Neptune DB instance in the given cluster and waits until it is available."""
    try:
        request = {
            'DBInstanceIdentifier': db_instance_id,
            'DBInstanceClass': 'db.r5.large',
            'Engine': 'neptune',
            'DBClusterIdentifier': db_cluster_id
        }

        print(f"Creating Neptune DB Instance: {db_instance_id}")
        response = neptune_client.create_db_instance(**request)

        instance = response.get('DBInstance')
        if not instance or 'DBInstanceIdentifier' not in instance:
            raise RuntimeError("Instance creation succeeded but no ID returned.")

        print(f"Waiting for DB Instance '{db_instance_id}' to become available...")
        waiter = neptune_client.get_waiter('db_instance_available')
        waiter.wait(
            DBInstanceIdentifier=db_instance_id,
            WaiterConfig={'Delay': 30, 'MaxAttempts': 40}
        )

        print(f"DB Instance '{db_instance_id}' is now available.")
        return instance['DBInstanceIdentifier']

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't create DB instance. Here's why: {code}: {message}")
        raise

    except Exception as e:
        print(f"Unexpected error creating DB instance '{db_instance_id}': {e}")
        raise RuntimeError(f"Unexpected error creating DB instance '{db_instance_id}': {e}") from e

For API details, see CreateDBInstance in AWS SDK for Python (Boto3) API Reference.
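The `WaiterConfig` in the example sets a delay of 30 seconds and at most 40 attempts. As a rough estimate that ignores the time each API call takes, the waiting budget is about the product of the two. The helper name `waiter_budget_seconds` below is hypothetical and exists only to show the arithmetic.

```python
def waiter_budget_seconds(waiter_config: dict) -> int:
    """Rough upper bound on time spent waiting: Delay x MaxAttempts."""
    return waiter_config["Delay"] * waiter_config["MaxAttempts"]


config = {"Delay": 30, "MaxAttempts": 40}
budget = waiter_budget_seconds(config)
print(f"{budget} seconds (~{budget // 60} minutes)")
```

Under these settings the estimate comes to roughly 20 minutes, the same figure the basics scenario uses for its `TIMEOUT_SECONDS` polling limit.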
The following code example shows how to use CreateDBSubnetGroup.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

from botocore.exceptions import ClientError


# get_default_vpc_id and get_subnet_ids are helper functions defined in the Basics scenario.
def create_subnet_group(neptune_client, group_name: str):
    """
    Creates a Neptune DB subnet group and returns its name and ARN.

    Args:
        neptune_client (boto3.client): The Neptune client object.
        group_name (str): The desired name of the subnet group.

    Returns:
        tuple(str, str): (subnet_group_name, subnet_group_arn)

    Raises:
        RuntimeError: For quota errors or other AWS-related failures.
    """
    vpc_id = get_default_vpc_id()
    subnet_ids = get_subnet_ids(vpc_id)

    request = {
        'DBSubnetGroupName': group_name,
        'DBSubnetGroupDescription': 'My Neptune subnet group',
        'SubnetIds': subnet_ids,
        'Tags': [{'Key': 'Environment', 'Value': 'Dev'}]
    }

    try:
        response = neptune_client.create_db_subnet_group(**request)
        sg = response.get("DBSubnetGroup", {})
        name = sg.get("DBSubnetGroupName")
        arn = sg.get("DBSubnetGroupArn")

        if not name or not arn:
            raise RuntimeError("Response missing subnet group name or ARN.")

        print(f"Subnet group created: {name}")
        print(f"ARN: {arn}")
        return name, arn

    except ClientError as e:
        code = e.response["Error"]["Code"]
        msg = e.response["Error"]["Message"]

        if code == "ServiceQuotaExceededException":
            print("Subnet group quota exceeded.")
            raise RuntimeError("Subnet group quota exceeded.") from e
        else:
            print(f"AWS error [{code}]: {msg}")
            raise RuntimeError(f"AWS error [{code}]: {msg}") from e

    except Exception as e:
        print(f"Unexpected error creating subnet group '{group_name}': {e}")
        raise RuntimeError(f"Unexpected error creating subnet group '{group_name}': {e}") from e

For API details, see CreateDBSubnetGroup in AWS SDK for Python (Boto3) API Reference.
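The example above sends every subnet of the default VPC straight into the request. As a sketch under stated assumptions, the hypothetical helper below builds the same request dictionary from subnet records shaped like the EC2 `describe_subnets` response and checks Availability Zone coverage first. The `min_azs=2` default is an assumption made for illustration, not something the example states; adjust it to whatever your account and Region require.

```python
def build_subnet_group_request(group_name: str, subnets: list[dict], min_azs: int = 2) -> dict:
    """Build create_db_subnet_group kwargs, failing early on thin AZ coverage."""
    subnet_ids = [s["SubnetId"] for s in subnets if "SubnetId" in s]
    zones = {s["AvailabilityZone"] for s in subnets if "AvailabilityZone" in s}
    if len(zones) < min_azs:
        # min_azs is an assumed threshold, not taken from the original example.
        raise ValueError(
            f"Subnets cover {len(zones)} Availability Zone(s); expected at least {min_azs}."
        )
    return {
        "DBSubnetGroupName": group_name,
        "DBSubnetGroupDescription": "My Neptune subnet group",
        "SubnetIds": subnet_ids,
        "Tags": [{"Key": "Environment", "Value": "Dev"}],
    }


# Hand-written sample records; IDs and zones are illustrative only.
sample_subnets = [
    {"SubnetId": "subnet-1", "AvailabilityZone": "us-east-1a"},
    {"SubnetId": "subnet-2", "AvailabilityZone": "us-east-1b"},
]
request = build_subnet_group_request("neptuneSubnetGroup111", sample_subnets)
print(request["SubnetIds"])
```

The resulting dictionary could then be passed as `neptune_client.create_db_subnet_group(**request)`, as in the example.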
The following code example shows how to use CreateGraph.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

"""
Running this example.

----------------------------------------------------------------------------------
VPC Networking Requirement:
----------------------------------------------------------------------------------
Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster.
It does not expose a public endpoint, so this code must be executed from:

 - An **AWS Lambda function** configured to run inside the same VPC
 - An **EC2 instance** or **ECS task** running in the same VPC
 - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC**
"""

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

GRAPH_NAME = "sample-analytics-graph"


def main():
    config = Config(retries={"total_max_attempts": 1, "mode": "standard"}, read_timeout=None)
    client = boto3.client("neptune-graph", config=config)
    execute_create_graph(client, GRAPH_NAME)


def execute_create_graph(client, graph_name):
    try:
        print("Creating Neptune graph...")
        response = client.create_graph(
            graphName=graph_name,
            provisionedMemory=16
        )

        created_graph_name = response.get("name")
        graph_arn = response.get("arn")
        graph_endpoint = response.get("endpoint")

        print("Graph created successfully!")
        print(f"Graph Name: {created_graph_name}")
        print(f"Graph ARN: {graph_arn}")
        print(f"Graph Endpoint: {graph_endpoint}")

    except ClientError as e:
        print(f"Failed to create graph: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        print(f"Failed to create graph: {str(e)}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")


if __name__ == "__main__":
    main()

For API details, see CreateGraph in AWS SDK for Python (Boto3) API Reference.
En el siguiente ejemplo de código, se muestra cómo utilizar DeleteDBCluster
.
- SDK para Python (Boto3)
-
nota
Hay más información al respecto. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. def delete_db_cluster(neptune_client, cluster_id: str): """ Deletes a Neptune DB cluster and throws exceptions to the caller. Args: neptune_client (boto3.client): The Neptune client object. cluster_id (str): The ID of the Neptune DB cluster to be deleted. Raises: ClientError: If the delete operation fails. """ request = { 'DBClusterIdentifier': cluster_id, 'SkipFinalSnapshot': True } try: print(f"Deleting DB Cluster: {cluster_id}") neptune_client.delete_db_cluster(**request) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFoundFault": print(f"Cluster '{cluster_id}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete DB cluster. {code}: {message}") raise
-
For API details, see DeleteDBCluster in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DeleteDBInstance.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
from botocore.exceptions import ClientError


def delete_db_instance(neptune_client, instance_id: str):
    """
    Deletes a Neptune DB instance and waits for its deletion to complete.
    Raises exception to be handled by calling code.
    """
    print(f"Initiating deletion of DB Instance: {instance_id}")
    try:
        neptune_client.delete_db_instance(
            DBInstanceIdentifier=instance_id,
            SkipFinalSnapshot=True
        )

        print(f"Waiting for DB Instance '{instance_id}' to be deleted...")
        waiter = neptune_client.get_waiter('db_instance_deleted')
        waiter.wait(
            DBInstanceIdentifier=instance_id,
            WaiterConfig={
                'Delay': 30,
                'MaxAttempts': 40
            }
        )

        print(f"DB Instance '{instance_id}' successfully deleted.")

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        # Accept both spellings of the not-found error code.
        if code in ("DBInstanceNotFound", "DBInstanceNotFoundFault"):
            print(f"Instance '{instance_id}' not found or already deleted.")
        elif code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't delete DB instance. {code}: {message}")
        raise
-
For API details, see DeleteDBInstance in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DeleteDBSubnetGroup.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
from botocore.exceptions import ClientError


def delete_db_subnet_group(neptune_client, subnet_group_name):
    """
    Deletes a Neptune DB subnet group synchronously using Boto3.

    Args:
        neptune_client (boto3.client): The Neptune client.
        subnet_group_name (str): The name of the DB subnet group to delete.

    Raises:
        ClientError: If the delete operation fails.
    """
    delete_group_request = {
        'DBSubnetGroupName': subnet_group_name
    }

    try:
        # Announce the deletion before issuing the request.
        print(f"Deleting Subnet Group: {subnet_group_name}")
        neptune_client.delete_db_subnet_group(**delete_group_request)

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "DBSubnetGroupNotFoundFault":
            print(f"Subnet group '{subnet_group_name}' not found or already deleted.")
        elif code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't delete subnet group. {code}: {message}")
        raise
-
For API details, see DeleteDBSubnetGroup in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DescribeDBClusters.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
from botocore.exceptions import ClientError


def describe_db_clusters(neptune_client, cluster_id: str):
    """
    Describes details of a Neptune DB cluster, paginating if needed.

    Args:
        neptune_client (boto3.client): The Neptune client.
        cluster_id (str): The ID of the cluster to describe.

    Raises:
        ClientError: If there's an AWS API error (e.g., cluster not found).
    """
    paginator = neptune_client.get_paginator('describe_db_clusters')

    try:
        pages = paginator.paginate(DBClusterIdentifier=cluster_id)

        found = False
        for page in pages:
            for cluster in page.get('DBClusters', []):
                found = True
                print(f"Cluster Identifier: {cluster.get('DBClusterIdentifier')}")
                print(f"Status: {cluster.get('Status')}")
                print(f"Engine: {cluster.get('Engine')}")
                print(f"Engine Version: {cluster.get('EngineVersion')}")
                print(f"Endpoint: {cluster.get('Endpoint')}")
                print(f"Reader Endpoint: {cluster.get('ReaderEndpoint')}")
                print(f"Availability Zones: {cluster.get('AvailabilityZones')}")
                print(f"Subnet Group: {cluster.get('DBSubnetGroup')}")
                print("VPC Security Groups:")
                for vpc_group in cluster.get('VpcSecurityGroups', []):
                    print(f"  - {vpc_group.get('VpcSecurityGroupId')}")
                print(f"Storage Encrypted: {cluster.get('StorageEncrypted')}")
                print(f"IAM Auth Enabled: {cluster.get('IAMDatabaseAuthenticationEnabled')}")
                print(f"Backup Retention Period: {cluster.get('BackupRetentionPeriod')} days")
                print(f"Preferred Backup Window: {cluster.get('PreferredBackupWindow')}")
                print(f"Preferred Maintenance Window: {cluster.get('PreferredMaintenanceWindow')}")
                print("------")

        if not found:
            # Treat empty response as cluster not found
            raise ClientError(
                {"Error": {"Code": "DBClusterNotFound", "Message": f"No cluster found with ID '{cluster_id}'"}},
                "DescribeDBClusters"
            )

    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        # Accept both the synthetic code raised above and the "...Fault" spelling.
        elif code in ("DBClusterNotFound", "DBClusterNotFoundFault"):
            print(f"Cluster '{cluster_id}' not found. Please verify the cluster ID.")
        else:
            print(f"Couldn't describe DB cluster. Here's why: {code}: {message}")
        raise
-
For API details, see DescribeDBClusters in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DescribeDBInstances.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
import time

from botocore.exceptions import ClientError

# format_elapsed_time, TIMEOUT_SECONDS, and POLL_INTERVAL_SECONDS are defined
# elsewhere in the complete example.


def check_instance_status(neptune_client, instance_id: str, desired_status: str):
    """
    Polls the status of a Neptune DB instance until it reaches desired_status.
    Uses pagination via describe_db_instances, even for a single instance.

    Raises:
        ClientError: If describe_db_instances fails (e.g., instance not found).
        RuntimeError: If timeout expires before reaching desired status.
    """
    paginator = neptune_client.get_paginator('describe_db_instances')
    start_time = time.time()

    while True:
        try:
            pages = paginator.paginate(DBInstanceIdentifier=instance_id)
            instances = []
            for page in pages:
                instances.extend(page.get('DBInstances', []))

        except ClientError as err:
            code = err.response["Error"]["Code"]
            message = err.response["Error"]["Message"]

            if code == "DBInstanceNotFound":
                print(f"Instance '{instance_id}' not found. Please verify the instance ID.")
            else:
                print(f"Failed to describe DB instance. {code}: {message}")
            raise

        current_status = instances[0].get('DBInstanceStatus') if instances else None
        elapsed = int(time.time() - start_time)

        print(f"\rElapsed: {format_elapsed_time(elapsed)}  Status: {current_status}", end="", flush=True)

        if current_status and current_status.lower() == desired_status.lower():
            print(f"\nInstance '{instance_id}' reached '{desired_status}' in {format_elapsed_time(elapsed)}.")
            return

        if elapsed > TIMEOUT_SECONDS:
            raise RuntimeError(f"Timeout waiting for '{instance_id}' to reach '{desired_status}'")

        time.sleep(POLL_INTERVAL_SECONDS)
-
For API details, see DescribeDBInstances in the AWS SDK for Python (Boto3) API Reference.
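The polling excerpt above relies on `format_elapsed_time`, `TIMEOUT_SECONDS`, and `POLL_INTERVAL_SECONDS`, which live elsewhere in the complete program. The sketch below shows one plausible way to define them so the excerpt can run on its own. The values and the time format are assumptions for illustration and may differ from the repository version.

```python
# Assumed values; tune them to how long your instances take to change state.
TIMEOUT_SECONDS = 1200        # give up after 20 minutes
POLL_INTERVAL_SECONDS = 10    # wait 10 seconds between describe calls


def format_elapsed_time(seconds: int) -> str:
    """Format a duration in seconds as MM:SS, or HH:MM:SS once it exceeds an hour."""
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
```

For example, `format_elapsed_time(75)` yields `01:15`, which keeps the single-line progress display compact.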
-
The following code example shows how to use ExecuteGremlinProfileQuery.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

# Replace this with your actual Neptune endpoint
NEPTUNE_ENDPOINT = "https://[Specify Endpoint]:8182"


def main():
    """
    Entry point of the program. Initializes the Neptune client and executes the Gremlin query.
    """
    config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3})

    neptune_client = boto3.client(
        "neptunedata",
        endpoint_url=NEPTUNE_ENDPOINT,
        config=config
    )

    execute_gremlin_query(neptune_client)


def execute_gremlin_query(neptune_client):
    """
    Executes a Gremlin query against an Amazon Neptune database.
    """
    try:
        print("Querying Neptune...")

        # Note: this excerpt calls the EXPLAIN operation;
        # execute_gremlin_profile_query accepts the same gremlinQuery argument.
        response = neptune_client.execute_gremlin_explain_query(
            gremlinQuery="g.V().has('code', 'ANC')"
        )

        print("Full Response:")
        print(response['output'].read().decode('UTF-8'))

    except ClientError as e:
        print(f"Error calling Neptune: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        print(f"BotoCore error: {str(e)}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")


if __name__ == "__main__":
    main()
-
For API details, see ExecuteGremlinProfileQuery in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use ExecuteGremlinQuery.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
"""
Running this example.

----------------------------------------------------------------------------------
VPC Networking Requirement:
----------------------------------------------------------------------------------
Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster.
It does not expose a public endpoint, so this code must be executed from:

  - An **AWS Lambda function** configured to run inside the same VPC
  - An **EC2 instance** or **ECS task** running in the same VPC
  - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC**
"""

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

# Replace with your actual Neptune endpoint
NEPTUNE_ENDPOINT = "https://[Specify-Your-Endpoint]:8182"


def main():
    """
    Entry point of the program. Initializes the Neptune client and runs a PROFILE query.
    """
    config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3})

    neptune_client = boto3.client(
        "neptunedata",
        endpoint_url=NEPTUNE_ENDPOINT,
        config=config
    )

    try:
        run_profile_query(neptune_client)
    except ClientError as e:
        print(f"Neptune error: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        print(f"BotoCore error: {str(e)}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")


def run_profile_query(neptune_client):
    """
    Runs a PROFILE query on the Neptune graph database.
    """
    print("Running Gremlin PROFILE query...")

    try:
        response = neptune_client.execute_gremlin_profile_query(
            gremlinQuery="g.V().has('code', 'ANC')"
        )
        print("Profile Query Result:")
        output = response.get("output")
        if output:
            print(output.read().decode('utf-8'))
        else:
            print("No profile output returned.")
    except Exception as e:
        print(f"Failed to execute PROFILE query: {str(e)}")


if __name__ == "__main__":
    main()
-
For API details, see ExecuteGremlinQuery in the AWS SDK for Python (Boto3) API Reference.
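The excerpt in this section runs a PROFILE query. For comparison, here is a minimal sketch of a plain query through `execute_gremlin_query` on the same `neptunedata` client. The function name `run_gremlin_query` is hypothetical, and the sketch assumes the response carries its data under the `result` key and its status under `status`.

```python
def run_gremlin_query(neptune_client, query):
    """Run a Gremlin query and return the 'result' portion of the response.

    'neptune_client' is expected to be a boto3 "neptunedata" client created
    with endpoint_url pointing at the cluster, as in the examples above.
    """
    response = neptune_client.execute_gremlin_query(gremlinQuery=query)

    # Unlike the explain/profile calls, the result is returned as a
    # document (dict/list), not as a stream that must be read and decoded.
    result = response.get("result")
    print(f"Status: {response.get('status')}")
    print(f"Result: {result}")
    return result
```

A call such as `run_gremlin_query(neptune_client, "g.V().has('code', 'ANC')")` would reuse the query from the other examples. Error handling is left to the caller here to keep the sketch short.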
-
The following code example shows how to use ExecuteOpenCypherExplainQuery.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
import json

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

# Replace with your actual Neptune endpoint URL
NEPTUNE_ENDPOINT = "https://<your-neptune-endpoint>:8182"


def main():
    """
    Entry point: Create Neptune client and execute different OpenCypher queries.
    """
    config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3})

    neptune_client = boto3.client(
        "neptunedata",
        endpoint_url=NEPTUNE_ENDPOINT,
        config=config
    )

    execute_open_cypher_query_without_params(neptune_client)
    execute_open_cypher_query_with_params(neptune_client)
    execute_open_cypher_explain_query(neptune_client)


def execute_open_cypher_query_without_params(client):
    """
    Executes a simple OpenCypher query without parameters.
    """
    try:
        print("\nRunning OpenCypher query without parameters...")
        resp = client.execute_open_cypher_query(
            openCypherQuery="MATCH (n {code: 'ANC'}) RETURN n"
        )
        print("Results:")
        print(resp['results'])

    except Exception as e:
        print(f"Error in simple OpenCypher query: {str(e)}")


def execute_open_cypher_query_with_params(client):
    """
    Executes an OpenCypher query using parameters.
    """
    try:
        print("\nRunning OpenCypher query with parameters...")
        parameters = {'code': 'ANC'}
        resp = client.execute_open_cypher_query(
            openCypherQuery="MATCH (n {code: $code}) RETURN n",
            parameters=json.dumps(parameters)
        )
        print("Results:")
        print(resp['results'])

    except Exception as e:
        print(f"Error in parameterized OpenCypher query: {str(e)}")


def execute_open_cypher_explain_query(client):
    """
    Runs an OpenCypher EXPLAIN query in details mode.
    """
    try:
        print("\nRunning OpenCypher EXPLAIN query (details mode)...")
        resp = client.execute_open_cypher_explain_query(
            openCypherQuery="MATCH (n {code: 'ANC'}) RETURN n",
            explainMode="details"
        )
        results = resp.get('results')
        if results is None:
            print("No explain results returned.")
        else:
            try:
                print("Explain Results:")
                print(results.read().decode('UTF-8'))
            except Exception as e:
                print(f"Error in OpenCypher EXPLAIN query: {str(e)}")

    except ClientError as e:
        print(f"Neptune error: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        print(f"BotoCore error: {str(e)}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")


if __name__ == "__main__":
    main()
-
For API details, see ExecuteOpenCypherExplainQuery in the AWS SDK for Python (Boto3) API Reference.
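In the parameterized query above, the `neptunedata` client receives its parameters as a JSON string (`json.dumps(parameters)`), not as a dict. The sketch below isolates that step in a small helper so it is harder to forget. The name `build_open_cypher_request` is hypothetical and not part of the SDK.

```python
import json


def build_open_cypher_request(query, parameters=None):
    """Build keyword arguments for execute_open_cypher_query.

    The query text goes in 'openCypherQuery'; any parameters are serialized
    to a JSON string, matching how the example above passes them.
    """
    request = {"openCypherQuery": query}
    if parameters:
        request["parameters"] = json.dumps(parameters)
    return request
```

It could then be used as `client.execute_open_cypher_query(**build_open_cypher_request("MATCH (n {code: $code}) RETURN n", {"code": "ANC"}))`.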
-
The following code example shows how to use ExecuteQuery.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
"""
Running this example.

----------------------------------------------------------------------------------
VPC Networking Requirement:
----------------------------------------------------------------------------------
Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster.
It does not expose a public endpoint, so this code must be executed from:

  - An **AWS Lambda function** configured to run inside the same VPC
  - An **EC2 instance** or **ECS task** running in the same VPC
  - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC**
"""

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError

GRAPH_ID = "<your-graph-id>"


def main():
    config = Config(retries={"total_max_attempts": 1, "mode": "standard"}, read_timeout=None)
    client = boto3.client("neptune-graph", config=config)

    try:
        print("\n--- Running OpenCypher query without parameters ---")
        run_open_cypher_query(client, GRAPH_ID)

        print("\n--- Running OpenCypher query with parameters ---")
        run_open_cypher_query_with_params(client, GRAPH_ID)

        print("\n--- Running OpenCypher explain query ---")
        run_open_cypher_explain_query(client, GRAPH_ID)

    except Exception as e:
        print(f"Unexpected error in main: {e}")


def run_open_cypher_query(client, graph_id):
    """
    Run an OpenCypher query without parameters.
    """
    try:
        resp = client.execute_query(
            graphIdentifier=graph_id,
            queryString="MATCH (n {code: 'ANC'}) RETURN n",
            language='OPEN_CYPHER'
        )
        print(resp['payload'].read().decode('UTF-8'))

    except client.exceptions.InternalServerException as e:
        print(f"InternalServerException: {e.response['Error']['Message']}")
    except ClientError as e:
        print(f"ClientError: {e.response['Error']['Message']}")
    except Exception as e:
        print(f"Unexpected error: {e}")


def run_open_cypher_query_with_params(client, graph_id):
    """
    Run an OpenCypher query with parameters.
    """
    try:
        parameters = {'code': 'ANC'}
        resp = client.execute_query(
            graphIdentifier=graph_id,
            queryString="MATCH (n {code: $code}) RETURN n",
            language='OPEN_CYPHER',
            parameters=parameters
        )
        print(resp['payload'].read().decode('UTF-8'))

    except client.exceptions.InternalServerException as e:
        print(f"InternalServerException: {e.response['Error']['Message']}")
    except ClientError as e:
        print(f"ClientError: {e.response['Error']['Message']}")
    except Exception as e:
        print(f"Unexpected error: {e}")


def run_open_cypher_explain_query(client, graph_id):
    """
    Run an OpenCypher explain query (explainMode = "DETAILS").
    """
    try:
        resp = client.execute_query(
            graphIdentifier=graph_id,
            queryString="MATCH (n {code: 'ANC'}) RETURN n",
            language='OPEN_CYPHER',
            explainMode='DETAILS'
        )
        print(resp['payload'].read().decode('UTF-8'))

    except ClientError as e:
        print(f"Neptune error: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        print(f"Unexpected Boto3 error: {str(e)}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")


if __name__ == "__main__":
    main()
-
For API details, see ExecuteQuery in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use StartDBCluster.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
import time

from botocore.exceptions import ClientError

# TIMEOUT_SECONDS and POLL_INTERVAL_SECONDS are defined elsewhere in the complete example.


def start_db_cluster(neptune_client, cluster_identifier: str):
    """
    Starts an Amazon Neptune DB cluster and waits until it reaches 'available'.

    Args:
        neptune_client (boto3.client): The Neptune client.
        cluster_identifier (str): The DB cluster identifier.

    Raises:
        ClientError: Propagates AWS API issues like resource not found.
        RuntimeError: If cluster doesn't reach 'available' within timeout.
    """
    try:
        # Initial wait in case the cluster was just stopped
        time.sleep(30)
        neptune_client.start_db_cluster(DBClusterIdentifier=cluster_identifier)
    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't start DB cluster. Here's why: {code}: {message}")
        raise

    start_time = time.time()
    paginator = neptune_client.get_paginator('describe_db_clusters')

    while True:
        try:
            pages = paginator.paginate(DBClusterIdentifier=cluster_identifier)
            clusters = []
            for page in pages:
                clusters.extend(page.get('DBClusters', []))
        except ClientError as err:
            code = err.response["Error"]["Code"]
            message = err.response["Error"]["Message"]

            # Accept both spellings of the not-found error code.
            if code in ("DBClusterNotFound", "DBClusterNotFoundFault"):
                print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.")
            else:
                print(f"Couldn't describe DB cluster. Here's why: {code}: {message}")
            raise

        status = clusters[0].get('Status') if clusters else None
        elapsed = time.time() - start_time

        print(f"\rElapsed: {int(elapsed)}s - Cluster status: {status}", end="", flush=True)

        if status and status.lower() == 'available':
            print(f"\n🎉 Cluster '{cluster_identifier}' is available.")
            return

        if elapsed > TIMEOUT_SECONDS:
            raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to become available.")

        time.sleep(POLL_INTERVAL_SECONDS)
-
For API details, see StartDBCluster in the AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use StopDBCluster.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
import time

from botocore.exceptions import ClientError

# TIMEOUT_SECONDS and POLL_INTERVAL_SECONDS are defined elsewhere in the complete example.


def stop_db_cluster(neptune_client, cluster_identifier: str):
    """
    Stops an Amazon Neptune DB cluster and waits until it's fully stopped.

    Args:
        neptune_client (boto3.client): The Neptune client.
        cluster_identifier (str): The DB cluster identifier.

    Raises:
        ClientError: For AWS API errors (e.g., resource not found).
        RuntimeError: If the cluster doesn't stop within the timeout.
    """
    try:
        neptune_client.stop_db_cluster(DBClusterIdentifier=cluster_identifier)
    except ClientError as err:
        code = err.response["Error"]["Code"]
        message = err.response["Error"]["Message"]

        if code == "AccessDeniedException":
            print("Access denied. Please ensure you have the necessary permissions.")
        else:
            print(f"Couldn't stop DB cluster. Here's why: {code}: {message}")
        raise

    start_time = time.time()
    paginator = neptune_client.get_paginator('describe_db_clusters')

    while True:
        try:
            pages = paginator.paginate(DBClusterIdentifier=cluster_identifier)
            clusters = []
            for page in pages:
                clusters.extend(page.get('DBClusters', []))
        except ClientError as err:
            code = err.response["Error"]["Code"]
            message = err.response["Error"]["Message"]

            # Accept both spellings of the not-found error code.
            if code in ("DBClusterNotFound", "DBClusterNotFoundFault"):
                print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.")
            else:
                print(f"Couldn't describe DB cluster. Here's why: {code}: {message}")
            raise

        status = clusters[0].get('Status') if clusters else None
        elapsed = time.time() - start_time

        print(f"\rElapsed: {int(elapsed)}s - Cluster status: {status}", end="", flush=True)

        if status and status.lower() == 'stopped':
            print(f"\nCluster '{cluster_identifier}' is now stopped.")
            return

        if elapsed > TIMEOUT_SECONDS:
            raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to stop.")

        time.sleep(POLL_INTERVAL_SECONDS)
-
For API details, see StopDBCluster in the AWS SDK for Python (Boto3) API Reference.
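The start and stop examples share the same poll-until-status loop. As a sketch of how that loop might be factored out, the helper below takes any zero-argument callable that returns the current status. The name `wait_for_status`, its defaults, and the injectable `sleep` and `clock` parameters are assumptions for illustration, not part of the original code.

```python
import time


def wait_for_status(fetch_status, desired_status, timeout_seconds=1200,
                    poll_interval_seconds=10, sleep=time.sleep,
                    clock=time.monotonic):
    """Call fetch_status() repeatedly until it returns desired_status.

    Returns the elapsed time in seconds. Raises RuntimeError when the
    timeout is exceeded. Comparison is case-insensitive, as in the
    examples above.
    """
    start = clock()
    while True:
        status = fetch_status()
        elapsed = clock() - start

        if status and status.lower() == desired_status.lower():
            return elapsed
        if elapsed > timeout_seconds:
            raise RuntimeError(
                f"Timeout waiting for status '{desired_status}' (last seen: {status})."
            )

        sleep(poll_interval_seconds)
```

With a real client, `fetch_status` could be a small function that calls `describe_db_clusters(DBClusterIdentifier=...)` and returns the first cluster's `Status`, so the same helper serves both "available" and "stopped".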
-