Skip to content

Commit

Permalink
Enable RBAC check
Browse files Browse the repository at this point in the history
This will allow krkn to check if the current user context can do something and allow us to create alternate flows in the krkn code based on the user's privilege/RBAC.

Signed-off-by: yogananth subramanian <[email protected]>
  • Loading branch information
yogananth-subramanian committed Aug 15, 2024
1 parent 32dfb12 commit 4b27908
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
45 changes: 45 additions & 0 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def __initialize_clients(self, kubeconfig_path: str = None):
self.apps_api = client.AppsV1Api(self.api_client)
self.batch_cli = client.BatchV1Api(self.k8s_client)
self.net_cli = client.NetworkingV1Api(self.api_client)
self.auth_cli = client.AuthorizationV1Api(self.k8s_client)
self.custom_object_client = client.CustomObjectsApi(
self.k8s_client
)
Expand Down Expand Up @@ -1651,6 +1652,50 @@ def check_if_pvc_exists(
logging.error("Namespace '%s' doesn't exist", str(namespace))
return False

def check_rbac_access(self, resource: str, verb: str,
namespace: str = None) -> bool:
"""
Check if the current user can perform an action in the given namespace.
If namespace is not passed, check would be performed against all
namespace.
:param resource: One of the existing resource types
:param verb: Verb is a kubernetes resource API verb.
:param namespace: Namespace is the namespace of the action being
requested.
:return: boolean value indicating whether
the user is allowed to do the requested action.
"""
if namespace:
body = client.V1SelfSubjectAccessReview(
spec=client.V1SelfSubjectAccessReviewSpec(
resource_attributes=client.V1ResourceAttributes(
namespace=namespace,
resource=resource,
verb=verb
)
)
)
else:
body = client.V1SelfSubjectAccessReview(
spec=client.V1SelfSubjectAccessReviewSpec(
resource_attributes=client.V1ResourceAttributes(
resource=resource,
verb=verb
)
)
)
try:
api_response = self.auth_cli.create_self_subject_access_review(
body=body)
allowed = api_response.status.allowed
except ApiException as e:
logging.error(
"Exception when calling"
"AuthorizationV1Api->create_self_subject_access_review: %s\n",
str(e))
return allowed

def get_pvc_info(self, name: str, namespace: str) -> PVC:
"""
Retrieve information about a Persistent Volume Claim in a
Expand Down
12 changes: 12 additions & 0 deletions src/krkn_lib/tests/test_krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,18 @@ def test_get_pod_info(self):
logging.error("test raised exception {0}".format(str(e)))
self.assertTrue(False)


def test_check_rbac_access(self):
try:
namespace = "test-ns-" + self.get_random_string(10)
self.deploy_namespace(namespace, [])
self.assertTrue(self.lib_k8s.check_rbac_access('pod', 'get', namespace))
self.assertTrue(self.lib_k8s.check_rbac_access('services', 'create', 'default'))
self.assertTrue(self.lib_k8s.check_rbac_access('secretes', 'delete', namespace))
except Exception as e:
logging.error("test raised exception {0}".format(str(e)))
self.assertTrue(False)

def test_check_if_namespace_exists(self):
try:
namespace = "test-ns-" + self.get_random_string(10)
Expand Down

0 comments on commit 4b27908

Please sign in to comment.