[feat] Support messages with generic types #149
### Motivation
The Pulsar C++ client doesn't support schemas yet. It only supports
configuring `SchemaInfo` when creating producers or consumers. The main
reason is that C++ templates are instantiated at compile time, so their
definitions must be visible in headers. Templatizing `Producer`,
`Consumer` or `Message` could expose all internal code.
Currently, users might write the following code for serialization and
deserialization:
```c++
producer.send(MessageBuilder().setContent(encode(value)).build());
```
```c++
Message msg;
consumer.receive(msg);
auto value = decode(msg.getData(), msg.getSize());
```
However, the `encode` and `decode` functions are just one possible
solution. Users might use other interfaces, such as a class
with two virtual methods. There is no way to provide a common interface
for serialization and deserialization.
### Modifications
Add a `TypedMessageBuilder<T>` class template that accepts an encoder
function and a validation function. The validation function is for
users who want to simulate the Java client's `AUTO_PRODUCE` schema. Define a
full specialization for the `std::string` template argument to avoid
encoding. Since it inherits from `MessageBuilder`, it's compatible with
the current code style:
```c++
// NOTE: call `setValue` before any base-class method, because those return `MessageBuilder&`
auto msg = TypedMessageBuilder<T>(encoder).setValue(value).setPartitionKey(key).build();
```
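To make the ordering constraint concrete, here is a minimal sketch of the builder pattern using stand-in classes. `SketchMessage`, `SketchBuilder`, `SketchTypedBuilder` and `buildIntMessage` are hypothetical names invented for this illustration and are not the Pulsar classes. The `Validator` signature is also an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration only; these are NOT the Pulsar classes.
struct SketchMessage {
    std::string content;
    std::string partitionKey;
};

class SketchBuilder {
   public:
    SketchBuilder& setContent(const std::string& content) {
        msg_.content = content;
        return *this;
    }
    // Returns the base type, so methods of a derived builder are not reachable afterwards.
    SketchBuilder& setPartitionKey(const std::string& key) {
        msg_.partitionKey = key;
        return *this;
    }
    SketchMessage build() const { return msg_; }

   private:
    SketchMessage msg_;
};

template <typename T>
class SketchTypedBuilder : public SketchBuilder {
   public:
    using Encoder = std::function<std::string(const T&)>;
    // Assumed signature: a validator inspects the encoded bytes and may throw.
    using Validator = std::function<void(const char* data, std::size_t size)>;

    explicit SketchTypedBuilder(Encoder encoder, Validator validator = nullptr)
        : encoder_(std::move(encoder)), validator_(std::move(validator)) {}

    // Encodes the value, optionally validates the bytes, then stores them as the content.
    SketchTypedBuilder& setValue(const T& value) {
        const std::string bytes = encoder_(value);
        if (validator_) validator_(bytes.data(), bytes.size());
        setContent(bytes);
        return *this;
    }

   private:
    Encoder encoder_;
    Validator validator_;
};

// setValue comes first; after setPartitionKey only base-class methods are available.
inline SketchMessage buildIntMessage(int value, const std::string& key) {
    SketchTypedBuilder<int> builder{[](const int& v) { return std::to_string(v); }};
    return builder.setValue(value).setPartitionKey(key).build();
}
```

Swapping the order to `builder.setPartitionKey(key).setValue(value)` would not compile in this sketch, because `setPartitionKey` returns a reference to the base class.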
Add a `TypedMessage<T>` class that only adds a decoder to the `Message`
instance and can be converted from the `Message` directly:
```c++
auto typedMsg = TypedMessage<T>(msg, decoder);
std::cout << typedMsg.getValue() << std::endl; // decode the bytes
std::cout << typedMsg.getMessageId() << std::endl; // call methods from Message
```
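The idea of attaching a decoder to an existing message can be sketched with stand-in types. `RawMessage`, `TypedView` and `decodeInt` are hypothetical names used only for this illustration. The decoder signature is an assumption based on the `decode(msg.getData(), msg.getSize())` call in the motivation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for an untyped message; NOT the Pulsar `Message` class.
class RawMessage {
   public:
    explicit RawMessage(std::string bytes) : bytes_(std::move(bytes)) {}
    const char* getData() const { return bytes_.data(); }
    std::size_t getSize() const { return bytes_.size(); }

   private:
    std::string bytes_;
};

// Adds only a decoder on top of the raw message; base-class methods stay usable.
template <typename T>
class TypedView : public RawMessage {
   public:
    // Assumed decoder signature: raw bytes in, decoded value out.
    using Decoder = std::function<T(const char* data, std::size_t size)>;

    TypedView(const RawMessage& msg, Decoder decoder)
        : RawMessage(msg), decoder_(std::move(decoder)) {}

    // Decoding happens on each call rather than at construction time.
    T getValue() const { return decoder_(getData(), getSize()); }

   private:
    Decoder decoder_;
};

// Example decoder that parses the payload as a decimal integer.
inline int decodeInt(const char* data, std::size_t size) {
    return std::stoi(std::string(data, size));
}
```

With these definitions, `TypedView<int>{RawMessage{"123"}, decodeInt}.getValue()` decodes the payload, while `getSize()` still comes from the base class.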
For convenience, the following APIs are added to `Consumer`:
```c++
template <typename T>
Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder);
template <typename T>
Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder);
template <typename T>
void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
                  typename TypedMessage<T>::Decoder decoder);
```
The `ConsumerConfiguration` can configure a listener that accepts a
`TypedMessage<T>` now:
```c++
template <typename T>
ConsumerConfiguration& setTypedMessageListener(
    std::function<void(Consumer&, const TypedMessage<T>&)> listener,
    typename TypedMessage<T>::Decoder decoder);
```
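One way a typed listener could be layered on top of an untyped one is sketched below. This is an assumption about the general technique, not the actual implementation in this PR. All names (`SketchConsumer`, `SketchRaw`, `SketchTyped`, `RawListener`, `adaptTypedListener`) are hypothetical. Note that the consumer type is only forward declared where the adapter is defined, which works because it is passed by reference.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

class SketchConsumer;  // only forward declared here, as in a configuration header

// Hypothetical untyped message holding raw bytes.
struct SketchRaw {
    std::string bytes;
};

// Hypothetical typed message: the raw bytes plus a decoder.
template <typename T>
struct SketchTyped {
    using Decoder = std::function<T(const char*, std::size_t)>;
    SketchRaw raw;
    Decoder decoder;
    T getValue() const { return decoder(raw.bytes.data(), raw.bytes.size()); }
};

using RawListener = std::function<void(SketchConsumer&, const SketchRaw&)>;

// Wraps a typed listener into an untyped one that builds the typed message on arrival.
template <typename T>
RawListener adaptTypedListener(
    std::function<void(SketchConsumer&, const SketchTyped<T>&)> listener,
    typename SketchTyped<T>::Decoder decoder) {
    // The consumer is only forwarded by reference, so its definition is not needed here.
    return [listener, decoder](SketchConsumer& consumer, const SketchRaw& raw) {
        listener(consumer, SketchTyped<T>{raw, decoder});
    };
}

// The complete type is only required where a consumer object is actually created.
class SketchConsumer {};
```

Passing the consumer by value inside the adapter would instead require the complete class definition at that point.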
Since `setTypedMessageListener` calls the original listener and `Consumer` is only forward
declared in `ConsumerConfiguration.h`, the first argument of the listener is changed from
`Consumer` to `Consumer&`. It's an API change, but it's backward
compatible because `std::function<void(Consumer, ...)>` can be implicitly converted to
`std::function<void(Consumer&, ...)>`.
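The implicit conversion relied on above can be checked with a small self-contained sketch. `FakeConsumer`, `FakeMessage`, `OldListener`, `NewListener` and `invokeThroughNewListener` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-ins; NOT the Pulsar `Consumer` and `Message` classes.
struct FakeConsumer {
    std::string name;
};
struct FakeMessage {
    std::string payload;
};

using OldListener = std::function<void(FakeConsumer, const FakeMessage&)>;
using NewListener = std::function<void(FakeConsumer&, const FakeMessage&)>;

inline std::string invokeThroughNewListener() {
    std::string seen;
    // An existing listener written against the old by-value signature.
    OldListener oldStyle = [&seen](FakeConsumer c, const FakeMessage& m) {
        seen = c.name + ":" + m.payload;
    };
    // Implicit conversion: the old callable can be invoked with a `FakeConsumer&`,
    // which is copied into its by-value parameter.
    NewListener newStyle = oldStyle;

    FakeConsumer consumer{"c1"};
    newStyle(consumer, FakeMessage{"hello"});
    return seen;
}
```

The conversion works because `std::function` accepts any callable that can be invoked with its declared argument types, and an lvalue reference can initialize a by-value parameter through a copy.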
Based on these API changes, we can write a separate header-only C++
library as the schema extension.
Hi @shibd, I changed the tests and APIs in the latest code, PTAL again.
@BewareMyPower Thanks for your PR, This PR can make it easier for users to serialize and deserialize messages, and it makes a lot of sense. I'm trying to diverge my ideas. Can I use it in combination with Take the Producer side, for example:
```c++
template <typename T>
class TypedMessageBuilder : public MessageBuilder {
    // ... Omit existing code
    SchemaInfo getSchemaInfo() {
        // Two methods to generate:
        // 1. Generated according to Template T?
        // 2. The user passes it in by himself?
    }

   private:
    const Encoder encoder_;
    const Validator validator_;
};
```
(1) Set `TypedMessageBuilder` when creating a producer:
```c++
auto typeMsg = TypedMessageBuilder<int>{intEncoder};
// The SchemaInfo will be obtained and set according to TypedMessageBuilder
client.createProducer(topic, typeMsg, producer);
```
(2) The templated `Producer`:
```c++
template <typename T>
class Producer {
    // ....
    /**
     *
     */
    Result send(const T& value) {
        Message msg = typeMessageBuilder.value(value);
        return send(msg);
    }
};
```
Complete user-side code:
```c++
auto typeMsg = TypedMessageBuilder<int>{intEncoder};
client.createProducer(topic, typeMsg, producer);
producer.send(100);
```
|
@shibd See the point I mentioned in this PR:
You can try implementing the templated producer yourself. Then you might know better what I meant. BTW,
```c++
struct Student {
    int age;
    std::string name;
};

class StudentSchema {
   public:
    StudentSchema() {}
    SchemaInfo getSchemaInfo() const { return schemaInfo_; }
    std::string operator()(const Student& student) const {
        // TODO: serialize student using the schemaInfo_ and Avro C++ client
        return R"("age": )" + std::to_string(student.age) + R"(, "name": ")" + student.name + R"(")";
    }

   private:
    SchemaInfo schemaInfo_{SchemaType::AVRO, "<name>", "<avro-string>"};
};
```
Then, users can write a producer like:
```c++
StudentSchema schema;
Client client(lookupUrl);
Producer producer;
ProducerConfiguration conf;
conf.setSchema(schema.getSchemaInfo());
client.createProducer("my-topic", conf, producer);
// NOTE: schema is a StudentSchema, which could be treated as
// a std::function<std::string(const Student&)>
producer.send(TypedMessageBuilder<Student>{schema}.setValue(Student{10, "xyz"}).build());
```
The
This is a good example, it shows how to use it with
### Documentation
- `doc-required` (Your PR needs to update docs and you will update later)
- `doc-not-needed` (Please explain why)
- `doc` (Your PR contains doc changes)
- `doc-complete` (Docs have been already added)