Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat: Add batch message sending and publishing
This commit introduces batch sending capabilities to the bus implementation and its transports.

New methods `sendBatch` (for commands) and `publishBatch` (for events) have been added to the `Transport` interface and implemented in the `BusInstance`.

- SqsTransport: Implements batching using SNS `PublishBatchCommand`. Messages are grouped by topic and sent in chunks of up to 10 to respect SNS limits.
- RabbitMqTransport: Throws a "not supported" error for batch operations, as the underlying transport does not natively support them.
- InMemoryQueue: Implements batching by iterating and calling the single send/publish methods, suitable for testing.

Unit tests have been added for the new methods in `BusInstance`.
Integration tests have been added to the `transportTests` suite to cover batch operations for all relevant transports, verifying both successful execution and correct error handling for unsupported transports.
  • Loading branch information
google-labs-jules[bot] committed May 25, 2025
commit badfb059366231868c988313ff190c511be362fd
254 changes: 254 additions & 0 deletions packages/bus-core/src/service-bus/bus-instance.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import { BusInstance } from './bus-instance'
import { Transport } from '../transport'
import {
Command,
Event,
MessageAttributes
} from '@node-ts/bus-messages'
import { CoreDependencies, MiddlewareDispatcher } from '../util'
import { HandlerRegistry } from '../handler'
import { WorkflowRegistry } from '../workflow/registry'
import { Logger } from '../logger'
import { ContainerAdapter } from '../container'
import { Receiver } from '../receiver'
import { messageHandlingContext }
from '../message-handling-context' // To test prepareTransportOptions behavior

// Mock external modules and dependencies
jest.mock('../workflow/registry')
jest.mock('../util/middleware-dispatcher')
jest.mock('../handler/handler-registry')
jest.mock('../logger')
// jest.mock('uuid', () => ({ v4: () => 'mock-uuid' })) // If generateUuid is directly used and needs mocking

// Define mock types explicitly
type MockTransport = jest.Mocked<Transport>
type MockLogger = jest.Mocked<Logger>
type MockCoreDependencies = jest.Mocked<CoreDependencies>
type MockWorkflowRegistry = jest.Mocked<WorkflowRegistry>
type MockMiddlewareDispatcher = jest.Mocked<MiddlewareDispatcher<any>>
type MockHandlerRegistry = jest.Mocked<HandlerRegistry>
type MockContainerAdapter = jest.Mocked<ContainerAdapter>
type MockReceiver = jest.Mocked<Receiver>


describe('BusInstance', () => {
let busInstance: BusInstance
let mockTransport: MockTransport
let mockLogger: MockLogger
let mockLoggerFactory: jest.Mock<MockLogger>
let mockCoreDependencies: MockCoreDependencies
let mockWorkflowRegistry: MockWorkflowRegistry
let mockMessageReadMiddleware: MockMiddlewareDispatcher
let mockHandlerRegistry: MockHandlerRegistry
let mockContainer: MockContainerAdapter | undefined
let mockReceiver: MockReceiver | undefined

const concurrency = 1
const sendOnly = false

// Sample messages and attributes for testing
class TestCommand implements Command {
$name = 'test-command'
$version = 1
constructor(public readonly id: string) {}
}

class TestEvent implements Event {
$name = 'test-event'
$version = 1
constructor(public readonly id: string) {}
}

beforeEach(() => {
mockTransport = {
connect: jest.fn(),
disconnect: jest.fn(),
publish: jest.fn(),
send: jest.fn(),
sendBatch: jest.fn().mockResolvedValue(undefined),
publishBatch: jest.fn().mockResolvedValue(undefined),
readNextMessage: jest.fn(),
deleteMessage: jest.fn(),
returnMessage: jest.fn(),
fail: jest.fn(),
initialize: jest.fn(),
dispose: jest.fn(),
start: jest.fn(),
stop: jest.fn(),
prepare: jest.fn()
}

mockLogger = {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
fatal: jest.fn(),
trace: jest.fn()
}
mockLoggerFactory = jest.fn(() => mockLogger)

mockCoreDependencies = {
loggerFactory: mockLoggerFactory,
messageSerializer: {
serialize: jest.fn(message => JSON.stringify(message)),
deserialize: jest.fn(messageStr => JSON.parse(messageStr))
},
handlerRegistry: new HandlerRegistry(mockLoggerFactory) as any, // Actual instance or mock
interruptSignals: []
// ... other properties if needed by BusInstance constructor or prepareTransportOptions
} as unknown as MockCoreDependencies // Use unknown for partial mock

mockWorkflowRegistry =
new WorkflowRegistry(undefined, undefined) as MockWorkflowRegistry
mockMessageReadMiddleware =
new MiddlewareDispatcher<any>() as MockMiddlewareDispatcher
mockHandlerRegistry = new HandlerRegistry(mockLoggerFactory) as MockHandlerRegistry
mockContainer = undefined
mockReceiver = undefined


busInstance = new BusInstance(
mockTransport,
concurrency,
mockWorkflowRegistry,
mockCoreDependencies,
mockMessageReadMiddleware,
mockHandlerRegistry,
mockContainer,
sendOnly,
mockReceiver
)
})

describe('sendBatch', () => {
const commands: TestCommand[] = [
new TestCommand('1'),
new TestCommand('2')
]

it('should call transport.sendBatch with commands and attributes when attributes are provided', async () => {
const messageAttributes: Partial<MessageAttributes> = { correlationId: 'test-corid', attributes: { 'custom': 'value' } }
const expectedAttributesMatcher = expect.objectContaining({
correlationId: 'test-corid',
attributes: { 'custom': 'value' }
})

await busInstance.sendBatch(commands, messageAttributes)

expect(mockLogger.debug).toHaveBeenCalledWith(
'Sending command batch',
{ commands, messageAttributes }
)
expect(mockTransport.sendBatch).toHaveBeenCalledWith(
commands,
expectedAttributesMatcher
)
})

it('should call transport.sendBatch with commands and generated attributes when no attributes are provided', async () => {
// Mock generateUuid if not already done and if it's a direct dependency of prepareTransportOptions
// For this test, we'll rely on the fact that prepareTransportOptions generates a correlationId.
const expectedAttributesMatcher = expect.objectContaining({
correlationId: expect.any(String), // Should be generated
attributes: {} // Default
})

await busInstance.sendBatch(commands)

expect(mockLogger.debug).toHaveBeenCalledWith(
'Sending command batch',
{ commands, messageAttributes: {} } // Default is empty object
)
expect(mockTransport.sendBatch).toHaveBeenCalledWith(
commands,
expectedAttributesMatcher
)
})

it('should use correlationId from message handling context if available and no explicit correlationId is passed', async () => {
const contextCorrelationId = 'context-corid'
const handlingContext = {
message: {} as any,
attributes: { correlationId: contextCorrelationId, attributes: {}, stickyAttributes: {} }
}
const expectedAttributesMatcher = expect.objectContaining({
correlationId: contextCorrelationId
})

await messageHandlingContext.run(handlingContext, async () => {
await busInstance.sendBatch(commands, { attributes: { 'key': 'val'} });
});

expect(mockTransport.sendBatch).toHaveBeenCalledWith(
commands,
expectedAttributesMatcher
);
});
})

describe('publishBatch', () => {
const events: TestEvent[] = [
new TestEvent('ev1'),
new TestEvent('ev2')
]

it('should call transport.publishBatch with events and attributes when attributes are provided', async () => {
const messageAttributes: Partial<MessageAttributes> = { correlationId: 'test-corid-event', attributes: { 'eventProp': 'eventValue' } }
const expectedAttributesMatcher = expect.objectContaining({
correlationId: 'test-corid-event',
attributes: { 'eventProp': 'eventValue' }
})

await busInstance.publishBatch(events, messageAttributes)

expect(mockLogger.debug).toHaveBeenCalledWith(
'Publishing event batch',
{ events, messageAttributes }
)
expect(mockTransport.publishBatch).toHaveBeenCalledWith(
events,
expectedAttributesMatcher
)
})

it('should call transport.publishBatch with events and generated attributes when no attributes are provided', async () => {
const expectedAttributesMatcher = expect.objectContaining({
correlationId: expect.any(String), // Should be generated
attributes: {} // Default
})

await busInstance.publishBatch(events)

expect(mockLogger.debug).toHaveBeenCalledWith(
'Publishing event batch',
{ events, messageAttributes: {} } // Default is empty object
)
expect(mockTransport.publishBatch).toHaveBeenCalledWith(
events,
expectedAttributesMatcher
)
})

it('should use correlationId from message handling context if available and no explicit correlationId is passed for publish', async () => {
const contextCorrelationId = 'context-corid-publish'
const handlingContext = {
message: {} as any,
attributes: { correlationId: contextCorrelationId, attributes: {}, stickyAttributes: {} }
}
const expectedAttributesMatcher = expect.objectContaining({
correlationId: contextCorrelationId
})

await messageHandlingContext.run(handlingContext, async () => {
await busInstance.publishBatch(events, { attributes: { 'key': 'val'} });
});

expect(mockTransport.publishBatch).toHaveBeenCalledWith(
events,
expectedAttributesMatcher
);
});
})
})
32 changes: 32 additions & 0 deletions packages/bus-core/src/service-bus/bus-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,38 @@ export class BusInstance<TTransportMessage = {}> {
}
}

/**
* Sends a batch of commands to the transport
* @param commands An array of commands to send
* @param messageAttributes A set of attributes to attach to the outgoing message when sent
*/
async sendBatch<TCommand extends Command>(
commands: TCommand[],
messageAttributes: Partial<MessageAttributes> = {}
): Promise<void> {
this.logger.debug('Sending command batch', { commands, messageAttributes });
const attributes = this.prepareTransportOptions(messageAttributes);
// TODO: Consider outbox pattern for batch operations if required in future.
// For now, directly calling transport.
return this.transport.sendBatch(commands, attributes);
}

/**
* Publishes a batch of events to the transport
* @param events An array of events to publish
* @param messageAttributes A set of attributes to attach to the outgoing message when published
*/
async publishBatch<TEvent extends Event>(
events: TEvent[],
messageAttributes: Partial<MessageAttributes> = {}
): Promise<void> {
this.logger.debug('Publishing event batch', { events, messageAttributes });
const attributes = this.prepareTransportOptions(messageAttributes);
// TODO: Consider outbox pattern for batch operations if required in future.
// For now, directly calling transport.
return this.transport.publishBatch(events, attributes);
}

/**
* Instructs the bus that the current message being handled cannot be processed even with
* retries and instead should immediately be routed to the dead letter queue
Expand Down
20 changes: 20 additions & 0 deletions packages/bus-core/src/transport/in-memory-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ export class InMemoryQueue implements Transport<InMemoryMessage> {
this.addToQueue(command, messageOptions)
}

async sendBatch<TCommand extends Command>(
commands: TCommand[],
messageOptions?: MessageAttributes
): Promise<void> {
this.logger.debug(`Sending batch of ${commands.length} commands to in-memory queue.`);
await Promise.all(
commands.map(command => this.send(command, messageOptions))
);
}

async publishBatch<TEvent extends Event>(
events: TEvent[],
messageOptions?: MessageAttributes
): Promise<void> {
this.logger.debug(`Publishing batch of ${events.length} events to in-memory queue.`);
await Promise.all(
events.map(event => this.publish(event, messageOptions))
);
}

async fail(
transportMessage: TransportMessage<InMemoryMessage>
): Promise<void> {
Expand Down
22 changes: 22 additions & 0 deletions packages/bus-core/src/transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ export interface Transport<TransportMessageType = {}> {
messageOptions?: MessageAttributes
): Promise<void>

/**
* Sends a batch of commands to the underlying transport.
* @param commands An array of domain commands to be sent
* @param messageOptions Options that control the behaviour around how the messages are sent and
* additional information that travels with them.
*/
sendBatch<TCommand extends Command>(
commands: TCommand[],
messageOptions?: MessageAttributes
): Promise<void>

/**
* Publishes a batch of events to the underlying transport.
* @param events An array of domain events to be published
* @param messageOptions Options that control the behaviour around how the messages are sent and
* additional information that travels with them.
*/
publishBatch<TEvent extends Event>(
events: TEvent[],
messageOptions?: MessageAttributes
): Promise<void>

/**
* Forwards @param transportMessage to the dead letter queue. The message must have been read in from the
* queue and have a receipt handle.
Expand Down
16 changes: 16 additions & 0 deletions packages/bus-rabbitmq/src/rabbitmq-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ export class RabbitMqTransport implements Transport<RabbitMqMessage> {
await this.publishMessage(command, messageAttributes)
}

async sendBatch<TCommand extends Command>(
_commands: TCommand[],
_messageOptions?: MessageAttributes
): Promise<void> {
this.logger.warn('sendBatch is called but not supported by RabbitMqTransport.');
throw new Error('Batch operations are not supported by RabbitMqTransport.');
}

async publishBatch<TEvent extends Event>(
_events: TEvent[],
_messageOptions?: MessageAttributes
): Promise<void> {
this.logger.warn('publishBatch is called but not supported by RabbitMqTransport.');
throw new Error('Batch operations are not supported by RabbitMqTransport.');
}

async fail(transportMessage: TransportMessage<unknown>): Promise<void> {
const rawMessage = transportMessage.raw as GetMessage
const serializedPayload = this.coreDependencies.messageSerializer.serialize(
Expand Down
Loading