Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add open() API to UTransport
Some transports require initialization that should not be done inside of the constructor so we add the open() API that can be implemented to do the async initialization of the underlining transport.

#146
  • Loading branch information
czfdcn committed Jul 12, 2024
commit a5c2472c7dfec800d78252a6d94f9601430390f9
40 changes: 26 additions & 14 deletions src/main/java/org/eclipse/uprotocol/transport/UTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,19 @@
package org.eclipse.uprotocol.transport;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import org.eclipse.uprotocol.v1.UCode;
import org.eclipse.uprotocol.v1.UMessage;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUri;

/**
* UTransport is the uP-L1 interface that provides a common API for uE
* developers to send and receive messages.
* UTransport implementations contain the details for connecting to the
* underlying transport technology and
* sending UMessage using the configured technology. For more information please
* refer to
* UTransport is the uP-L1 interface that provides a common API for uE developers to send and receive messages.
* UTransport implementations contain the details for connecting to the underlying transport technology and
* sending UMessage using the configured technology. For more information please refer to
* https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/README.adoc.
*/

public interface UTransport {

/**
Expand All @@ -45,6 +42,7 @@ public interface UTransport {
*/
CompletionStage<UStatus> send(UMessage message);


/**
* Register {@code UListener} for {@code UUri} source filters to be called when
* a message is received.
Expand All @@ -62,6 +60,7 @@ default CompletionStage<UStatus> registerListener(UUri sourceFilter, UListener l
return registerListener(sourceFilter, null, listener);
}


/**
* Register {@code UListener} for {@code UUri} source and sink filters to be
* called when a message is received.
Expand All @@ -79,10 +78,10 @@ default CompletionStage<UStatus> registerListener(UUri sourceFilter, UListener l
*/
CompletionStage<UStatus> registerListener(UUri sourceFilter, UUri sinkFilter, UListener listener);


/**
* Unregister {@code UListener} for {@code UUri} source filters. Messages
* arriving on this topic will
* no longer be processed by this listener.
* arriving on this topic will no longer be processed by this listener.
*
* @param sourceFilter The UAttributes::source address pattern that the message
* to receive needs to match.
Expand All @@ -97,10 +96,10 @@ default CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UListener
return unregisterListener(sourceFilter, null, listener);
}


/**
* Unregister {@code UListener} for {@code UUri} source and sink filters.
* Messages arriving on this topic will
* no longer be processed by this listener.
* Messages arriving on this topic will no longer be processed by this listener.
*
* @param sourceFilter The UAttributes::source address pattern that the message
* to receive needs to match.
Expand All @@ -115,18 +114,31 @@ default CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UListener
*/
CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UUri sinkFilter, UListener listener);


/**
* Return the source address for the uE (authority, entity, and resource
* information)
* Return the source address of the uE.
* The Source address is passed to the constructor of a given transport
*
* @return UUri containing the source address
*/
UUri getSource();


/**
* Open the connection to the transport that will trigger any registered listeners
* to be registered.
*
* @return Returns {@link UStatus} with {@link UCode.OK} if the connection is
* opened correctly, otherwise it returns with the appropriate failure.
*/
default CompletionStage<UStatus> open() {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}


/**
* Close the connection to the transport that will trigger any registered listeners
* to be unregistered.
*/
void close();
default void close() { }
}
37 changes: 37 additions & 0 deletions src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -84,6 +85,42 @@ public void test_unhappy_register_unlistener() {
assertEquals(result.toCompletableFuture().join().getCode(), UCode.INTERNAL);
}

@Test
@DisplayName("Test happy path calling open() API")
public void test_happy_open() {
UTransport transport = new HappyUTransport();
assertEquals(transport.open().toCompletableFuture().join().getCode(), UCode.OK);
}

@Test
@DisplayName("Test default oepn() and close() APIs")
public void test_default_open_close() {
UTransport transport = new UTransport() {
@Override
public CompletionStage<UStatus> send(UMessage message) {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}

@Override
public CompletionStage<UStatus> registerListener(UUri source, UUri sink, UListener listener) {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}

@Override
public CompletionStage<UStatus> unregisterListener(UUri source, UUri sink, UListener listener) {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}

@Override
public UUri getSource() {
return UUri.getDefaultInstance();
}
};

assertDoesNotThrow(() -> transport.close());
}


class MyListener implements UListener {
@Override
public void onReceive(UMessage message) {}
Expand Down