Skip to content

Commit

Permalink
Fixed the apigateway__enum module
Browse files Browse the repository at this point in the history
- If the user has "GET" permissions on API Gateways but not the API Keys, the module will still get all the paths from the Gateway rather than failing
- There was no error handling on "all regions" so it would fail. Added error handling so the user can check all regions without issues
- Previously it saved the data to a standalone file, updated this so it stores to the Pacu DB like other modules
  • Loading branch information
TeneBrae93 committed May 31, 2024
1 parent 7252de6 commit 908af22
Showing 1 changed file with 52 additions and 29 deletions.
81 changes: 52 additions & 29 deletions pacu/modules/apigateway__enum/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import pprint
import json
import time
from copy import deepcopy

from pacu.core.lib import downloads_dir
from pacu.core.lib import strip_lines
from pacu import Main

module_info = {
'name': 'enum__apigateway',
Expand All @@ -32,7 +35,7 @@
pp = pprint.PrettyPrinter(indent=2)


# Get all resources for API
# Get all resources for API
#
# return [] dict
def get_api_resources(client, api_id):
Expand Down Expand Up @@ -69,24 +72,31 @@ def get_api_methods(client, api_id, resource):


def get_api_keys(client):
response = client.get_api_keys(limit=500, includeValues=True)
if response.get('items'):
return response['items']
try:
response = client.get_api_keys(limit=500, includeValues=True)
if response.get('items'):
return response['items']
except client.exceptions.ClientError as e:
print(f"Failed to get API keys: {e}")
return []


def get_client_certs(client):
response = client.get_client_certificates(limit=500)
data = []
if response.get('items'):
for cert in response['items']:
res = client.get_client_certificate(clientCertificateId=cert['clientCertificateId'])
cert['pemEncodedCertificate'] = res['pemEncodedCertificate']
data.append(cert)
return data
try:
response = client.get_client_certificates(limit=500)
data = []
if response.get('items'):
for cert in response['items']:
res = client.get_client_certificate(clientCertificateId=cert['clientCertificateId'])
cert['pemEncodedCertificate'] = res['pemEncodedCertificate']
data.append(cert)
return data
except client.exceptions.ClientError as e:
print(f"Failed to get client certificates: {e}")
return []


# If permissions supported export the API documentaion as Swagger File
# If permissions supported export the API documentation as Swagger File
def export_api_doc(client, session, api_summary, exportType='swagger'):

files_names = []
Expand Down Expand Up @@ -141,43 +151,50 @@ def parse_method(base_url, method, path, stages):
return api_method


def main(args, pacu):
def main(args, pacu_main: "Main"):
"""Main module function, called from Pacu"""
print = pacu.print
session = pacu.get_active_session()
session = pacu_main.get_active_session()
print = pacu_main.print
args = parser.parse_args(args)

outfile_path = downloads_dir()/'apigateway'

if args.regions:
regions = args.regions.split(',')
else:
regions = pacu.get_regions('apigateway')
regions = pacu_main.get_regions('apigateway')

# Set up summary data object
# apis[] holds each api object which contains api info an route info
# apikeys[] holds all api keys
# clientCerts[] holds all client certs
summary_data = {'apis': [], 'apiKeys': [], 'clientCerts': []}

for region in regions:
client = pacu.get_boto3_client('apigateway', region)
client = pacu_main.get_boto3_client('apigateway', region)
print(f"Enumerating {region}")

# Get global API data
summary_data['apiKeys'] = get_api_keys(client)
summary_data['clientCerts'] = get_client_certs(client)
try:
summary_data['apiKeys'] = get_api_keys(client)
except Exception as e:
print(f"Failed to get API Keys in {region}: {e}")
try:
summary_data['clientCerts'] = get_client_certs(client)
except Exception as e:
print(f"Failed to get Client Certs in {region}: {e}")

# Currently this only supports REST apis
# Get all apis in AWS Gatway
response = client.get_rest_apis()
# Get all apis in AWS Gateway
try:
response = client.get_rest_apis()
except Exception as e:
print(f"Failed to get APIs in {region}: {e}")
continue

items = response['items']

# for each api in the account
for api in items:

# create api objecy summary
# create api object summary
api_summary = {
'id': '',
'name': '',
Expand All @@ -195,7 +212,7 @@ def main(args, pacu):

print(f"Enumerating API: {api_summary['name']}")

# For each resource get all methods and parse it into it's method summary.
# For each resource get all methods and parse it into its method summary.
for resource in get_api_resources(client, api_summary['id']):
for method in get_api_methods(client, api_summary['id'], resource):
api_summary['urlPaths'].append(parse_method(api_summary['urlBase'], method, resource['path'], api_summary['stages']))
Expand All @@ -216,10 +233,16 @@ def main(args, pacu):
)
f.close()

# Write all the data to the database for storage
api_data = deepcopy(session.APIGateway)
for key, value in summary_data.items():
api_data[key] = value
session.update(pacu_main.database, APIGateway=api_data)

return summary_data


def summary(data, pacu):
def summary(data, pacu_main):
msg = ''
if not data:
msg = 'Module execution failed'
Expand All @@ -242,7 +265,7 @@ def summary(data, pacu):
for key in data['apiKeys']:
print(f"\tKey Name: {key['name']} \n\t\tValue: {key['value']} \n\t\tcreatedDate: {key['createdDate']}")

# Display Gloal Client Certs
# Display Global Client Certs
print("-----Client Certs-----")
for cert in data['clientCerts']:
print(f"\tCert Id: {cert['clientCertificateId']} \n\t\texpirationDate: {cert['expirationDate']} \n\t\tpem: {cert['pemEncodedCertificate']}")
Expand Down

0 comments on commit 908af22

Please sign in to comment.