Fixes for zsh/zpty fd message exchange #45

Open
wants to merge 10 commits into
base: main
from
1 change: 1 addition & 0 deletions README.md
@@ -170,6 +170,7 @@ ok ./async_test.zsh 0.070s

## Limitations

* Processing a lot of output (>10240 bytes) can be slow due to the low default value of `ASYNC_MAX_BUFFER_SIZE` (`1020`). The low default is a safety precaution; raising it is unlikely to cause problems on e.g. Linux, but caution is warranted.
* A NULL-character (`$'\0'`) is used by `async_job` to signify the end of the command. It is recommended not to pass NULL-characters as arguments, although they should work when passing multiple arguments to `async_job` (because of quoting).
* Tell me? :)
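
As a rough illustration of the first limitation, the number of writes the worker needs grows with the payload size divided by the buffer size. The sketch below is not part of zsh-async: `chunk_count` is a hypothetical helper, and `8192` is an arbitrary example value rather than a recommended setting.

```shell
# Hypothetical helper: number of chunked writes needed for a payload of
# $1 bytes when each write carries at most $2 bytes (ceiling division).
chunk_count() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

chunk_count 10240 1020   # default buffer size (1024 - 4)
chunk_count 10240 8192   # a larger, example-only buffer size
```

Since `async.zsh` only applies its default when `ASYNC_MAX_BUFFER_SIZE` is unset, setting the variable before sourcing the file should be enough to override it.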

97 changes: 69 additions & 28 deletions async.zsh
@@ -11,6 +11,21 @@
typeset -g ASYNC_VERSION=1.8.6
# Produce debug output from zsh-async when set to 1.
typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0}
# When ASYNC_DEBUG=1, worker stderr output will be redirected here.
typeset -g ASYNC_DEBUG_WORKER_STDERR=${ASYNC_DEBUG_WORKER_STDERR:-/dev/null}

# The maximum buffer size when outputting to zpty.
#
# When processing large amounts of data, the limit of 1024 bytes is
# slow. If you're going to output a lot more than that, consider
# increasing the buffer size.
#
# This value was chosen as a safe limit for macOS and other systems that
# have a low limit (1024) for the buffer; on Linux this can likely be
# raised significantly.
#
# Note: Subtract 4 to accommodate "\r\n" times two.
typeset -g ASYNC_MAX_BUFFER_SIZE=${ASYNC_MAX_BUFFER_SIZE:-$((1024 - 4))}

# Execute commands that can manipulate the environment inside the async worker. Return output via callback.
_async_eval() {
@@ -20,17 +35,17 @@ _async_eval() {
# simplicity, this could be improved in the future.
{
eval "$@"
} &> >(ASYNC_JOB_NAME=[async/eval] _async_job 'command -p cat')
} &> >(ASYNC_JOB_NAME=[async/eval] _async_job 0 'command -p cat')
}

# Wrapper for jobs executed by the async worker, gives output in parseable format with execution time
_async_job() {
# Disable xtrace as it would mangle the output.
setopt localoptions noxtrace

# Store start time for job.
float -F duration=$EPOCHREALTIME

# Parent pid for notifications via kill signal.
local parent_pid=$1; shift

# Run the command and capture both stdout (`eval`) and stderr (`cat`) in
# separate subshells. When the command is complete, we grab write lock
# (mutex token) and output everything except stderr inside the command
@@ -40,6 +55,10 @@ _async_job() {
local jobname=${ASYNC_JOB_NAME:-$1} out
out="$(
local stdout stderr ret tok

# Disable xtrace as it would mangle the stderr. The user can
# still enable xtrace inside the async job, if required.
setopt noxtrace
{
stdout=$(eval "$@")
ret=$?
@@ -56,8 +75,33 @@ _async_job() {
# Grab mutex lock, stalls until token is available.
read -r -k 1 -p tok || return 1

# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
print -r -n - "$out"
# Chunk up the output so as to not fill up the entire fd.
for ((i = 1; i <= $#out; i += ASYNC_MAX_BUFFER_SIZE)); do
# Note: We are surrounding the (potentially partial) message in newlines
# here in an attempt to flush the file descriptor and prevent behavior
# that could cause zpty to hang. Literal newlines will be filtered by
# async_process_results. Any newlines in the job output will survive, as
# they are quoted.
#
# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
if ! print -r -n - $'\n'"${out[$i,$((i + ASYNC_MAX_BUFFER_SIZE - 1))]}"$'\n'; then
# BUG(mafredri): The worker and parent process should be informed.
# Unlock mutex to prevent a deadlock.
print -n -p $tok
break
fi

# When notifications are enabled, inform the parent that the
# buffer is filling up and must be consumed.
if ((parent_pid)); then
# On older versions of zsh (pre 5.2) we notify the parent through a
# SIGWINCH signal because `zpty` did not return a file descriptor
# (fd) prior to that. We use SIGWINCH because other signals
# (INFO, ALRM, USR1, etc.) can cause a deadlock in some situations.
# (The deadlock was fixed in zsh 5.1.1.)
kill -WINCH $parent_pid
fi
done

# Unlock mutex by inserting a token.
print -n -p $tok
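
The chunking above can be sketched in isolation. This is a minimal sketch, not the code from this PR: `emit_chunks` and `strip_framing` are hypothetical names, the payload is assumed to contain no raw newlines (in the worker they are quoted), and the offset/length expansion is used so the example runs under both zsh and bash.

```shell
# Buffer size used for the sketch, mirroring the default of 1024 - 4.
max_buffer_size=$((1024 - 4))

# Hypothetical helper: write $1 in pieces of at most $2 characters, each
# piece wrapped in newlines, similar in spirit to the worker-side loop.
emit_chunks() {
  local out="$1" size="$2" offset=0
  while [ "$offset" -lt "${#out}" ]; do
    printf '\n%s\n' "${out:$offset:$size}"
    offset=$((offset + size))
  done
}

# Hypothetical helper: drop the CR/LF framing on the reading side.
strip_framing() {
  tr -d '\r\n'
}

# Build a 2500-character payload so that it spans three chunks.
payload=''
i=0
while [ "$i" -lt 2500 ]; do
  payload="${payload}x"
  i=$((i + 1))
done

roundtrip=$(emit_chunks "$payload" "$max_buffer_size" | strip_framing)
[ "$roundtrip" = "$payload" ] && echo "round trip ok"
```

Starting from offset 0 and looping while the offset is below the length ensures the final partial chunk is still written.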
@@ -72,11 +116,18 @@ _async_worker() {
# pids of child processes.
unsetopt monitor

# Redirect stderr to `/dev/null` in case unforseen errors produced by the
# Redirect stderr to `/dev/null` in case of unforeseen errors produced by the
# worker. For example: `fork failed: resource temporarily unavailable`.
# Some older versions of zsh might also print malloc errors (known to happen
# on at least zsh 5.0.2 and 5.0.8) likely due to kill signals.
exec 2>/dev/null
if ((ASYNC_DEBUG)); then
exec 2>>${ASYNC_DEBUG_WORKER_STDERR}
if [[ $ASYNC_DEBUG_WORKER_STDERR != /dev/null ]]; then
setopt xtrace
fi
else
exec 2>/dev/null
fi
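
The effect of the debug switch can be tried outside the worker. The following is a sketch under assumptions, not the real `_async_worker`: `simulate_worker` is a hypothetical stand-in that takes the debug flag and the stderr path as arguments, and the log file is a temporary file created just for the example.

```shell
# Hypothetical stand-in for the worker's stderr setup.
simulate_worker() {
  local debug="$1" stderr_path="$2"
  (
    if [ "$debug" -ne 0 ]; then
      # Debug on: append stderr to the chosen file.
      exec 2>>"$stderr_path"
    else
      # Debug off: discard stderr.
      exec 2>/dev/null
    fi
    echo "fork failed: resource temporarily unavailable" >&2
  )
}

log_file=$(mktemp)

simulate_worker 0 "$log_file"      # message is discarded
lines_off=$(wc -l < "$log_file")

simulate_worker 1 "$log_file"      # message is appended to the log
lines_on=$(wc -l < "$log_file")

rm -f "$log_file"
```

Appending (`>>`) rather than truncating means several workers can share one log file without erasing each other's output.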

# When a zpty is deleted (using -d) all the zpty instances created before
# the one being deleted receive a SIGHUP, unless we catch it, the async
@@ -115,22 +166,8 @@ _async_worker() {
fi
}

child_exit() {
close_idle_coproc

# On older version of zsh (pre 5.2) we notify the parent through a
# SIGWINCH signal because `zpty` did not return a file descriptor (fd)
# prior to that.
if (( notify_parent )); then
# We use SIGWINCH for compatibility with older versions of zsh
# (pre 5.1.1) where other signals (INFO, ALRM, USR1, etc.) could
# cause a deadlock in the shell under certain circumstances.
kill -WINCH $parent_pid
fi
}

# Register a SIGCHLD trap to handle the completion of child processes.
trap child_exit CHLD
trap close_idle_coproc CHLD

# Process option parameters passed to worker.
while getopts "np:uz" opt; do
@@ -141,6 +178,9 @@ _async_worker() {
z) notify_parent=0;; # Uses ZLE watcher instead.
esac
done
if ((!notify_parent)) {
parent_pid=0
}

# Terminate all running jobs, note that this function does not
# reinstall the child trap.
@@ -173,10 +213,10 @@ _async_worker() {
(( coproc_pid )) && read -r -k 1 -p tok

terminate_jobs
trap child_exit CHLD # Reinstall child trap.
trap close_idle_coproc CHLD # Reinstall child trap.
}

local request do_eval=0
local request job do_eval=0
local -a cmd
while :; do
# Wait for jobs sent by async_job.
@@ -211,7 +251,7 @@ _async_worker() {
cmd=("${(z)request}")

# Name of the job (first argument).
local job=$cmd[1]
job=$cmd[1]

# Check if a worker should perform unique jobs, unless
# this is an eval since they run synchronously.
@@ -243,7 +283,7 @@ _async_worker() {
_async_eval $cmd
else
# Run job in background, completed jobs are printed to stdout.
_async_job $cmd &
_async_job $parent_pid $cmd &
# Store pid because zsh job manager is extremely inflexible (show jobname as non-unique '$job')...
storage[$job]="$!"
fi
@@ -292,7 +332,8 @@ async_process_results() {

# Read output from zpty and parse it if available.
while zpty -r -t $worker data 2>/dev/null; do
ASYNC_PROCESS_BUFFER[$worker]+=$data
# Trim newlines that are not part of the data.
ASYNC_PROCESS_BUFFER[$worker]+=${${data//$'\r'/}//$'\n'/}
len=${#ASYNC_PROCESS_BUFFER[$worker]}
pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter).
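
The reader-side trimming can be illustrated with a plain string buffer. This is a sketch only: `append_chunk` and `buffer` are hypothetical names, and the nested zsh expansion from the diff is split into two steps so the example also runs under bash.

```shell
buffer=''

# Hypothetical helper: strip CR and LF framing from one read and append
# what remains to the accumulated buffer.
append_chunk() {
  local data="$1"
  data=${data//$'\r'/}
  data=${data//$'\n'/}
  buffer+=$data
}

# Two partial reads, each surrounded by the framing newlines.
append_chunk $'\r\nhello \r\n'
append_chunk $'\nworld\n'

echo "$buffer"
```

Because the framing is removed before appending, a message split across several reads is reassembled without stray line breaks.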

94 changes: 89 additions & 5 deletions async_test.zsh
@@ -1,13 +1,16 @@
#!/usr/bin/env zsh

autoload -Uz is-at-least

test__async_job_print_hi() {
coproc cat
print -n -p t # Insert token into coproc.

local line
local -a out
line=$(_async_job print hi)
line=$(_async_job 0 print hi)
# Remove leading/trailing null, parse, unquote and interpret as array.
line=${${line//$'\r'}//$'\n'}
line=$line[2,$#line-1]
out=("${(@Q)${(z)line}}")

@@ -24,8 +27,9 @@ test__async_job_stderr() {

local line
local -a out
line=$(_async_job print 'hi 1>&2')
line=$(_async_job 0 print 'hi 1>&2')
# Remove trailing null, parse, unquote and interpret as array.
line=${${line//$'\r'}//$'\n'}
line=$line[1,$#line-1]
out=("${(@Q)${(z)line}}")

@@ -63,8 +67,9 @@ test__async_job_multiple_commands() {

local line
local -a out
line="$(_async_job print '-n hi; for i in "1 2" 3 4; do print -n $i; done')"
line="$(_async_job 0 print '-n hi; for i in "1 2" 3 4; do print -n $i; done')"
# Remove trailing null, parse, unquote and interpret as array.
line=${${line//$'\r'}//$'\n'}
line=$line[1,$#line-1]
out=("${(@Q)${(z)line}}")

@@ -507,7 +512,7 @@ setopt_helper() {
test_all_options() {
local -a opts exclude

if [[ $ZSH_VERSION == 5.0.? ]]; then
if ! is-at-least 5.1; then
t_skip "Test is not reliable on zsh 5.0.X"
fi

@@ -593,7 +598,9 @@ zpty_deinit() {
}

test_zle_watcher() {
t_skip "Test is not reliable on zsh 5.0.X"
if ! is-at-least 5.1; then
t_skip "Test is not reliable on zsh 5.0.X"
fi

setopt localoptions
zpty_init '
@@ -627,6 +634,83 @@ test_zle_watcher() {
}
}

test_lorem_ipsum_stress() {
local -a result
cb() { result=("$@") }

# About 10k characters.
local want='Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.'
local times=80

async_start_worker test
async_job test "out=; for i in {1..$times}; do out+=\$'$want\n'; done; print \"\$out\";"
while ! async_process_results test cb; do :; done
async_stop_worker test

[[ $result[1] = 'out=' ]] || t_error "want command name: out=, got" $result[1]
[[ $result[2] = 0 ]] || t_error "want exit code: 0, got" $result[2]

local want_full=$want
for i in {2..$times}; do
want_full+=$'\n'$want
done

[[ $result[3] = $want_full ]] || {
t_error "want output: ${(Vq-)want} * $times, got" ${(Vq-)result[3]}
}
}
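
The expected value in this test has no trailing newline even though the job appends one after every line. A small sketch of why, assuming the job's stdout is captured through command substitution as in `_async_job`; the shortened `want` text and `times=3` are example values only.

```shell
want='Lorem ipsum dolor sit amet'
times=3

# What the job prints: every line followed by a newline, plus the final
# newline added by the print itself.
out=''
i=1
while [ "$i" -le "$times" ]; do
  out+="$want"$'\n'
  i=$((i + 1))
done
captured=$(printf '%s\n' "$out")   # $(...) strips all trailing newlines

# What the test compares against: lines joined by newlines only.
want_full=$want
i=2
while [ "$i" -le "$times" ]; do
  want_full+=$'\n'"$want"
  i=$((i + 1))
done

[ "$captured" = "$want_full" ] && echo "match"
```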

test_lorem_ipsum_stress_zle() {
if ! is-at-least 5.1; then
t_skip "Test is not reliable on zsh 5.0.X"
fi

setopt localoptions
zpty_init '
emulate -R zsh
setopt zle
stty 38400 columns 80 rows 24 tabs -icanon -iexten
TERM=vt100

. "'$PWD'/async.zsh"
async_init

print_result_cb() { print $3 }
async_start_worker test
async_register_callback test print_result_cb
' || {
zpty_deinit
t_fatal "failed to init zpty"
}

t_defer zpty_deinit # Deinit after test completion.

# About 10k characters.
local want='Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.'
local times=80

cmd="out=; for i in {1..$times}; do out+=\$'$want\n'; done; print \"\${out}EOF\""
zpty_run async_job test ${(q)cmd} || t_fatal "could not send async_job command"

zpty -r -m zsh result "*EOF" || {
t_fatal "want lorem ipsum followed by \"EOF\", got output ${(Vq-)result}"
}

# Remove terminal codes preceding the output.
result=Lorem${result#*Lorem}
result=${result//$'\r'/}
result=${result%$'\n'EOF}

local want_full=$want
for i in {2..$times}; do
want_full+=$'\n'$want
done

[[ $result = $want_full ]] || {
t_error "want output: ${(Vq-)want} * $times, got ${(Vq-)result}"
}
}

test_main() {
# Load zsh-async before running each test.
zmodload zsh/datetime