diff --git a/src/Cancellation.php b/src/Cancellation.php index 64b0b68..91bc311 100644 --- a/src/Cancellation.php +++ b/src/Cancellation.php @@ -4,25 +4,22 @@ namespace Tak\Async; -use Swoole\Process; +use Swoole\Coroutine\Channel; use Closure; class Cancellation { - readonly private array $processes; + protected array $cids; + protected Channel $channel; readonly private float $canceled; - protected bool $status; + protected bool $status = false; public function __construct(public string $message = 'The task was canceled !'){ - $this->setStatus(true); } public function cancel() : bool { - if(isset($this->processes) and $this->isCanceled() === false and $this->getStatus()): - foreach($this->processes as $pid => $process): - if(Process::kill($pid,SIG_DFL)): - $process->push(serialize(new Errors($this->message))); - Process::kill($pid,SIGKILL); - endif; + if(isset($this->cids) and $this->isCanceled() === false and $this->getStatus()): + foreach($this->cids as $cid): + $this->channel->push(['id'=>$cid,'error'=>new Errors($this->message)]); endforeach; $this->canceled = microtime(true); return true; @@ -39,8 +36,11 @@ public function setStatus(bool $status) : void { public function getStatus() : bool { return $this->status; } - public function setProcesses(array $processes) : void { - $this->processes = $processes; + public function setCids(array $cids) : void { + $this->cids = $cids; + } + public function setChannel(Channel $channel) : void { + $this->channel = $channel; } } diff --git a/src/Errors.php b/src/Errors.php index 93d48cf..1e9e122 100644 --- a/src/Errors.php +++ b/src/Errors.php @@ -10,7 +10,7 @@ use Closure; -class Errors extends Exception { +final class Errors extends Exception { private readonly array $exceptions; static private Closure $handler; @@ -22,13 +22,29 @@ public function __construct(string | array $exceptions,mixed ...$args){ parent::__construct('Multiple exceptions occurred !',...$args); $this->exceptions = $exceptions; endif; - if(isset(self::$handler)) Coroutine::create(self::$handler,...$this->exceptions); } public function getExceptions() : array { return $this->exceptions; } - static public function setErrorHandler(callable $callback) : void { - self::$handler = Closure::fromCallable($callback); + public function throw() : void { + if(isset(self::$handler)): + foreach($this->exceptions as $exception): + if(Tools::inCoroutine()): + Coroutine::create(self::$handler,$exception); + else: + call_user_func(self::$handler,$exception); + endif; + endforeach; + else: + throw $this; + endif; + } + static public function setErrorHandler(callable $callback = null) : void { + if(is_null($callback)): + unset(self::$handler); + else: + self::$handler = Closure::fromCallable($callback); + endif; } } diff --git a/src/Future.php b/src/Future.php index f71b08d..af23238 100644 --- a/src/Future.php +++ b/src/Future.php @@ -21,7 +21,7 @@ public function await(float $timeout = -1) : mixed { if(isset($this->catch)): Coroutine::create($this->catch,$result); elseif($this->ignore === false): - throw $result; + $result->throw(); endif; return null; endif; diff --git a/src/Loop.php b/src/Loop.php index 95fe0f3..2c29cab 100644 --- a/src/Loop.php +++ b/src/Loop.php @@ -14,22 +14,24 @@ final class Loop { protected int $id = 0; public function __construct(public readonly Closure $callback,private int $interval,int $start = 0,int $finish = 0){ - Timer::after($start,$this->start(...)); - Timer::after($finish,$this->finish(...)); + if($start > 0) Timer::after($start,$this->start(...)); + if($finish > 0) Timer::after($finish,$this->finish(...)); } public function start() : void { - if(Timer::exists($this->id) === false): + if($this->interval > 0 and Timer::exists($this->id) === false): $this->id = Timer::tick($this->interval,$this->run(...)); endif; } private function run() : void { $result = call_user_func($this->callback,$this); - if($result === 0): - $this->stop(); - elseif($result !== $this->interval): - $this->interval = intval($result); - Timer::clear($this->id); - $this->start(); + if(is_null($result) === false): + if($result === 0): + $this->stop(); + elseif($result !== $this->interval): + $this->interval = intval($result); + Timer::clear($this->id); + $this->start(); + endif; endif; } public function stop() : void { diff --git a/src/Task.php b/src/Task.php new file mode 100644 index 0000000..48cf22b --- /dev/null +++ b/src/Task.php @@ -0,0 +1,123 @@ + $this->async($callback),$callbacks); + $this->channel = new Channel($count); + } + public function async(callable $callback,mixed ...$args) : self { + $closure = Closure::fromCallable($callback); + if(Tools::isStaticClosure($closure) === false): + $closure = $closure->bindTo($this); + endif; + $create = function() use($closure,$args) : void { + $cid = Coroutine::create(function() use($closure,$args) : void { + try { + $result = call_user_func($closure,...$args); + $this->channel->push(['id'=>Coroutine::getCid(),'result'=>$result]); + } catch(Throwable $error){ + $this->channel->push(['id'=>Coroutine::getCid(),'error'=>$error]); + } + }); + if(is_int($cid)) $this->cids []= $cid; + }; + if(Tools::inCoroutine()): + $create->call($this); + else: + run($create); + endif; + return $this; + } + public function await(Cancellation $cancellation = null) : mixed { + $join = function() use($cancellation) : void { + if(is_null($cancellation) === false): + $cancellation->setCids($this->cids); + $cancellation->setChannel($this->channel); + $cancellation->setStatus(true); + endif; + foreach($this->cids as $cid): + do { + $pop = $this->channel->pop(); + } while(in_array($pop['id'],$this->results) or in_array($pop['id'],$this->errors) or in_array($pop['id'],$this->cids) === false); + if(isset($pop['result'])): + $this->results[$pop['id']] = $pop['result']; + elseif(isset($pop['error'])): + $this->errors[$pop['id']] = $pop['error']; + endif; + endforeach; + if(is_null($cancellation) === false): + $cancellation->setStatus(false); + endif; + }; + if(Tools::inCoroutine()): + $join->call($this); + else: + run($join); + endif; + if(empty($this->errors) === false): + if(isset($this->catch)): + foreach($this->errors as $pid => $error): + if(Tools::inCoroutine()): + Coroutine::create($this->catch,$error,$pid); + else: + call_user_func($this->catch,$error,$pid); + endif; + endforeach; + elseif($this->ignore === false): + $exception = new Errors($this->errors); + $exception->throw(); + endif; + endif; + if(isset($this->finally)): + if(Tools::inCoroutine()): + Coroutine::defer($this->finally); + else: + call_user_func($this->finally); + endif; + endif; + return count($this->cids) < 2 ? current($this->results) : $this->results; + } + public function sleep(float $seconds) : bool { + return Coroutine::sleep($seconds); + } + public function cancel() : bool { + $canceled = array_map(fn(int $cid) : bool => Coroutine::cancel($cid),$this->cids); + return in_array(false,$canceled) === false; + } + public function ignore() : self { + $this->ignore = true; + return $this; + } + public function catch(callable $catch) : self { + $this->catch = Closure::fromCallable($catch); + return $this; + } + public function finally(callable $finally) : self { + $this->finally = Closure::fromCallable($finally); + return $this; + } +} + +?> \ No newline at end of file diff --git a/src/TimeoutCancellation.php b/src/TimeoutCancellation.php index 3efaef0..4536134 100644 --- a/src/TimeoutCancellation.php +++ b/src/TimeoutCancellation.php @@ -6,13 +6,23 @@ use Swoole\Timer; -class TimeoutCancellation extends Cancellation { +final class TimeoutCancellation extends Cancellation { + private int $timer; + public function __construct(private float $timeout,public string $message = 'Time out !'){ - $this->setStatus(true); } - public function setProcesses(array $processes) : void { - $this->processes = $processes; - Timer::after(intval($this->timeout * 1000),$this->cancel(...)); + public function setStatus(bool $status) : void { + $this->status = $status; + if($status): + if(isset($this->timer)): + $exception = new Errors('The cancellation has already been used !'); + $exception->throw(); + else: + $this->timer = Timer::after(intval($this->timeout * 1000),$this->cancel(...)); + endif; + else: + Timer::clear($this->timer); + endif; } } diff --git a/src/Tools.php b/src/Tools.php new file mode 100644 index 0000000..3dd30aa --- /dev/null +++ b/src/Tools.php @@ -0,0 +1,23 @@ +isStatic(); + } + static public function inCoroutine() : bool { + return boolval(Coroutine::getCid() > 0); + } +} + +?> \ No newline at end of file diff --git a/src/functions.php b/src/functions.php index c9c3ee7..6f25628 100644 --- a/src/functions.php +++ b/src/functions.php @@ -4,18 +4,15 @@ namespace Tak; -use Swoole\Coroutine: - -function async(callable $callback,mixed ...$args) : Run { - $run = new Run; - return $run->async($callback,...$args); +function async(callable $callback,mixed ...$args) : Task { + return (new Task)->async($callback,...$args); } function delay(float $seconds) : bool { - return Coroutine::sleep($seconds); + return (new Task)->sleep($seconds); } -function setErrorHandler(callable $callback) : void { +function setErrorHandler(callable $callback = null) : void { Errors::setErrorHandler($callback); }