Skip to content

Commit

Permalink
fix lookup Redirect response
Browse files Browse the repository at this point in the history
  • Loading branch information
ikilobyte committed May 24, 2024
1 parent 33404ae commit 1896968
Showing 1 changed file with 76 additions and 30 deletions.
106 changes: 76 additions & 30 deletions src/Lookup/TcpLookupService.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,42 +75,60 @@ public function __construct(Options $options)
* @return Result
* @throws IOException
* @throws RuntimeException
* @throws OptionsException
*/
public function lookup(string $topic): Result
{
$command = new CommandLookupTopic();
$command->setRequestId(Helper::getRequestID());
$command->setAuthoritative(false);
$command->setTopic($topic);
$response = $this->connection->writeCommand(Type::LOOKUP(), $command)->wait();
$subCommand = $this->request($this->connection, $topic);

/**
* @var $subCommand CommandLookupTopicResponse
*/
$subCommand = $response->subCommand;
$brokerServiceUrl = $this->getBrokerAddress($subCommand);
$parse = parse_url($brokerServiceUrl);
for ($i = 0; $i < 20; $i++) {

list($brokerServiceUrl, $proxyServiceUrl) = $this->getBrokerAddress($subCommand);

switch ($subCommand->getResponse()->value()) {
$parse = parse_url($brokerServiceUrl);

// Need to connect to a new broker
// TLS is supported at this time
case CommandLookupTopicResponse\LookupType::Redirect_VALUE:
switch ($subCommand->getResponse()->value()) {

// TODO
return new Result($parse['host'], $parse['port'], $brokerServiceUrl);
// 1、Connect to a broker using a broker
// 2、Send lookups to new brokers
// 3、until you return to Connect
// 4、Maximum 20 attempts
case CommandLookupTopicResponse\LookupType::Redirect_VALUE:

// is the broker where the current connection is located
// But it also creates a new connection
// Instead of using this current connection
// TLS is supported at this time
case CommandLookupTopicResponse\LookupType::Connect_VALUE:
return new Result($parse['host'], $parse['port'], $brokerServiceUrl);
// clone the Options
$options = clone $this->options;

//
default:
throw new RuntimeException($subCommand->getMessage());
$options->setUrl($brokerServiceUrl);

// create Connection
$connection = Factory::create($options);

// establish a connection
$connection->connect($parse['host'], $parse['port']);

// handshake
$connection->handshake($options->offsetGet(Options::Authentication));

// lookup
$subCommand = $this->request($connection, $topic, $subCommand->getAuthoritative());

break;

// is the broker where the current connection is located
// But it also creates a new connection
// Instead of using this current connection
// TLS is supported at this time
case CommandLookupTopicResponse\LookupType::Connect_VALUE:

return new Result($parse['host'], $parse['port'], $proxyServiceUrl);

//
default:
throw new RuntimeException($subCommand->getMessage());
}
}

throw new RuntimeException('Maximum number of topic searches exceeded');
}


Expand Down Expand Up @@ -144,23 +162,51 @@ public function getPartitionedTopicMetadata(string $topic): int

/**
* @param CommandLookupTopicResponse $response
* @return string
* @return array<string>
* @throws OptionsException
*/
protected function getBrokerAddress(CommandLookupTopicResponse $response): string
protected function getBrokerAddress(CommandLookupTopicResponse $response): array
{
$brokerServiceUrl = $response->getBrokerServiceUrl();
if ($this->options->isTLS()) {
$brokerServiceUrl = $response->getBrokerServiceUrlTls();
} else {
$brokerServiceUrl = $response->getBrokerServiceUrl();
}

// Through the current service agent
$proxyBrokerServiceUrl = $brokerServiceUrl;

if ($response->getProxyThroughServiceUrl()) {
$brokerServiceUrl = $this->options->data['url'];
}

return $brokerServiceUrl;
return [$brokerServiceUrl, $proxyBrokerServiceUrl];
}



/**
* @param AbstractIO $connect
* @param string $topic
* @param bool $authoritative
* @return CommandLookupTopicResponse
* @throws IOException
*/
protected function request(AbstractIO $connect, string $topic, bool $authoritative = false): CommandLookupTopicResponse
{
$command = new CommandLookupTopic();
$command->setRequestId(Helper::getRequestID());
$command->setAuthoritative($authoritative);
$command->setTopic($topic);
$response = $connect->writeCommand(Type::LOOKUP(), $command)->wait();

/**
* @var $subCommand CommandLookupTopicResponse
*/
$subCommand = $response->subCommand;
return $subCommand;
}

/**
* @return void
*/
Expand Down

0 comments on commit 1896968

Please sign in to comment.