Skip to content

Commit

Permalink
Use a plain old select loop
Browse files Browse the repository at this point in the history
Use `select.select` to track subproc completion since `select.epoll`
isn't available on OSX.

Resolves #16
  • Loading branch information
Tyler Goodlet committed Jun 17, 2016
1 parent 24c5077 commit 5e30bb3
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions pysipp/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def __init__(
self,
subprocmod=subprocess,
osmod=os,
poller=select.epoll,
select=select.select,
):
# these could optionally be rpyc proxy objs
self.spm = subprocmod
self.osm = osmod
self.poller = poller()
self.select = select
# collector thread placeholder
self._waiter = None
# store proc results
Expand Down Expand Up @@ -67,11 +67,9 @@ def __call__(self, cmds, block=True, rate=300, **kwargs):
stderr=sp.PIPE
)
fd = proc.stderr.fileno()
log.debug("registering fd '{}' for pid '{}'".format(
fd, proc.pid))
log.debug("registering fd '{}' for pid '{}'".format(fd, proc.pid))
fds2procs[fd] = self._procs[cmd] = proc
# register for stderr hangup events
self.poller.register(proc.stderr.fileno(), select.EPOLLHUP)

# limit launch rate
time.sleep(1. / rate)

Expand All @@ -87,15 +85,39 @@ def _wait(self, fds2procs):
signalled = None
left = len(fds2procs)
collected = 0
p2fd = {p: p.stderr for p in fds2procs.values()}
stderrs = {p: [] for p in fds2procs.values()}

# wait on stderr hangup events
while collected < left:
pairs = self.poller.poll() # wait on hangup events
log.debug("received hangup for pairs '{}'".format(pairs))
for fd, status in pairs:
try:
fds, _, _ = self.select(list(p2fd.values()), [], [])
except ValueError:
# all fds are now closed
hungup = p2fd.keys()
else:
hungup = []
for proc in (fds2procs[fd.fileno()] for fd in fds):
data = proc.stderr.read()
if data != '':
stderrs[proc].append(data) # append stderr
continue

p2fd.pop(proc)
hungup.append(proc)

if not hungup:
continue

for proc in hungup:
log.debug("received hangup for pid '{}'".format(proc.pid))
collected += 1
proc = fds2procs[fd]
# attach streams so they can be read more then once
log.debug("collecting streams for {}".format(proc))
proc.streams = Streams(*proc.communicate()) # timeout=2))
proc.streams = Streams(
stdout=proc.communicate()[0],
stderr=''.join(stderrs[proc]),
) # timeout=2))
if proc.returncode != 0 and not signalled:
# stop all other agents if there is a failure
signalled = self.stop()
Expand Down

0 comments on commit 5e30bb3

Please sign in to comment.