Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BucketPolicy: add tests for ConfirmRemoveSelfBucketAccess header #567

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions s3tests.conf.SAMPLE
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
user_id = RGW11111111111111111
email = [email protected]

# iam account non-root user in the same account [iam root]
[iam root alt]
access_key = CCCCCCCCCCCCCCCCCCcc
secret_key = cccccccccccccccccccccccccccccccccccccccc
user_id = testacct1user
email = [email protected]

# iam account root user in a different account than [iam root]
[iam alt root]
access_key = BBBBBBBBBBBBBBBBBBbb
Expand Down
16 changes: 16 additions & 0 deletions s3tests_boto3/functional/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ def configure():
config.iam_root_user_id = cfg.get('iam root',"user_id")
config.iam_root_email = cfg.get('iam root',"email")

config.iam_root_alt_access_key = cfg.get('iam root alt',"access_key")
config.iam_root_alt_secret_key = cfg.get('iam root alt',"secret_key")
config.iam_root_alt_user_id = cfg.get('iam root alt',"user_id")
config.iam_root_alt_email = cfg.get('iam root alt',"email")

config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key")
config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key")
config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id")
Expand Down Expand Up @@ -461,6 +466,17 @@ def get_iam_root_client(**kwargs):
verify=config.default_ssl_verify,
**kwargs)

def get_iam_root_alt_client(**kwargs):
kwargs.setdefault('service_name', 's3')
kwargs.setdefault('aws_access_key_id', config.iam_root_alt_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_root_alt_secret_key)

return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)

def get_iam_alt_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key)
Expand Down
79 changes: 79 additions & 0 deletions s3tests_boto3/functional/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
get_alt_user_id,
get_alt_email,
get_alt_client,
get_iam_root_client,
get_iam_root_alt_client,
get_tenant_client,
get_tenant_iam_client,
get_tenant_user_id,
Expand Down Expand Up @@ -10403,6 +10405,83 @@ def test_bucketv2_policy():
response = alt_client.list_objects_v2(Bucket=bucket_name)
assert len(response['Contents']) == 1

@pytest.mark.bucket_policy
def test_bucket_policy_deny_self_denied_policy():
root_client = get_iam_root_client(service_name="s3")
bucket_name = get_new_bucket(root_client)

resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": [
"s3:PutBucketPolicy",
"s3:GetBucketPolicy",
"s3:DeleteBucketPolicy",
],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})

root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)

# non-root account should not be able to get, put or delete bucket policy
root_alt_client = get_iam_root_alt_client()
check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)

# root account should be able to get, put or delete bucket policy
response = root_client.get_bucket_policy(Bucket=bucket_name)
assert response['Policy'] == policy_document
root_client.delete_bucket_policy(Bucket=bucket_name)
root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)

@pytest.mark.bucket_policy
def test_bucket_policy_deny_self_denied_policy_confirm_header():
root_client = get_iam_root_client(service_name="s3")
bucket_name = get_new_bucket(root_client)

resource1 = "arn:aws:s3:::" + bucket_name
resource2 = "arn:aws:s3:::" + bucket_name + "/*"
policy_document = json.dumps(
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": [
"s3:PutBucketPolicy",
"s3:GetBucketPolicy",
"s3:DeleteBucketPolicy",
],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})

root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True)

# non-root account should not be able to get, put or delete bucket policy
root_alt_client = get_iam_root_alt_client()
check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)

# root account should not be able to get, put or delete bucket policy
check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name)
check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name)
check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)

@pytest.mark.bucket_policy
def test_bucket_policy_acl():
bucket_name = get_new_bucket()
Expand Down