Skip to content

Commit

Permalink
Update main.py
Browse files Browse the repository at this point in the history
  • Loading branch information
TeneBrae93 authored Jun 5, 2024
1 parent cf53546 commit 1b9d33d
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions pacu/core/enumerate_iam/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import boto3
import botocore
import random
from typing import Dict, Tuple, Generator, Any

from botocore.client import Config
from botocore.endpoint import MAX_POOL_CONNECTIONS
Expand All @@ -32,10 +33,10 @@
from .bruteforce_tests import BRUTEFORCE_TESTS

MAX_THREADS = 25
CLIENT_POOL = {}
CLIENT_POOL: Dict[str, boto3.client] = {}


def report_arn(candidate):
def report_arn(candidate: str) -> Tuple[str, str, str]:
"""
Attempt to extract and slice up an ARN from the input string
"""
Expand All @@ -58,7 +59,7 @@ def report_arn(candidate):
return None, None, None


def enumerate_using_bruteforce(access_key, secret_key, session_token, region):
def enumerate_using_bruteforce(access_key: str, secret_key: str, session_token: str, region: str) -> Dict[str, Any]:
"""
Attempt to brute-force common describe calls.
"""
Expand Down Expand Up @@ -100,8 +101,7 @@ def enumerate_using_bruteforce(access_key, secret_key, session_token, region):
return output


def generate_args(access_key, secret_key, session_token, region):

def generate_args(access_key: str, secret_key: str, session_token: str, region: str) -> Generator[Tuple[str, str, str, str, str, str], None, None]:
service_names = list(BRUTEFORCE_TESTS.keys())

random.shuffle(service_names)
Expand All @@ -114,7 +114,7 @@ def generate_args(access_key, secret_key, session_token, region):
yield access_key, secret_key, session_token, region, service_name, action


def get_client(access_key, secret_key, session_token, service_name, region):
def get_client(access_key: str, secret_key: str, session_token: str, service_name: str, region: str) -> boto3.client:
key = '%s-%s-%s-%s-%s' % (access_key, secret_key, session_token, service_name, region)

client = CLIENT_POOL.get(key, None)
Expand All @@ -141,28 +141,28 @@ def get_client(access_key, secret_key, session_token, service_name, region):
)
except:
# The service might not be available in this region
return
return None

CLIENT_POOL[key] = client

return client


def check_one_permission(arg_tuple):
def check_one_permission(arg_tuple: Tuple[str, str, str, str, str, str]) -> Tuple[str, Any]:
access_key, secret_key, session_token, region, service_name, operation_name = arg_tuple
logger = logging.getLogger()

service_client = get_client(access_key, secret_key, session_token, service_name, region)
if service_client is None:
return
return None

try:
action_function = getattr(service_client, operation_name)
except AttributeError:
# The service might not have this action (this is most likely
# an error with generate_bruteforce_tests.py)
logger.error('Remove %s.%s action' % (service_name, operation_name))
return
return None

logger.debug('Testing %s.%s() in region %s' % (service_name, operation_name, region))

Expand All @@ -172,10 +172,10 @@ def check_one_permission(arg_tuple):
botocore.exceptions.EndpointConnectionError,
botocore.exceptions.ConnectTimeoutError,
botocore.exceptions.ReadTimeoutError):
return
return None
except botocore.exceptions.ParamValidationError:
logger.error('Remove %s.%s action' % (service_name, operation_name))
return
return None

msg = '-- %s.%s() worked!'
args = (service_name, operation_name)
Expand Down Expand Up @@ -203,11 +203,8 @@ def configure_logging():
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# import botocore.vendored.requests.packages.urllib3 as urllib3
urllib3.disable_warnings(botocore.vendored.requests.packages.urllib3.exceptions.InsecureRequestWarning)


def enumerate_iam(access_key, secret_key, session_token, region):
def enumerate_iam(access_key: str, secret_key: str, session_token: str, region: str) -> Dict[str, Any]:
"""IAM Account Enumerator.
This code provides a mechanism to attempt to validate the permissions assigned
Expand All @@ -222,7 +219,7 @@ def enumerate_iam(access_key, secret_key, session_token, region):
return output


def enumerate_using_iam(access_key, secret_key, session_token, region):
def enumerate_using_iam(access_key: str, secret_key: str, session_token: str, region: str) -> Dict[str, Any]:
output = dict()
logger = logging.getLogger()

Expand Down Expand Up @@ -254,7 +251,7 @@ def enumerate_using_iam(access_key, secret_key, session_token, region):
return output


def enumerate_role(iam_client, output):
def enumerate_role(iam_client: boto3.client, output: Dict[str, Any]) -> Dict[str, Any]:
logger = logging.getLogger()

# This is the closest thing we have to a role ARN
Expand All @@ -263,7 +260,7 @@ def enumerate_role(iam_client, output):
if user_or_role_arn is None:
# The checks which follow all required the user name to run, if we were
# unable to get that piece of information just return
return
return output

# Attempt to get role to start.
try:
Expand All @@ -277,8 +274,8 @@ def enumerate_role(iam_client, output):
output['arn_path'] = arn_path

if 'role' not in user_or_role_arn:
# We did out best, but we got nothing from iam
return
# We did our best, but we got nothing from iam
return output
else:
role_name = user_or_role_arn

Expand Down Expand Up @@ -325,7 +322,7 @@ def enumerate_role(iam_client, output):
return output


def enumerate_user(iam_client, output):
def enumerate_user(iam_client: boto3.client, output: Dict[str, Any]) -> Dict[str, Any]:
logger = logging.getLogger()
output['root_account'] = False

Expand All @@ -341,7 +338,7 @@ def enumerate_user(iam_client, output):

# The checks which follow all required the user name to run, if we were
# unable to get that piece of information just return
return
return output
else:
output['iam.get_user'] = remove_metadata(user)

Expand All @@ -350,10 +347,10 @@ def enumerate_user(iam_client, output):
# OMG
logger.warn('Found root credentials!')
output['root_account'] = True
return
return output
else:
logger.error('Unexpected iam.get_user() response: %s' % user)
return
return output
else:
user_name = user['User']['UserName']

Expand Down Expand Up @@ -432,4 +429,3 @@ def enumerate_user(iam_client, output):
pass

return output

0 comments on commit 1b9d33d

Please sign in to comment.