diff --git a/README.md b/README.md index 75b3442d..6b9d1d2f 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,7 @@ A configuration wizard will prompt you to enter the necessary configuration para - The reserved word `acc-role` will use the name component of the role arn prepended with account number (or alias if `resolve_aws_alias` is set to y) to avoid collisions, i.e. arn:aws:iam::123456789012:role/okta-1234-role becomes section [123456789012-okta-1234-role], or if `resolve_aws_alias` [-okta-1234-role] in the aws credentials file - If set to `default` then the temp creds will be stored in the default profile - Note: if there are multiple roles, and `default` is selected it will be overwritten multiple times and last role wins. The same happens when `role` is selected and you have many accounts with the same role names. Consider using `acc-role` if this happens. +- jar - This is optional. Path to read/store cookies containing Okta session cookies potentially reducing password/MFA prompts depending on your organization's policy configuration. - aws_appname - This is optional. The Okta AWS App name, which has the role you want to assume. - aws_rolename - This is optional. The ARN of the role you want temporary AWS credentials for. The reserved word 'all' can be used to get and store credentials for every role the user is permissioned for. - aws_default_duration = This is optional. Lifetime for temporary credentials, in seconds. Defaults to 1 hour (3600) diff --git a/gimme_aws_creds/config.py b/gimme_aws_creds/config.py index cbd99d53..1ae16e29 100644 --- a/gimme_aws_creds/config.py +++ b/gimme_aws_creds/config.py @@ -248,6 +248,7 @@ def update_config_file(self): 'remember_device': 'n', 'aws_default_duration': '3600', 'device_token': '', + 'jar': '', 'output_format': 'export', } diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 600e783d..caa848f8 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -536,6 +536,7 @@ def okta(self): self.okta_org_url, self.config.verify_ssl_certs, self.device_token, + self.conf_dict.get('jar') ) if self.config.username is not None: diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index d23b36bb..9b3be583 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -18,7 +18,10 @@ import uuid import webbrowser from codecs import decode +from http.cookiejar import LWPCookieJar, lwp_cookie_str from multiprocessing import Process +from pathlib import Path +from tempfile import NamedTemporaryFile from urllib.parse import parse_qs from urllib.parse import urlparse, quote @@ -28,6 +31,7 @@ from fido2.utils import websafe_decode from keyring.backends.fail import Keyring as FailKeyring from keyring.errors import PasswordDeleteError +from requests.cookies import RequestsCookieJar from requests.adapters import HTTPAdapter, Retry from gimme_aws_creds.u2f import FactorU2F @@ -37,6 +41,50 @@ from .registered_authenticators import RegisteredAuthenticators +class OktaCookieJar(RequestsCookieJar): + """Extends RequestsCookieJar adding serialization to/from a file.""" + def __init__(self, path, *args, **kwargs): + """Populates the jar from filename if the file exists.""" + super().__init__(*args, **kwargs) + self._path = path + # Use a temporary LWPCookieJar to read the cookies from disk + jar = LWPCookieJar(path.as_posix()) + try: + jar.load(ignore_discard=True) + except FileNotFoundError: + pass # jar may not exist on first run + for cookie in jar: + super().set_cookie(cookie) + + def save(self): + """Mimics LWPCookieJar.save but with multi-process safety.""" + # Because we move the temporary file, this may throw FileNotFoundError + try: + # Write the new jar into a temporary file in the same directory + with NamedTemporaryFile(mode="w", dir=self._path.parent) as tmp: + now = time.time() + tmp.write("#LWP-Cookies-2.0\n") + for cookie in self: + if cookie.is_expired(now): + continue + tmp.write("Set-Cookie3: %s\n" % lwp_cookie_str(cookie)) + tmp.flush() + # rename is an atomic operation (within the same fs) + Path(tmp.name).replace(self._path) + except FileNotFoundError: + pass # expected if rename successful + + def clear(self, domain=None, path=None, name=None): + """Clears a cookie from the jar and writes to disk.""" + super().clear(domain, path, name) + self.save() + + def set_cookie(self, cookie, *args, **kwargs): + """Saves a cookie into the jar and to disk.""" + super().set_cookie(cookie, *args, **kwargs) + self.save() + + class OktaClient(object): """ The Okta Client Class performs the necessary API @@ -47,7 +95,7 @@ class OktaClient(object): KEYRING_SERVICE = 'gimme-aws-creds' KEYRING_ENABLED = not isinstance(keyring.get_keyring(), FailKeyring) - def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=None): + def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=None, jar=None): """ :type gac_ui: ui.UserInterface :param okta_org_url: Base URL string for Okta IDP. @@ -55,6 +103,7 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non """ self.ui = gac_ui self._okta_org_url = okta_org_url + self._okta_domain = urlparse(self._okta_org_url).netloc self._verify_ssl_certs = verify_ssl_certs if verify_ssl_certs is False: @@ -71,10 +120,12 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non self._oauth_access_token = None self._oauth_id_token = None - self._jar = requests.cookies.RequestsCookieJar() + self._jar = OktaCookieJar(Path(jar).expanduser()) if jar else RequestsCookieJar() # Allow up to 5 retries on requests to Okta in case we have network issues self._http_client = requests.Session() + self._http_client.verify = verify_ssl_certs + self._http_client.headers = self._get_headers() self._http_client.cookies = self._jar self.device_token = device_token @@ -83,15 +134,20 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non method_whitelist=['GET', 'POST']) self._http_client.mount('https://', HTTPAdapter(max_retries=retries)) + @property + def session_id(self): + """ Retrieve the Okta Session ID (sid) from cookies """ + return self._get_okta_cookie('sid') + @property def device_token(self): - return self._http_client.cookies.get('DT') + """ Retrieve the Okta Device Token (DT) from cookies """ + return self._get_okta_cookie('DT') @device_token.setter def device_token(self, device_token): if device_token is not None: - match = re.search(r'^https://(.*)/?', self._okta_org_url) - self._http_client.cookies.set('DT', device_token, domain=match.group(1), path='/') + self._jar.set('DT', device_token, domain=self._okta_domain, path='/') def set_username(self, username): self._username = username @@ -139,8 +195,7 @@ def stepup_auth_saml(self, embed_link, state_token=None): login_result = self._http_client.post( saml_response['TargetUrl'], - data=saml_response, - verify=self._verify_ssl_certs + data=saml_response ) return login_result.text @@ -157,31 +212,28 @@ def auth(self): def auth_session(self, **kwargs): """ Authenticate the user and return the Okta Session ID and username""" - login_response = self.auth() - - session_url = self._okta_org_url + '/login/sessionCookieRedirect' - - if 'redirect_uri' not in kwargs: - redirect_uri = 'http://localhost:8080/login' - else: - redirect_uri = kwargs['redirect_uri'] - - params = { - 'token': login_response['sessionToken'], - 'redirectUrl': redirect_uri - } - - response = self._http_client.get( - session_url, - params=params, - headers=self._get_headers(), - verify=self._verify_ssl_certs, - allow_redirects=False - ) + # If there is a session ID in the cookie jar already, check if it's still valid + current_user = None + if self.session_id: + response = self._http_client.get(self._okta_org_url + '/api/v1/users/me') + if response.ok: + current_user = response.json() + # Session wasn't valid, redo authentication + if not current_user: + login_response = self.auth() + response = self._http_client.get( + self._okta_org_url + '/login/sessionCookieRedirect', + params={ + 'token': login_response['sessionToken'], + 'redirectUrl': kwargs.get('redirect_uri', 'http://localhost:8080/login') + }, + allow_redirects=False + ) + current_user = login_response['_embedded']['user'] return { - "username": login_response['_embedded']['user']['profile']['login'], - "session": response.cookies['sid'], - "device_token": self._http_client.cookies['DT'] + "username": current_user['profile']['login'], + "session": self.session_id, + "device_token": self.device_token } def auth_oauth(self, client_id, **kwargs): @@ -242,8 +294,6 @@ def auth_oauth(self, client_id, **kwargs): response = self._http_client.get( oauth_url, params=params, - headers=self._get_headers(), - verify=self._verify_ssl_certs, allow_redirects=False ) response.raise_for_status() @@ -272,6 +322,12 @@ def _get_headers(): } return headers + def _get_okta_cookie(self, name): + """Returns an cookie with name for the specified okta domain.""" + for cookie in self._jar: + if cookie.name == name and cookie.domain == self._okta_domain: + return cookie.value + def _get_initial_flow_state(self, embed_link, state_token=None): """ Starts the authentication flow with Okta""" if state_token is None: @@ -283,9 +339,7 @@ def _get_initial_flow_state(self, embed_link, state_token=None): response = self._http_client.post( self._okta_org_url + '/api/v1/authn', - json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token} ) response.raise_for_status() return {'stateToken': state_token, 'apiResponse': response.json()} @@ -335,12 +389,7 @@ def _login_username_password(self, state_token, url): if state_token is not None: login_json['stateToken'] = state_token - response = self._http_client.post( - url, - json=login_json, - headers=self._get_headers(), - verify=self._verify_ssl_certs - ) + response = self._http_client.post(url, json=login_json) response_data = response.json() @@ -375,9 +424,7 @@ def _login_send_sms(self, state_token, factor): response = self._http_client.post( factor['_links']['verify']['href'], params={'rememberDevice': self._remember_device}, - json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token} ) response.raise_for_status() @@ -394,9 +441,7 @@ def _login_send_call(self, state_token, factor): response = self._http_client.post( factor['_links']['verify']['href'], params={'rememberDevice': self._remember_device}, - json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token} ) response.raise_for_status() @@ -413,9 +458,7 @@ def _login_send_push(self, state_token, factor): response = self._http_client.post( factor['_links']['verify']['href'], params={'rememberDevice': self._remember_device}, - json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token} ) response.raise_for_status() @@ -430,9 +473,7 @@ def _login_input_webauthn_challenge(self, state_token, factor): """ Retrieve nonce """ response = self._http_client.post( factor['_links']['verify']['href'], - json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token} ) response.raise_for_status() @@ -519,8 +560,6 @@ def _get_response_data(self, href, state_token): response = self._http_client.post(href, params={'rememberDevice': self._remember_device}, json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs ) response_data = response.json() return response_data @@ -575,9 +614,7 @@ def _login_input_mfa_challenge(self, state_token, next_url): response = self._http_client.post( next_url, params={'rememberDevice': self._remember_device}, - json={'stateToken': state_token, 'passCode': pass_code}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token, 'passCode': pass_code} ) response.raise_for_status() @@ -595,9 +632,7 @@ def _check_push_result(self, state_token, login_data): time.sleep(1) response = self._http_client.post( login_data['_links']['next']['href'], - json={'stateToken': state_token}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token} ) response.raise_for_status() @@ -636,9 +671,7 @@ def _check_u2f_result(self, state_token, login_data): response = self._http_client.post( login_data['_links']['next']['href'] + "?rememberDevice=false", - json={'stateToken': state_token, 'clientData': client_data, 'signatureData': signature_data}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + json={'stateToken': state_token, 'clientData': client_data, 'signatureData': signature_data} ) response.raise_for_status() @@ -673,9 +706,7 @@ def _check_webauthn_result(self, state_token, login_data): response = self._http_client.post( login_data['_links']['next']['href'] + "?rememberDevice=false", json={'stateToken': state_token, 'clientData': client_data, 'signatureData': signature_data, - 'authenticatorData': auth_data}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + 'authenticatorData': auth_data} ) response.raise_for_status() @@ -690,7 +721,7 @@ def _check_webauthn_result(self, state_token, login_data): def get_saml_response(self, url): """ return the base64 SAML value object from the SAML Response""" - response = self._http_client.get(url, verify=self._verify_ssl_certs) + response = self._http_client.get(url) response.raise_for_status() saml_response = None @@ -899,8 +930,7 @@ def _get_username_password_creds(self): def setup_fido_authenticator(self): setup_fido_authenticator_url = self._okta_org_url + '/user/settings/factors/setup?factorType=FIDO_WEBAUTHN' - response = self._http_client.get(setup_fido_authenticator_url, headers=self._get_headers(), - verify=self._verify_ssl_certs) + response = self._http_client.get(setup_fido_authenticator_url) response.raise_for_status() parsed_url = urlparse(response.url) @@ -917,8 +947,7 @@ def setup_fido_authenticator(self): # Expected while adding a new fido authenticator pass - response = self._http_client.get(setup_fido_authenticator_url, json={'stateToken': state_token}, - headers=self._get_headers(), verify=self._verify_ssl_certs) + response = self._http_client.get(setup_fido_authenticator_url, json={'stateToken': state_token}) response.raise_for_status() state_token = self._extract_state_token_from_http_response(response) @@ -948,13 +977,11 @@ def _verify_password(self, verify_password_page_response): # Must be form urlencoded headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' data = '_xsrfToken={xsrf_token}&password={password}'.format(xsrf_token=xsrf_token, password=creds['password']) - response = self._http_client.post(self._okta_org_url + '/user/verify_password', - data=data, headers=headers, verify=self._verify_ssl_certs) + response = self._http_client.post(self._okta_org_url + '/user/verify_password', data=data) response.raise_for_status() response = self._http_client.get( - self._okta_org_url + '/login/second-factor?fromURI=%2Fenduser%2Fsettings&forcePrompt=true&hideBgImage=true', - headers=self._get_headers(), verify=self._verify_ssl_certs) + self._okta_org_url + '/login/second-factor?fromURI=%2Fenduser%2Fsettings&forcePrompt=true&hideBgImage=true') response.raise_for_status() return response @@ -980,8 +1007,7 @@ def _activate_webauthn_factor(self, state_token): response = self._http_client.post( next_link['href'], - json={"stateToken": state_token, "clientData": client_data, "attestation": attestation_data}, - headers=self._get_headers(), verify=self._verify_ssl_certs) + json={"stateToken": state_token, "clientData": client_data, "attestation": attestation_data}) response.raise_for_status() session_token = response.json()['sessionToken'] @@ -990,8 +1016,7 @@ def _activate_webauthn_factor(self, state_token): response = self._http_client.get( self._okta_org_url + '/login/sessionCookieRedirect?checkAccountSetupComplete=true&' 'token={session_token}&redirectUrl={redirect_url}'.format(session_token=session_token, - redirect_url=redirect_url), - headers=self._get_headers(), verify=self._verify_ssl_certs) + redirect_url=redirect_url)) response.raise_for_status() return attestation.auth_data.credential_data.credential_id, user_obj.get('name', 'gimme-aws-creds') @@ -1006,9 +1031,7 @@ def _enroll_factor(self, state_token): response = self._http_client.post( webauthn_factor['_links']['enroll']['href'], json={"stateToken": state_token, "factorType": webauthn_factor['factorType'], - "provider": webauthn_factor['provider']}, - headers=self._get_headers(), - verify=self._verify_ssl_certs + "provider": webauthn_factor['provider']} ) response.raise_for_status() @@ -1016,8 +1039,7 @@ def _enroll_factor(self, state_token): def _introspect_factors(self, state_token): response = self._http_client.post(self._okta_org_url + '/api/v1/authn/introspect', - json={"stateToken": state_token}, headers=self._get_headers(), - verify=self._verify_ssl_certs) + json={"stateToken": state_token}) response.raise_for_status() factors = response.json()['_embedded']['factors'] if not factors: