Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
Tak-Pesar authored Jul 1, 2024
1 parent 8122af9 commit 13153c4
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 38 deletions.
24 changes: 12 additions & 12 deletions src/Cancellation.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,22 @@

namespace Tak\Async;

use Swoole\Process;
use Swoole\Coroutine\Channel;

use Closure;

class Cancellation {
readonly private array $processes;
protected array $cids;
protected Channel $channel;
readonly private float $canceled;
protected bool $status;
protected bool $status = false;

public function __construct(public string $message = 'The task was canceled !'){
$this->setStatus(true);
}
public function cancel() : bool {
if(isset($this->processes) and $this->isCanceled() === false and $this->getStatus()):
foreach($this->processes as $pid => $process):
if(Process::kill($pid,SIG_DFL)):
$process->push(serialize(new Errors($this->message)));
Process::kill($pid,SIGKILL);
endif;
if(isset($this->cids) and $this->isCanceled() === false and $this->getStatus()):
foreach($this->cids as $cid):
$this->channel->push(['id'=>$cid,'error'=>new Errors($this->message)]);
endforeach;
$this->canceled = microtime(true);
return true;
Expand All @@ -39,8 +36,11 @@ public function setStatus(bool $status) : void {
public function getStatus() : bool {
return $this->status;
}
public function setProcesses(array $processes) : void {
$this->processes = $processes;
public function setCids(array $cids) : void {
$this->cids = $cids;
}
public function setChannel(Channel $channel) : void {
$this->channel = $channel;
}
}

Expand Down
24 changes: 20 additions & 4 deletions src/Errors.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use Closure;

class Errors extends Exception {
final class Errors extends Exception {
private readonly array $exceptions;
static private Closure $handler;

Expand All @@ -22,13 +22,29 @@ public function __construct(string | array $exceptions,mixed ...$args){
parent::__construct('Multiple exceptions occurred !',...$args);
$this->exceptions = $exceptions;
endif;
if(isset(self::$handler)) Coroutine::create(self::$handler,...$this->exceptions);
}
public function getExceptions() : array {
return $this->exceptions;
}
static public function setErrorHandler(callable $callback) : void {
self::$handler = Closure::fromCallable($callback);
public function throw() : void {
if(isset(self::$handler)):
foreach($this->exceptions as $exception):
if(Tools::inCoroutine()):
Coroutine::create(self::$handler,$exception);
else:
call_user_func(self::$handler,$exception);
endif;
endforeach;
else:
throw $this;
endif;
}
static public function setErrorHandler(callable $callback = null) : void {
if(is_null($callback)):
unset(self::$handler);
else:
self::$handler = Closure::fromCallable($callback);
endif;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Future.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function await(float $timeout = -1) : mixed {
if(isset($this->catch)):
Coroutine::create($this->catch,$result);
elseif($this->ignore === false):
throw $result;
$result->throw();
endif;
return null;
endif;
Expand Down
20 changes: 11 additions & 9 deletions src/Loop.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,24 @@ final class Loop {
protected int $id = 0;

public function __construct(public readonly Closure $callback,private int $interval,int $start = 0,int $finish = 0){
Timer::after($start,$this->start(...));
Timer::after($finish,$this->finish(...));
if($start > 0) Timer::after($start,$this->start(...));
if($finish > 0) Timer::after($finish,$this->finish(...));
}
public function start() : void {
if(Timer::exists($this->id) === false):
if($this->interval > 0 and Timer::exists($this->id) === false):
$this->id = Timer::tick($this->interval,$this->run(...));
endif;
}
private function run() : void {
$result = call_user_func($this->callback,$this);
if($result === 0):
$this->stop();
elseif($result !== $this->interval):
$this->interval = intval($result);
Timer::clear($this->id);
$this->start();
if(is_null($result) === false):
if($result === 0):
$this->stop();
elseif($result !== $this->interval):
$this->interval = intval($result);
Timer::clear($this->id);
$this->start();
endif;
endif;
}
public function stop() : void {
Expand Down
123 changes: 123 additions & 0 deletions src/Task.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
<?php

declare(strict_types = 1);

namespace Tak\Async;

use Swoole\Coroutine;

use Swoole\Coroutine\Channel;

use Throwable;

use Closure;

use function Swoole\Coroutine\run;

final class Task {
protected array $cids = array();
protected array $results = array();
protected array $errors = array();
protected Channel $channel;
public Closure $catch;
public Closure $finally;
public bool $ignore = false;

public function __construct(int $count = PHP_INT_MAX,callable ...$callbacks){
array_map(fn(callable $callback) : object => $this->async($callback),$callbacks);
$this->channel = new Channel($count);
}
public function async(callable $callback,mixed ...$args) : self {
$closure = Closure::fromCallable($callback);
if(Tools::isStaticClosure($closure) === false):
$closure = $closure->bindTo($this);
endif;
$create = function() use($closure,$args) : void {
$cid = Coroutine::create(function() use($closure,$args) : void {
try {
$result = call_user_func($closure,...$args);
$this->channel->push(['id'=>Coroutine::getCid(),'result'=>$result]);
} catch(Throwable $error){
$this->channel->push(['id'=>Coroutine::getCid(),'error'=>$error]);
}
});
if(is_int($cid)) $this->cids []= $cid;
};
if(Tools::inCoroutine()):
$create->call($this);
else:
run($create);
endif;
return $this;
}
public function await(Cancellation $cancellation = null) : mixed {
$join = function() use($cancellation) : void {
if(is_null($cancellation) === false):
$cancellation->setCids($this->cids);
$cancellation->setChannel($this->channel);
$cancellation->setStatus(true);
endif;
foreach($this->cids as $cid):
do {
$pop = $this->channel->pop();
} while(in_array($pop['id'],$this->results) or in_array($pop['id'],$this->errors) or in_array($pop['id'],$this->cids) === false);
if(isset($pop['result'])):
$this->results[$pop['id']] = $pop['result'];
elseif(isset($pop['error'])):
$this->errors[$pop['id']] = $pop['error'];
endif;
endforeach;
if(is_null($cancellation) === false):
$cancellation->setStatus(false);
endif;
};
if(Tools::inCoroutine()):
$join->call($this);
else:
run($join);
endif;
if(empty($this->errors) === false):
if(isset($this->catch)):
foreach($this->errors as $pid => $error):
if(Tools::inCoroutine()):
Coroutine::create($this->catch,$error,$pid);
else:
call_user_func($this->catch,$error,$pid);
endif;
endforeach;
elseif($this->ignore === false):
$exception = new Errors($this->errors);
$exception->throw();
endif;
endif;
if(isset($this->finally)):
if(Tools::inCoroutine()):
Coroutine::defer($this->finally);
else:
call_user_func($this->finally);
endif;
endif;
return count($this->cids) < 2 ? current($this->results) : $this->results;
}
public function sleep(float $seconds) : bool {
return Coroutine::sleep($seconds);
}
public function cancel() : bool {
$canceled = array_map(fn(int $cid) : bool => Coroutine::cancel($cid),$this->cids);
return in_array(false,$canceled) === false;
}
public function ignore() : self {
$this->ignore = true;
return $this;
}
public function catch(callable $catch) : self {
$this->catch = Closure::fromCallable($catch);
return $this;
}
public function finally(callable $finally) : self {
$this->finally = Closure::fromCallable($finally);
return $this;
}
}

?>
20 changes: 15 additions & 5 deletions src/TimeoutCancellation.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@

use Swoole\Timer;

class TimeoutCancellation extends Cancellation {
final class TimeoutCancellation extends Cancellation {
private int $timer;

public function __construct(private float $timeout,public string $message = 'Time out !'){
$this->setStatus(true);
}
public function setProcesses(array $processes) : void {
$this->processes = $processes;
Timer::after(intval($this->timeout * 1000),$this->cancel(...));
public function setStatus(bool $status) : void {
$this->status = $status;
if($status):
if(isset($this->timer)):
$exception = new Errors('The cancellation has already been used !');
$exception->throw();
else:
$this->timer = Timer::after(intval($this->timeout * 1000),$this->cancel(...));
endif;
else:
Timer::clear($this->timer);
endif;
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/Tools.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types = 1);

namespace Tak\Async;

use Swoole\Coroutine;

use ReflectionFunction;

use Closure;

abstract class Tools {
static public function isStaticClosure(Closure $closure) : bool {
$reflection = new ReflectionFunction($closure);
return $reflection->isStatic();
}
static public function inCoroutine() : bool {
return boolval(Coroutine::getCid() > 0);
}
}

?>
11 changes: 4 additions & 7 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@

namespace Tak;

use Swoole\Coroutine:

function async(callable $callback,mixed ...$args) : Run {
$run = new Run;
return $run->async($callback,...$args);
function async(callable $callback,mixed ...$args) : Task {
return (new Task)->async($callback,...$args);
}

function delay(float $seconds) : bool {
return Coroutine::sleep($seconds);
return (new Task)->sleep($seconds);
}

function setErrorHandler(callable $callback) : void {
function setErrorHandler(callable $callback = null) : void {
Errors::setErrorHandler($callback);
}

Expand Down

0 comments on commit 13153c4

Please sign in to comment.