Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"google/protobuf": "^3.7 || ^4.0",
"spiral/roadrunner-worker": "^3.0",
"spiral/goridge": "^4.0",
"spiral/roadrunner": "^2023.1 || ^2024.1"
"spiral/roadrunner": "^2024.3"
},
"require-dev": {
"jetbrains/phpstorm-attributes": "^1.0",
Expand Down
2 changes: 1 addition & 1 deletion psalm-baseline.xml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="dev-master@765dcbfe43002e52e4808b65561842784fe7bcc7"/>
<files psalm-version="5.26.1@d747f6500b38ac4f7dfc5edbcae6e4b637d7add0"/>
46 changes: 46 additions & 0 deletions src/Internal/CallContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner\GRPC\Internal;

use Spiral\RoadRunner\GRPC\ServiceInterface;

/**
* @internal
* @psalm-internal Spiral\RoadRunner\GRPC
*/
final class CallContext
{
/**
* @param class-string<ServiceInterface> $service
* @param non-empty-string $method
* @param array<string, array<string>> $context
*/
public function __construct(
public readonly string $service,
public readonly string $method,
public readonly array $context,
) {}

/**
* @throws \JsonException
*/
public static function decode(string $payload): self
{
/**
* @psalm-var array{
* service: class-string<ServiceInterface>,
* method: non-empty-string,
* context: array<string, array<string>>
* } $data
*/
$data = Json::decode($payload);

return new self(
service: $data['service'],
method: $data['method'],
context: $data['context'],
);
Comment thread
rauanmayemir marked this conversation as resolved.
}
}
72 changes: 72 additions & 0 deletions src/ResponseTrailers.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner\GRPC;

use Spiral\RoadRunner\GRPC\Internal\Json;

/**
* @psalm-type THeaderKey = non-empty-string
* @psalm-type THeaderValue = string
* @implements \IteratorAggregate<THeaderKey, string>
*/
final class ResponseTrailers implements \IteratorAggregate, \Countable
{
/**
* @var array<THeaderKey, THeaderValue>
*/
private array $trailers = [];

/**
* @param iterable<THeaderKey, THeaderValue> $trailers
*/
public function __construct(iterable $trailers = [])
{
foreach ($trailers as $key => $value) {
$this->set($key, $value);
}
}

/**
* @param THeaderKey $key
* @param THeaderValue $value
*/
public function set(string $key, string $value): void
{
$this->trailers[$key] = $value;
}

/**
* @param THeaderKey $key
* @return THeaderValue|null
*/
public function get(string $key, ?string $default = null): ?string
{
return $this->trailers[$key] ?? $default;
}

public function getIterator(): \Traversable
{
return new \ArrayIterator($this->trailers);
}

public function count(): int
{
return \count($this->trailers);
}

/**
* @throws \JsonException
*/
public function packTrailers(): string
{
// If an empty array is serialized, it is cast to the string "[]"
// instead of object string "{}"
if ($this->trailers === []) {
return '{}';
}

return Json::encode($this->trailers);
}
}
83 changes: 40 additions & 43 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Spiral\RoadRunner\GRPC\Exception\GRPCExceptionInterface;
use Spiral\RoadRunner\GRPC\Exception\NotFoundException;
use Spiral\RoadRunner\GRPC\Exception\ServiceException;
use Spiral\RoadRunner\GRPC\Internal\CallContext;
use Spiral\RoadRunner\GRPC\Internal\Json;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Worker;
Expand All @@ -21,12 +22,6 @@
* @psalm-type ServerOptions = array{
* debug?: bool
* }
*
* @psalm-type ContextResponse = array{
* service: class-string<ServiceInterface>,
* method: non-empty-string,
* context: array<string, array<string>>
* }
*/
final class Server
{
Expand Down Expand Up @@ -77,15 +72,43 @@ public function serve(?WorkerInterface $worker = null, ?callable $finalize = nul
return;
}

try {
/** @var ContextResponse $context */
$context = Json::decode($request->header);
$responseHeaders = new ResponseHeaders();
$responseTrailers = new ResponseTrailers();

[$answerBody, $answerHeaders] = $this->tick($request->body, $context);

$this->workerSend($worker, $answerBody, $answerHeaders);
try {
$call = CallContext::decode($request->header);

$context = new Context(\array_merge(
$call->context,
[
ResponseHeaders::class => $responseHeaders,
ResponseTrailers::class => $responseTrailers,
],
));

$response = $this->invoke($call->service, $call->method, $context, $request->body);

$headers = [];
$responseHeaders->count() === 0 or $headers['headers'] = $responseHeaders->packHeaders();
$responseTrailers->count() === 0 or $headers['trailers'] = $responseTrailers->packTrailers();

$this->workerSend(
worker: $worker,
body: $response,
headers: $headers === [] ? '{}' : Json::encode($headers),
);
} catch (GRPCExceptionInterface $e) {
$this->workerGrpcError($worker, $e);
$headers = [
'error' => $this->createGrpcError($e),
];
$responseHeaders->count() === 0 or $headers['headers'] = $responseHeaders->packHeaders();
$responseTrailers->count() === 0 or $headers['trailers'] = $responseTrailers->packTrailers();

$this->workerSend(
worker: $worker,
body: '',
headers: Json::encode($headers),
);
} catch (\Throwable $e) {
$this->workerError($worker, $this->isDebugMode() ? (string) $e : $e->getMessage());
} finally {
Expand All @@ -112,24 +135,9 @@ protected function invoke(string $service, string $method, ContextInterface $con
return $this->services[$service]->invoke($method, $context, $body);
}

/**
* @param ContextResponse $data
* @return array{0: string, 1: string}
* @throws \JsonException
* @throws \Throwable
*/
private function tick(string $body, array $data): array
private function workerError(WorkerInterface $worker, string $message): void
{
$context = (new Context($data['context']))
->withValue(ResponseHeaders::class, new ResponseHeaders());

$response = $this->invoke($data['service'], $data['method'], $context, $body);

/** @var ResponseHeaders|null $responseHeaders */
$responseHeaders = $context->getValue(ResponseHeaders::class);
$responseHeadersString = $responseHeaders ? $responseHeaders->packHeaders() : '{}';

return [$response, $responseHeadersString];
$worker->error($message);
}

/**
Expand All @@ -140,12 +148,7 @@ private function workerSend(WorkerInterface $worker, string $body, string $heade
$worker->respond(new Payload($body, $headers));
}

private function workerError(WorkerInterface $worker, string $message): void
{
$worker->error($message);
}

private function workerGrpcError(WorkerInterface $worker, GRPCExceptionInterface $e): void
private function createGrpcError(GRPCExceptionInterface $e): string
{
$status = new Status([
'code' => $e->getCode(),
Expand All @@ -161,13 +164,7 @@ static function ($detail) {
),
]);

$this->workerSend(
$worker,
'',
Json::encode([
'error' => \base64_encode($status->serializeToString()),
]),
);
return \base64_encode($status->serializeToString());
}

/**
Expand Down
21 changes: 21 additions & 0 deletions tests/ContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use PHPUnit\Framework\TestCase;
use Spiral\RoadRunner\GRPC\Context;
use Spiral\RoadRunner\GRPC\ResponseHeaders;
use Spiral\RoadRunner\GRPC\ResponseTrailers;

class ContextTest extends TestCase
{
Expand Down Expand Up @@ -75,4 +76,24 @@ public function testGetOutgoingHeaders(): void
$ctx = new Context([ResponseHeaders::class => $outgoingHeaders]);
$this->assertSame($outgoingHeaders, $ctx->getValue(ResponseHeaders::class));
}

public function testGetOutgoingTrailer(): void
{
$outgoingTrailers = [
'X-Some-Trailer' => 'foobar',
];
$ctx = new Context([ResponseTrailers::class => new ResponseTrailers($outgoingTrailers)]);

$this->assertSame($outgoingTrailers['X-Some-Trailer'], $ctx->getValue(ResponseTrailers::class)->get('X-Some-Trailer'));
$this->assertNull($ctx->getValue(ResponseTrailers::class)->get('not-existing'));
}

public function testGetOutgoingTrailers(): void
{
$outgoingTrailers = new ResponseTrailers([
'X-Some-Trailer' => 'foobar',
]);
$ctx = new Context([ResponseTrailers::class => $outgoingTrailers]);
$this->assertSame($outgoingTrailers, $ctx->getValue(ResponseTrailers::class));
}
}
37 changes: 36 additions & 1 deletion tests/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
use Google\Rpc\Status;
use Mockery as m;
use PHPUnit\Framework\TestCase;
use Service\DetailsMessageForException;
use Service\Message;
use Service\TestInterface;
use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\RoadRunner\GRPC\Exception\ServiceException;
use Spiral\RoadRunner\GRPC\InvokerInterface;
use Spiral\RoadRunner\GRPC\Server;
use Spiral\RoadRunner\GRPC\Tests\Stub\TestService;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Worker;
use Spiral\RoadRunner\WorkerInterface;
use Spiral\RoadRunner\GRPC\InvokerInterface;

class ServerTest extends TestCase
{
Expand Down Expand Up @@ -143,6 +144,40 @@ public function testExceptionDetails(): void
$server->serve($worker);
}

public function testInvokeGrpcException(): void
{
$worker = m::mock(WorkerInterface::class);
$worker->shouldReceive('waitPayload')
->times(2)
->andReturn(
new Payload(
body: $this->packMessage('withDetailsAndHeaders'),
header: '{"context": {}, "service": "service.Test", "method": "Throw"}',
),
null,
);

$worker->shouldReceive('respond')->once()
->withArgs(static function (Payload $payload) {
$header = \json_decode($payload->header, true);

$status = new Status();
$status->mergeFromString(\base64_decode($header['error']));
/** @var DetailsMessageForException $message */
$message = $status->getDetails()->offsetGet(0)->unpack();

$outgoingHeaders = \json_decode($header['headers'], true);
$outgoingTrailers = \json_decode($header['trailers'], true);

return $message instanceof DetailsMessageForException
&& $message->getMessage() === 'details message'
&& $outgoingHeaders === ['foo' => 'bar']
&& $outgoingTrailers === ['baz' => 'bar'];
});

$this->server->serve($worker);
}

protected function setUp(): void
{
parent::setUp();
Expand Down
12 changes: 12 additions & 0 deletions tests/Stub/TestService.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ public function Throw(ContextInterface $ctx, Message $in): Message

$grpcException = new GRPCException("main exception message", 3, [$detailsMessage]);

throw $grpcException;
case "withDetailsAndHeaders":
$ctx->getValue(GRPC\ResponseHeaders::class)->set('foo', 'bar');
$ctx->getValue(GRPC\ResponseTrailers::class)->set('baz', 'bar');

$detailsMessage = new DetailsMessageForException();
$detailsMessage->setCode(1);
$detailsMessage->setMessage("details message");

$grpcException = new GRPCException("main exception message", 3, [$detailsMessage]);

throw $grpcException;
case "regularException":
{
Expand Down Expand Up @@ -70,6 +81,7 @@ public function Info(ContextInterface $ctx, Message $in): Message
}

$ctx->getValue(GRPC\ResponseHeaders::class)->set('foo', 'bar');
$ctx->getValue(GRPC\ResponseTrailers::class)->set('baz', 'bar');

return $out;
}
Expand Down