Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/test_communication/test_simplenotifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ def create_destination_uri(self):

async def test_send_notification(self):
notifier = SimpleNotifier(MockUTransport())
status = await notifier.notify(self.create_topic(), self.create_destination_uri(), None)
status = await notifier.notify(self.create_topic(), self.create_destination_uri())
self.assertEqual(status.code, UCode.OK)

async def test_send_notification_with_payload(self):
uri = UUri(authority_name="Neelam")
notifier = SimpleNotifier(MockUTransport())
status = await notifier.notify(self.create_topic(), self.create_destination_uri(), UPayload.pack(uri))
status = await notifier.notify(self.create_topic(), self.create_destination_uri(), payload=UPayload.pack(uri))
self.assertEqual(status.code, UCode.OK)

async def test_register_listener(self):
Expand Down
6 changes: 3 additions & 3 deletions tests/test_communication/test_simplepublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ def create_topic(self):

async def test_send_publish(self):
publisher = SimplePublisher(MockUTransport())
status = await publisher.publish(self.create_topic(), None)
status = await publisher.publish(self.create_topic())
self.assertEqual(status.code, UCode.OK)

async def test_send_publish_with_stuffed_payload(self):
uri = UUri(authority_name="Neelam")
publisher = SimplePublisher(MockUTransport())
status = await publisher.publish(self.create_topic(), UPayload.pack_to_any(uri))
status = await publisher.publish(self.create_topic(), payload=UPayload.pack_to_any(uri))
self.assertEqual(status.code, UCode.OK)

def test_constructor_transport_none(self):
Expand All @@ -52,7 +52,7 @@ async def test_publish_topic_none(self):
uri = UUri(authority_name="Neelam")

with self.assertRaises(ValueError) as context:
await publisher.publish(None, UPayload.pack_to_any(uri))
await publisher.publish(None, payload=UPayload.pack_to_any(uri))
self.assertEqual(str(context.exception), "Publish topic missing")


Expand Down
27 changes: 17 additions & 10 deletions tests/test_communication/test_uclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ def test_create_upclient_with_null_transport(self):
UClient(None)

async def test_send_notification(self):
status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), None)
status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri())
self.assertEqual(status.code, UCode.OK)

async def test_send_notification_with_payload(self):
uri = UUri(authority_name="neelam")
status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), UPayload.pack(uri))
status = await UClient(MockUTransport()).notify(
create_topic(), create_destination_uri(), payload=UPayload.pack(uri)
)
self.assertEqual(status.code, UCode.OK)

async def test_register_listener(self):
Expand Down Expand Up @@ -85,19 +87,24 @@ async def test_unregister_listener_not_registered(self):
self.assertEqual(status.code, UCode.INVALID_ARGUMENT)

async def test_send_publish(self):
status = await UClient(MockUTransport()).publish(create_topic(), None)
status = await UClient(MockUTransport()).publish(create_topic())
self.assertEqual(status.code, UCode.OK)

async def test_send_publish_with_stuffed_payload(self):
uri = UUri(authority_name="neelam")
status = await UClient(MockUTransport()).publish(create_topic(), UPayload.pack_to_any(uri))
status = await UClient(MockUTransport()).publish(create_topic(), payload=UPayload.pack_to_any(uri))
self.assertEqual(status.code, UCode.OK)

async def test_send_publish_with_stuffed_payload_and_calloptions(self):
uri = UUri(authority_name="neelam")
status = await UClient(MockUTransport()).publish(
create_topic(), CallOptions(token="134"), payload=UPayload.pack_to_any(uri)
)
self.assertEqual(status.code, UCode.OK)

async def test_invoke_method_with_payload(self):
payload = UPayload.pack_to_any(UUri())
future_result = asyncio.ensure_future(
UClient(MockUTransport()).invoke_method(create_method_uri(), payload, None)
)
future_result = asyncio.ensure_future(UClient(MockUTransport()).invoke_method(create_method_uri(), payload))
response = await future_result
self.assertIsNotNone(response)
self.assertFalse(future_result.exception())
Expand Down Expand Up @@ -132,10 +139,10 @@ async def test_invoke_method_with_multi_invoke_transport(self):
rpc_client = UClient(MockUTransport())
payload = UPayload.pack_to_any(UUri())

future_result1 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload, None))
future_result1 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload))
response = await future_result1
self.assertIsNotNone(response)
future_result2 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload, None))
future_result2 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload))
response2 = await future_result2

self.assertIsNotNone(response2)
Expand Down Expand Up @@ -195,7 +202,7 @@ async def test_request_handler_for_notification(self):
handler = create_autospec(RequestHandler, instance=True)

await client.register_request_handler(create_method_uri(), handler)
self.assertEqual(await client.notify(create_topic(), transport.get_source(), None), UStatus(code=UCode.OK))
self.assertEqual(await client.notify(create_topic(), transport.get_source()), UStatus(code=UCode.OK))


def create_topic():
Expand Down
9 changes: 7 additions & 2 deletions uprotocol/communication/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"""

from abc import ABC, abstractmethod
from typing import Optional

from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.upayload import UPayload
from uprotocol.transport.ulistener import UListener
from uprotocol.v1.uri_pb2 import UUri
Expand All @@ -29,12 +31,15 @@ class Notifier(ABC):
"""

@abstractmethod
async def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus:
async def notify(
self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None
) -> UStatus:
"""
Send a notification to a given topic passing a payload.
Send a notification to a given topic.

:param topic: The topic to send the notification to.
:param destination: The destination to send the notification to.
:param options: Call options for the notification.
:param payload: The payload to send with the notification.
:return: Returns the UStatus with the status of the notification.
"""
Expand Down
9 changes: 7 additions & 2 deletions uprotocol/communication/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"""

from abc import ABC, abstractmethod
from typing import Optional

from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.upayload import UPayload
from uprotocol.v1.uri_pb2 import UUri
from uprotocol.v1.ustatus_pb2 import UStatus
Expand All @@ -30,11 +32,14 @@ class Publisher(ABC):
"""

@abstractmethod
async def publish(self, topic: UUri, payload: UPayload) -> UStatus:
async def publish(
self, topic: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None
) -> UStatus:
"""
Publish a message to a topic passing UPayload as the payload.
Publish a message to a topic.

:param topic: The topic to publish to.
:param options: Call options for the publish.
:param payload: The UPayload to publish.
:return: An instance of UStatus indicating the status of the publish operation.
"""
Expand Down
10 changes: 9 additions & 1 deletion uprotocol/communication/simplenotifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import Optional

from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.notifier import Notifier
from uprotocol.communication.upayload import UPayload
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
Expand Down Expand Up @@ -45,16 +46,23 @@ def __init__(self, transport: UTransport):
raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR)
self.transport = transport

async def notify(self, topic: UUri, destination: UUri, payload: Optional[UPayload] = None) -> UStatus:
async def notify(
self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None
) -> UStatus:
"""
Send a notification to a given topic.

:param topic: The topic to send the notification to.
:param destination: The destination to send the notification to.
:param options: Call options for the notification.
:param payload: The payload to send with the notification.
:return: Returns the UStatus with the status of the notification.
"""
builder = UMessageBuilder.notification(topic, destination)
if options:
builder.with_priority(options.priority)
builder.with_ttl(options.timeout)
builder.with_token(options.token)
return await self.transport.send(builder.build() if payload is None else builder.build_from_upayload(payload))

async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus:
Expand Down
16 changes: 13 additions & 3 deletions uprotocol/communication/simplepublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
SPDX-License-Identifier: Apache-2.0
"""

from typing import Optional

from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.publisher import Publisher
from uprotocol.communication.upayload import UPayload
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
Expand All @@ -33,16 +36,23 @@ def __init__(self, transport: UTransport):
raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR)
self.transport = transport

async def publish(self, topic: UUri, payload: UPayload) -> UStatus:
async def publish(
self, topic: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None
) -> UStatus:
"""
Publishes a message to a topic using the provided payload.

:param topic: The topic to publish the message to.
:param options: Call options for the publish.
:param payload: The payload to be published.
:return: An instance of UStatus indicating the status of the publish operation.
"""
if topic is None:
raise ValueError("Publish topic missing")

message = UMessageBuilder.publish(topic).build_from_upayload(payload)
return await self.transport.send(message)
builder = UMessageBuilder.publish(topic)
if options:
builder.with_priority(options.priority)
builder.with_ttl(options.timeout)
builder.with_token(options.token)
return await self.transport.send(builder.build_from_upayload(payload))
14 changes: 10 additions & 4 deletions uprotocol/communication/uclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,19 @@ async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus
"""
return await self.subscriber.unregister_listener(topic, listener)

async def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus:
async def notify(
self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None
) -> UStatus:
"""
Send a notification to a given topic.

:param topic: The topic to send the notification to.
:param destination: The destination to send the notification to.
:param options: Call options for the notification.
:param payload: The payload to send with the notification.
:return: Returns the UStatus with the status of the notification.
"""
return await self.notifier.notify(topic, destination, payload)
return await self.notifier.notify(topic, destination, options, payload)

async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus:
"""
Expand All @@ -139,15 +142,18 @@ async def unregister_notification_listener(self, topic: UUri, listener: UListene
"""
return await self.notifier.unregister_notification_listener(topic, listener)

async def publish(self, topic: UUri, payload: UPayload) -> UStatus:
async def publish(
self, topic: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None
) -> UStatus:
"""
Publishes a message to a topic using the provided payload.

:param topic: The topic to publish the message to.
:param options: Call options for the publish.
:param payload: The payload to be published.
:return: An instance of UStatus indicating the status of the publish operation.
"""
return await self.publisher.publish(topic, payload)
return await self.publisher.publish(topic, options, payload)

async def register_request_handler(self, method: UUri, handler):
"""
Expand Down