Skip to content

Commit

Permalink
Simulate SSH Session functionality with Container-based Hosts
Browse files Browse the repository at this point in the history
This change adds the ability to use "sftp" actions against containers.
This is done by (un)packing files in tar archives, which can be added/copied from running containers.
There isn't a current implementation to emulate an interactive shell, but that is likel possible with attach_socket.

I also made some unrelated optimizations to existing code.
  • Loading branch information
JacobCallahan committed Aug 26, 2022
1 parent 08ded46 commit 9882032
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 51 deletions.
22 changes: 14 additions & 8 deletions broker/binds/containers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
class ContainerBind:
def __init__(self, host=None, username=None, password=None, port=22):
def __init__(self, host=None, username=None, password=None, port=22, timeout=None):
self.host = host
self.username = username
self.password = password
self.port = port
self.timeout = timeout
self._client = None
self._ClientClass = None

@property
def client(self):
if not isinstance(self._client, self._ClientClass):
self._client = self._ClientClass(base_url=self.uri)
self._client = self._ClientClass(base_url=self.uri, timeout=self.timeout)
return self._client

@property
Expand Down Expand Up @@ -74,24 +75,29 @@ def __repr__(self):


class PodmanBind(ContainerBind):
def __init__(self, host=None, username=None, password=None, port=22):
super().__init__(host, username, password, port)
def __init__(self, **kwargs):
super().__init__(**kwargs)
from podman import PodmanClient

self._ClientClass = PodmanClient
if self.host == "localhost":
self.uri = "unix:///run/user/1000/podman/podman.sock"
else:
self.uri = f"http+ssh://{username}@{host}:{port}/run/podman/podman.sock"
self.uri = (
"http+ssh://{username}@{host}:{port}/run/podman/podman.sock".format(
**kwargs
)
)


class DockerBind(ContainerBind):
def __init__(self, host=None, username=None, password=None, port=2375):
super().__init__(host, username, password, port)
def __init__(self, port=2375, **kwargs):
kwargs["port"] = port
super().__init__(**kwargs)
from docker import DockerClient

self._ClientClass = DockerClient
if self.host == "localhost":
self.uri = "unix://var/run/docker.sock"
else:
self.uri = f"ssh://{username}@{host}"
self.uri = "ssh://{username}@{host}".format(**kwargs)
39 changes: 38 additions & 1 deletion broker/helpers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Miscellaneous helpers live here"""
from contextlib import contextmanager
import getpass
import inspect
import json
import logging
import os
import sys
import tarfile
import time
from collections import UserDict, namedtuple
from collections.abc import MutableMapping
from copy import deepcopy
from pathlib import Path
from uuid import uuid4

import yaml
from logzero import logger
Expand Down Expand Up @@ -497,11 +500,45 @@ def find_origin():
"""Move up the call stack to find tests, fixtures, or cli invocations"""
prev = None
for frame in inspect.stack():
if frame.function == "checkout" and frame.filename.endswith("broker/commands.py"):
if frame.function == "checkout" and frame.filename.endswith(
"broker/commands.py"
):
return f"broker_cli:{getpass.getuser()}"
if frame.function.startswith("test_"):
return f"{frame.function}:{frame.filename}"
if frame.function == "call_fixture_func":
return prev or "Uknown fixture"
prev = f"{frame.function}:{frame.filename}"
return f"Unknown origin by {getpass.getuser()}"


@contextmanager
def data_to_tempfile(data, path=None, as_tar=False):
"""Write data to a temporary file and return the path"""
path = Path(path or uuid4().hex[-10])
logger.debug(f"Creating temporary file {path.absolute()}")
if isinstance(data, bytes):
path.write_bytes(data)
elif isinstance(data, str):
path.write_text(data)
else:
raise TypeError(f"data must be bytes or str, not {type(data)}")
if as_tar:
tar = tarfile.open(path)
yield tarfile.open(path)
tar.close()
else:
yield path
path.unlink()


@contextmanager
def temporary_tar(paths):
"""Create a temporary tar file and return the path"""
temp_tar = Path(f"{uuid4().hex[-10]}.tar")
with tarfile.open(temp_tar, mode="w") as tar:
for path in paths:
logger.debug(f"Adding {path.absolute()} to {temp_tar.absolute()}")
tar.add(path, arcname=path.name)
yield temp_tar.absolute()
temp_tar.unlink()
16 changes: 8 additions & 8 deletions broker/hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pickle
from logzero import logger
from broker.exceptions import NotImplementedError
from broker.session import Session
from broker.session import ContainerSession, Session
from broker.settings import settings


Expand Down Expand Up @@ -36,7 +36,11 @@ def __del__(self):
def session(self):
# This attribute may be missing after pickling
if not isinstance(getattr(self, "_session", None), Session):
self.connect()
# Check to see if we're a non-ssh-enabled Container Host
if hasattr(self, "_cont_inst") and not self._cont_inst.ports.get(22):
self._session = ContainerSession(self)
else:
self.connect()
return self._session

def __getstate__(self):
Expand All @@ -48,7 +52,7 @@ def __getstate__(self):
logger.warning(f"Recursion limit reached on {self._purify_target}")
self.__dict__[self._purify_target] = None
self.__getstate__()
del self.__dict__["_purify_target"]
self.__dict__.pop("_purify_target", None)
return self.__dict__

def _purify(self):
Expand Down Expand Up @@ -85,15 +89,11 @@ def close(self):
# This attribute may be missing after pickling
if isinstance(getattr(self, "_session", None), Session):
self._session.session.disconnect()
self._session = None
self._session = None

def release(self):
raise NotImplementedError("release has not been implemented for this provider")

# @cached_property
def hostname(self):
return self.session.execute("hostname").strip()

# @cached_property
def _pkg_mgr(self):
for mgr in ["yum", "dnf", "zypper"]:
Expand Down
2 changes: 1 addition & 1 deletion broker/providers/ansible_tower.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def construct_host(self, provider_params, host_classes, **kwargs):
if key.endswith("host_type"):
host_type = value if value in host_classes else host_type
if not hostname:
raise Exception(f"No hostname found in job attributes:\n{job_attrs}")
logger.warning(f"No hostname found in job attributes:\n{job_attrs}")
logger.debug(f"hostname: {hostname}, name: {name}, host type: {host_type}")
host_inst = host_classes[host_type](
**{**kwargs, "hostname": hostname, "name": name}
Expand Down
36 changes: 11 additions & 25 deletions broker/providers/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@
from broker.binds import containers


def container_execute(self, command, demux=True, **kwargs):
"""This method is injected into the container host object on creation"""
kwargs["demux"] = demux
logger.debug(f"{self.hostname} executing command: {command}")
result = self._cont_inst.exec_run(command, **kwargs)
if demux:
result = helpers.Result.from_duplexed_exec(result)
else:
result = helpers.Result.from_nonduplexed_exec(result)
logger.debug(f"{self.hostname} command result:\n{result}")
return result


def container_info(container_inst):
return {
"_broker_provider": "Container",
Expand All @@ -43,9 +30,13 @@ def _cont_inst(self):
return self._cont_inst_p


def _host_release(self):
def _host_release():
caller_host = inspect.stack()[1][0].f_locals["host"]
caller_host._cont_inst.remove(v=True, force=True)
if not caller_host._cont_inst_p:
caller_host._cont_inst_p = caller_host._prov_inst._cont_inst_by_name(
caller_host.name
)
caller_host._cont_inst_p.remove(v=True, force=True)


class Container(Provider):
Expand All @@ -57,6 +48,7 @@ class Container(Provider):
"CONTAINER.host_password",
),
Validator("CONTAINER.host_port", default=22),
Validator("CONTAINER.timeout", default=360),
Validator("CONTAINER.auto_map_ports", is_type_of=bool, default=True),
]
_checkout_options = [
Expand Down Expand Up @@ -101,6 +93,7 @@ def runtime(self):
username=settings.container.host_username,
password=settings.container.host_password,
port=settings.container.host_port,
timeout=settings.container.timeout,
)
return self._runtime

Expand Down Expand Up @@ -139,12 +132,10 @@ def _set_attributes(self, host_inst, broker_args=None, cont_inst=None):
"_cont_inst_p": cont_inst,
"_broker_provider": "Container",
"_broker_args": broker_args,
"release": _host_release,
}
)
host_inst.__class__.release = _host_release
host_inst.__class__._cont_inst = _cont_inst
if not cont_inst.ports.get(22):
host_inst.__class__.execute = container_execute

def _port_mapping(self, image, **kwargs):
"""
Expand Down Expand Up @@ -229,19 +220,14 @@ def nick_help(self, **kwargs):
images = [
img.tags[0]
for img in self.runtime.images
if img.labels.get("broker_compatible")
and img.tags
if img.labels.get("broker_compatible") and img.tags
]
if res_filter := kwargs.get("results_filter"):
images = helpers.results_filter(images, res_filter)
images = "\n".join(images[:results_limit])
logger.info(f"Available host images:\n{images}")
elif kwargs.get("container_apps"):
images = [
img.tags[0]
for img in self.runtime.images
if img.tags
]
images = [img.tags[0] for img in self.runtime.images if img.tags]
if res_filter := kwargs.get("results_filter"):
images = helpers.results_filter(images, res_filter)
images = "\n".join(images[:results_limit])
Expand Down
77 changes: 73 additions & 4 deletions broker/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from logzero import logger
from ssh2.session import Session as ssh2_Session
from ssh2 import sftp as ssh2_sftp
from broker.helpers import simple_retry, translate_timeout, Result
from broker import helpers

SESSIONS = {}

Expand All @@ -30,7 +30,7 @@ def __init__(self, **kwargs):
sock.settimeout(kwargs.get("timeout"))
port = kwargs.get("port", 22)
key_filename = kwargs.get("key_filename")
simple_retry(sock.connect, [(host, port)])
helpers.simple_retry(sock.connect, [(host, port)])
self.session = ssh2_Session()
self.session.handshake(sock)
if key_filename:
Expand All @@ -53,14 +53,14 @@ def _read(channel):
except UnicodeDecodeError as err:
logger.error(f"Skipping data chunk due to {err}\nReceived: {data}")
size, data = channel.read()
return Result.from_ssh(
return helpers.Result.from_ssh(
stdout=results,
channel=channel,
)

def run(self, command, timeout=0):
"""run a command on the host and return the results"""
self.session.set_timeout(translate_timeout(timeout))
self.session.set_timeout(helpers.translate_timeout(timeout))
channel = self.session.open_session()
channel.execute(
command,
Expand Down Expand Up @@ -188,3 +188,72 @@ def stdout(self):
results += data.decode("utf-8")
size, data = self._chan.read()
return results


class ContainerSession:
"""An approximation of ssh-based functionality from the Session class"""

def __init__(self, cont_inst):
self._cont_inst = cont_inst

def run(self, command, demux=True, **kwargs):
"""This is the container approximation of Session.run"""
kwargs.pop("timeout", None) # Timeouts are set at the client level
kwargs["demux"] = demux
if any([s in command for s in "|&><"]):
# Containers don't handle pipes, redirects, etc well in a bare exec_run
command = f"/bin/bash -c '{command}'"
result = self._cont_inst._cont_inst.exec_run(command, **kwargs)
if demux:
result = helpers.Result.from_duplexed_exec(result)
else:
result = helpers.Result.from_nonduplexed_exec(result)
return result

def disconnect(self):
"""Needed for simple compatability with Session"""
pass

def sftp_write(self, sources, destination=None):
"""Add one of more files to the container"""
# ensure sources is a list of Path objects
if not isinstance(sources, list):
sources = [Path(sources)]
else:
sources = [Path(source) for source in sources]
# validate each source's existenence
for source in sources:
if not Path(source).exists():
raise FileNotFoundError(source)
destination = destination or sources[0].parent
# Files need to be added to a tarfile
with helpers.temporary_tar(sources) as tar:
logger.debug(
f"{self._cont_inst.hostname} adding file(s) {source} to {destination}"
)
self._cont_inst._cont_inst.put_archive(str(destination), tar.read_bytes())

def sftp_read(self, source, destination=None):
"""Get a file or directory from the container"""
destination = Path(destination or source)
logger.debug(
f"{self._cont_inst.hostname} getting file {source} from {destination}"
)
data, status = self._cont_inst._cont_inst.get_archive(source)
logger.debug(f"{self._cont_inst.hostname}: {status}")
all_data = b"".join(d for d in data)
if destination.name == "_raw":
return all_data
with helpers.data_to_tempfile(all_data, as_tar=True) as tar:
logger.debug(f"Extracting {source} to {destination}")
tar.extractall(destination.parent if destination.is_file() else destination)

def shell(self, pty=False):
"""Create and return an interactive shell instance"""
raise NotImplementedError("ContainerSession.shell has not been implemented")

def __enter__(self):
return self

def __exit__(self, *args):
pass
Original file line number Diff line number Diff line change
@@ -1 +1 @@
container_host: ch-d:ubi8
container_host: ubi8:latest
2 changes: 1 addition & 1 deletion tests/data/cli_scenarios/containers/execute_ch-d_ubi8.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
container_app: ch-d:ubi8
container_app: ubi8:latest
command: "ls -lah"
Loading

0 comments on commit 9882032

Please sign in to comment.