Skip to content
Prev Previous commit
Next Next commit
refactor: refine queue
  • Loading branch information
RobertIndie committed Feb 5, 2024
commit eea3d11a29ef234e46a4500421d0b8a3e8fe0bee
13 changes: 11 additions & 2 deletions lib/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,20 @@ type Event interface {
Ack()
}

type QueueConfig struct {
type SourceQueueConfig struct {
Topics []string
}

type EventQueueFactory func(ctx context.Context, config *QueueConfig, function *model.Function) (EventQueue, error)
type SinkQueueConfig struct {
Topic string
}

//type EventQueueFactory func(ctx context.Context, config *QueueConfig, function *model.Function) (EventQueue, error)

type EventQueueFactory interface {
NewSourceChan(ctx context.Context, config *SourceQueueConfig, function *model.Function) (<-chan Event, error)
NewSinkChan(ctx context.Context, config *SinkQueueConfig, function *model.Function) (chan<- Event, error)
}

type EventQueue interface {
GetSendChan() (chan<- Event, error)
Expand Down
45 changes: 16 additions & 29 deletions lib/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
)

type FunctionInstance struct {
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
newQueue EventQueueFactory
readyCh chan error
index int32
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
queueFactory EventQueueFactory
readyCh chan error
index int32
}

func NewFunctionInstance(definition *model.Function, queueFactory EventQueueFactory, index int32) *FunctionInstance {
Expand All @@ -44,12 +44,12 @@ func NewFunctionInstance(definition *model.Function, queueFactory EventQueueFact
"function-index": index,
})
return &FunctionInstance{
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
newQueue: queueFactory,
readyCh: make(chan error),
index: index,
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
queueFactory: queueFactory,
readyCh: make(chan error),
index: index,
}
}

Expand Down Expand Up @@ -108,40 +108,27 @@ func (instance *FunctionInstance) Run() {
return
}

sourceQ, err := instance.newQueue(instance.ctx, &QueueConfig{Topics: instance.definition.Inputs}, instance.definition)
sourceChan, err := instance.queueFactory.NewSourceChan(instance.ctx, &SourceQueueConfig{Topics: instance.definition.Inputs}, instance.definition)
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating source event queue")
return
}

recvChan, err := sourceQ.GetRecvChan()
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error getting source channel")
return
}

sinkQ, err := instance.newQueue(instance.ctx, &QueueConfig{Topics: []string{instance.definition.Output}}, instance.definition)
sinkChan, err := instance.queueFactory.NewSinkChan(instance.ctx, &SinkQueueConfig{Topic: instance.definition.Output}, instance.definition)
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating sink event queue")
return
}

sendChan, err := sinkQ.GetSendChan()
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error getting sink channel")
return
}

instance.readyCh <- nil
for e := range recvChan {
for e := range sourceChan {
stdin.ResetBuffer(e.GetPayload())
_, err = process.Call(instance.ctx)
if err != nil {
handleErr(instance.ctx, err, "Error calling process function")
return
}
output := stdout.GetAndReset()
sendChan <- NewAckableEvent(output, e.Ack)
sinkChan <- NewAckableEvent(output, e.Ack)
}
}

Expand Down
15 changes: 0 additions & 15 deletions lib/memory_queue.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,4 @@
package lib

type MemoryQueueFactor struct {
}

type MemoryEvent struct {
payload []byte
}

func (e *MemoryEvent) GetPayload() []byte {
return e.payload
}

func NewMemoryQueueFactory() (*MemoryQueueFactor, error) {
return &MemoryQueueFactor{}, nil
}

type MemoryQueue struct {
}
147 changes: 66 additions & 81 deletions lib/pulsar_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,22 @@ import (
"github.com/functionstream/functionstream/common/model"
"github.com/pkg/errors"
"log/slog"
"sync"
)

func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (func(ctx context.Context, config *QueueConfig, f *model.Function) (EventQueue, error), error) {
type PulsarEventQueueFactory struct {
newSourceChan func(ctx context.Context, config *SourceQueueConfig, f *model.Function) (<-chan Event, error)
newSinkChan func(ctx context.Context, config *SinkQueueConfig, f *model.Function) (chan<- Event, error)
}

func (f *PulsarEventQueueFactory) NewSourceChan(ctx context.Context, config *SourceQueueConfig, function *model.Function) (<-chan Event, error) {
return f.newSourceChan(ctx, config, function)
}

func (f *PulsarEventQueueFactory) NewSinkChan(ctx context.Context, config *SinkQueueConfig, function *model.Function) (chan<- Event, error) {
return f.newSinkChan(ctx, config, function)
}

func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (EventQueueFactory, error) {
pc, err := pulsar.NewClient(pulsar.ClientOptions{
URL: config.PulsarURL,
})
Expand All @@ -23,88 +35,61 @@ func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (func(ctx c
pc.Close()
}
}()
return func(ctx context.Context, config *QueueConfig, f *model.Function) (EventQueue, error) {
handleErr := func(ctx context.Context, err error, message string, args ...interface{}) {
if errors.Is(err, context.Canceled) {
slog.InfoContext(ctx, "function instance has been stopped")
return
}
slog.ErrorContext(ctx, message, args...)
handleErr := func(ctx context.Context, err error, message string, args ...interface{}) {
if errors.Is(err, context.Canceled) {
slog.InfoContext(ctx, "function instance has been stopped")
return
}
initRecvChan := &sync.Once{}
initSendChan := &sync.Once{}
return &PulsarEventQueue{
getRecvChan: func() (c <-chan Event, e error) {
var recvChan chan Event
initRecvChan.Do(func() {
recvChan = make(chan Event)
consumer, err := pc.Subscribe(pulsar.ConsumerOptions{
Topics: config.Topics,
SubscriptionName: fmt.Sprintf("function-stream-%s", f.Name),
Type: pulsar.Failover,
})
if err != nil {
e = errors.Wrap(err, "Error creating consumer")
return
}
go func() {
defer consumer.Close()
for msg := range consumer.Chan() {
recvChan <- NewAckableEvent(msg.Payload(), func() {
err := consumer.Ack(msg)
if err != nil {
handleErr(ctx, err, "Error acknowledging message", "error", err)
return
}
})
slog.ErrorContext(ctx, message, args...)
}
return &PulsarEventQueueFactory{
newSourceChan: func(ctx context.Context, config *SourceQueueConfig, f *model.Function) (<-chan Event, error) {
c := make(chan Event)
consumer, err := pc.Subscribe(pulsar.ConsumerOptions{
Topics: config.Topics,
SubscriptionName: fmt.Sprintf("function-stream-%s", f.Name),
Type: pulsar.Failover,
})
if err != nil {
return nil, errors.Wrap(err, "Error creating consumer")
}
go func() {
defer consumer.Close()
for msg := range consumer.Chan() {
c <- NewAckableEvent(msg.Payload(), func() {
err := consumer.Ack(msg)
if err != nil {
handleErr(ctx, err, "Error acknowledging message", "error", err)
return
}
}()
})
return recvChan, err
},
getSendChan: func() (c chan<- Event, e error) {
var sendChan chan Event
initSendChan.Do(func() {
sendChan = make(chan Event)
if len(config.Topics) > 1 {
e = errors.New("Pulsar sink queue only supports one output topic")
return
}
producer, err := pc.CreateProducer(pulsar.ProducerOptions{
Topic: config.Topics[0],
})
if err != nil {
e = errors.Wrap(err, "Error creating producer")
}
go func() {
for e := range sendChan {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: e.GetPayload(),
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
if err != nil {
handleErr(ctx, err, "Error sending message", "error", err, "messageId", id)
return
}
e.Ack()
})
}
}()
return c, nil
},
newSinkChan: func(ctx context.Context, config *SinkQueueConfig, f *model.Function) (chan<- Event, error) {
c := make(chan Event)
producer, err := pc.CreateProducer(pulsar.ProducerOptions{
Topic: config.Topic,
})
if err != nil {
return nil, errors.Wrap(err, "Error creating producer")
}
go func() {
defer producer.Close()
for e := range c {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: e.GetPayload(),
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
if err != nil {
handleErr(ctx, err, "Error sending message", "error", err, "messageId", id)
return
}
}()
})
return sendChan, nil
},
}, nil
e.Ack()
})
}
}()
return c, nil
},
}, nil
}

type PulsarEventQueue struct {
getRecvChan func() (<-chan Event, error)
getSendChan func() (chan<- Event, error)
}

func (q *PulsarEventQueue) GetSendChan() (c chan<- Event, e error) {
return q.getSendChan()
}

func (q *PulsarEventQueue) GetRecvChan() (c <-chan Event, err error) {
return q.getRecvChan()
}