From 908af22319e1772e081efe9b454bfa988cff83d4 Mon Sep 17 00:00:00 2001 From: TeneBrae93 Date: Fri, 31 May 2024 12:02:38 -0500 Subject: [PATCH] Fixed the apigateway__enum module - If the user has "GET" permissions on API Gateways but not the API Keys, the module will still get all the paths from the Gateway rather than failing - There was no error handling on "all regions" so it would fail. Added error handling so the user can check all regions without issues - Previously it saved the data to a standalone file, updated this so it stores to the Pacu DB like other modules --- pacu/modules/apigateway__enum/main.py | 81 +++++++++++++++++---------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/pacu/modules/apigateway__enum/main.py b/pacu/modules/apigateway__enum/main.py index 9cabdb99..c13de207 100644 --- a/pacu/modules/apigateway__enum/main.py +++ b/pacu/modules/apigateway__enum/main.py @@ -6,8 +6,11 @@ import pprint import json import time +from copy import deepcopy from pacu.core.lib import downloads_dir +from pacu.core.lib import strip_lines +from pacu import Main module_info = { 'name': 'enum__apigateway', @@ -32,7 +35,7 @@ pp = pprint.PrettyPrinter(indent=2) -# Get all resources for API +# Get all resources for API # # return [] dict def get_api_resources(client, api_id): @@ -69,24 +72,31 @@ def get_api_methods(client, api_id, resource): def get_api_keys(client): - response = client.get_api_keys(limit=500, includeValues=True) - if response.get('items'): - return response['items'] + try: + response = client.get_api_keys(limit=500, includeValues=True) + if response.get('items'): + return response['items'] + except client.exceptions.ClientError as e: + print(f"Failed to get API keys: {e}") return [] def get_client_certs(client): - response = client.get_client_certificates(limit=500) - data = [] - if response.get('items'): - for cert in response['items']: - res = client.get_client_certificate(clientCertificateId=cert['clientCertificateId']) - cert['pemEncodedCertificate'] = res['pemEncodedCertificate'] - data.append(cert) - return data + try: + response = client.get_client_certificates(limit=500) + data = [] + if response.get('items'): + for cert in response['items']: + res = client.get_client_certificate(clientCertificateId=cert['clientCertificateId']) + cert['pemEncodedCertificate'] = res['pemEncodedCertificate'] + data.append(cert) + return data + except client.exceptions.ClientError as e: + print(f"Failed to get client certificates: {e}") + return [] -# If permissions supported export the API documentaion as Swagger File +# If permissions supported export the API documentation as Swagger File def export_api_doc(client, session, api_summary, exportType='swagger'): files_names = [] @@ -141,10 +151,10 @@ def parse_method(base_url, method, path, stages): return api_method -def main(args, pacu): +def main(args, pacu_main: "Main"): """Main module function, called from Pacu""" - print = pacu.print - session = pacu.get_active_session() + session = pacu_main.get_active_session() + print = pacu_main.print args = parser.parse_args(args) outfile_path = downloads_dir()/'apigateway' @@ -152,32 +162,39 @@ def main(args, pacu): if args.regions: regions = args.regions.split(',') else: - regions = pacu.get_regions('apigateway') + regions = pacu_main.get_regions('apigateway') # Set up summary data object - # apis[] holds each api object which contains api info an route info - # apikeys[] holds all api keys - # clientCerts[] holds all client certs summary_data = {'apis': [], 'apiKeys': [], 'clientCerts': []} for region in regions: - client = pacu.get_boto3_client('apigateway', region) + client = pacu_main.get_boto3_client('apigateway', region) print(f"Enumerating {region}") # Get global API data - summary_data['apiKeys'] = get_api_keys(client) - summary_data['clientCerts'] = get_client_certs(client) + try: + summary_data['apiKeys'] = get_api_keys(client) + except Exception as e: + print(f"Failed to get API Keys in {region}: {e}") + try: + summary_data['clientCerts'] = get_client_certs(client) + except Exception as e: + print(f"Failed to get Client Certs in {region}: {e}") # Currently this only supports REST apis - # Get all apis in AWS Gatway - response = client.get_rest_apis() + # Get all apis in AWS Gateway + try: + response = client.get_rest_apis() + except Exception as e: + print(f"Failed to get APIs in {region}: {e}") + continue items = response['items'] # for each api in the account for api in items: - # create api objecy summary + # create api object summary api_summary = { 'id': '', 'name': '', @@ -195,7 +212,7 @@ def main(args, pacu): print(f"Enumerating API: {api_summary['name']}") - # For each resource get all methods and parse it into it's method summary. + # For each resource get all methods and parse it into its method summary. for resource in get_api_resources(client, api_summary['id']): for method in get_api_methods(client, api_summary['id'], resource): api_summary['urlPaths'].append(parse_method(api_summary['urlBase'], method, resource['path'], api_summary['stages'])) @@ -216,10 +233,16 @@ def main(args, pacu): ) f.close() + # Write all the data to the database for storage + api_data = deepcopy(session.APIGateway) + for key, value in summary_data.items(): + api_data[key] = value + session.update(pacu_main.database, APIGateway=api_data) + return summary_data -def summary(data, pacu): +def summary(data, pacu_main): msg = '' if not data: msg = 'Module execution failed' @@ -242,7 +265,7 @@ def summary(data, pacu): for key in data['apiKeys']: print(f"\tKey Name: {key['name']} \n\t\tValue: {key['value']} \n\t\tcreatedDate: {key['createdDate']}") - # Display Gloal Client Certs + # Display Global Client Certs print("-----Client Certs-----") for cert in data['clientCerts']: print(f"\tCert Id: {cert['clientCertificateId']} \n\t\texpirationDate: {cert['expirationDate']} \n\t\tpem: {cert['pemEncodedCertificate']}")