Skip to content

Commit ffe3535

Browse files
committed
feat(tasks): add streaming methods for elicitation and sampling
Backport of PR #1210 to 1.x branch. - Add createMessageStream() with capability and message validation - Add elicitInputStream() with form/url mode support - Add collect-user-info-task example demonstrating bidirectional tasks - Add comprehensive tests for streaming methods
1 parent b0cf837 commit ffe3535

File tree

4 files changed

+944
-22
lines changed

4 files changed

+944
-22
lines changed

src/examples/client/simpleStreamableHttp.ts

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ import {
1515
LoggingMessageNotificationSchema,
1616
ResourceListChangedNotificationSchema,
1717
ElicitRequestSchema,
18+
ElicitResult,
1819
ResourceLink,
1920
ReadResourceRequest,
2021
ReadResourceResultSchema,
2122
RELATED_TASK_META_KEY,
2223
ErrorCode,
2324
McpError
2425
} from '../../types.js';
26+
import { InMemoryTaskStore } from '../../experimental/tasks/stores/in-memory.js';
2527
import { getDisplayName } from '../../shared/metadataUtils.js';
2628
import { Ajv } from 'ajv';
2729

@@ -65,6 +67,7 @@ function printHelp(): void {
6567
console.log(' greet [name] - Call the greet tool');
6668
console.log(' multi-greet [name] - Call the multi-greet tool with notifications');
6769
console.log(' collect-info [type] - Test form elicitation with collect-user-info tool (contact/preferences/feedback)');
70+
console.log(' collect-info-task [type] - Test bidirectional task support (server+client tasks) with elicitation');
6871
console.log(' start-notifications [interval] [count] - Start periodic notifications');
6972
console.log(' run-notifications-tool-with-resumability [interval] [count] - Run notification tool with resumability');
7073
console.log(' list-prompts - List available prompts');
@@ -131,6 +134,11 @@ function commandLoop(): void {
131134
await callCollectInfoTool(args[1] || 'contact');
132135
break;
133136

137+
case 'collect-info-task': {
138+
await callCollectInfoWithTask(args[1] || 'contact');
139+
break;
140+
}
141+
134142
case 'start-notifications': {
135143
const interval = args[1] ? parseInt(args[1], 10) : 2000;
136144
const count = args[2] ? parseInt(args[2], 10) : 10;
@@ -232,7 +240,10 @@ async function connect(url?: string): Promise<void> {
232240
console.log(`Connecting to ${serverUrl}...`);
233241

234242
try {
235-
// Create a new client with form elicitation capability
243+
// Create task store for client-side task support
244+
const clientTaskStore = new InMemoryTaskStore();
245+
246+
// Create a new client with form elicitation capability and task support
236247
client = new Client(
237248
{
238249
name: 'example-client',
@@ -242,25 +253,46 @@ async function connect(url?: string): Promise<void> {
242253
capabilities: {
243254
elicitation: {
244255
form: {}
256+
},
257+
tasks: {
258+
requests: {
259+
elicitation: {
260+
create: {}
261+
}
262+
}
245263
}
246-
}
264+
},
265+
taskStore: clientTaskStore
247266
}
248267
);
249268
client.onerror = error => {
250269
console.error('\x1b[31mClient error:', error, '\x1b[0m');
251270
};
252271

253-
// Set up elicitation request handler with proper validation
254-
client.setRequestHandler(ElicitRequestSchema, async request => {
272+
// Set up elicitation request handler with proper validation and task support
273+
client.setRequestHandler(ElicitRequestSchema, async (request, extra) => {
255274
if (request.params.mode !== 'form') {
256275
throw new McpError(ErrorCode.InvalidParams, `Unsupported elicitation mode: ${request.params.mode}`);
257276
}
258277
console.log('\n🔔 Elicitation (form) Request Received:');
259278
console.log(`Message: ${request.params.message}`);
260279
console.log(`Related Task: ${request.params._meta?.[RELATED_TASK_META_KEY]?.taskId}`);
280+
console.log(`Task Creation Requested: ${request.params.task ? 'yes' : 'no'}`);
261281
console.log('Requested Schema:');
262282
console.log(JSON.stringify(request.params.requestedSchema, null, 2));
263283

284+
// Helper to return result, optionally creating a task if requested
285+
const returnResult = async (result: ElicitResult) => {
286+
if (request.params.task && extra.taskStore) {
287+
// Create a task and store the result
288+
const task = await extra.taskStore.createTask({ ttl: extra.taskRequestedTtl });
289+
await extra.taskStore.storeTaskResult(task.taskId, 'completed', result);
290+
console.log(`📋 Created client-side task: ${task.taskId}`);
291+
return { task };
292+
}
293+
return result;
294+
};
295+
264296
const schema = request.params.requestedSchema;
265297
const properties = schema.properties;
266298
const required = schema.required || [];
@@ -381,7 +413,7 @@ async function connect(url?: string): Promise<void> {
381413
}
382414

383415
if (inputCancelled) {
384-
return { action: 'cancel' };
416+
return returnResult({ action: 'cancel' });
385417
}
386418

387419
// If we didn't complete all fields due to an error, try again
@@ -394,7 +426,7 @@ async function connect(url?: string): Promise<void> {
394426
continue;
395427
} else {
396428
console.log('Maximum attempts reached. Declining request.');
397-
return { action: 'decline' };
429+
return returnResult({ action: 'decline' });
398430
}
399431
}
400432

@@ -412,7 +444,7 @@ async function connect(url?: string): Promise<void> {
412444
continue;
413445
} else {
414446
console.log('Maximum attempts reached. Declining request.');
415-
return { action: 'decline' };
447+
return returnResult({ action: 'decline' });
416448
}
417449
}
418450

@@ -426,25 +458,34 @@ async function connect(url?: string): Promise<void> {
426458
});
427459
});
428460

429-
if (confirmAnswer === 'yes' || confirmAnswer === 'y') {
430-
return {
431-
action: 'accept',
432-
content
433-
};
434-
} else if (confirmAnswer === 'cancel' || confirmAnswer === 'c') {
435-
return { action: 'cancel' };
436-
} else if (confirmAnswer === 'no' || confirmAnswer === 'n') {
437-
if (attempts < maxAttempts) {
438-
console.log('Please re-enter the information...');
439-
continue;
440-
} else {
441-
return { action: 'decline' };
461+
switch (confirmAnswer) {
462+
case 'yes':
463+
case 'y': {
464+
return returnResult({
465+
action: 'accept',
466+
content: content as ElicitResult['content']
467+
});
468+
}
469+
case 'cancel':
470+
case 'c': {
471+
return returnResult({ action: 'cancel' });
472+
}
473+
case 'no':
474+
case 'n': {
475+
if (attempts < maxAttempts) {
476+
console.log('Please re-enter the information...');
477+
continue;
478+
} else {
479+
return returnResult({ action: 'decline' });
480+
}
481+
482+
break;
442483
}
443484
}
444485
}
445486

446487
console.log('Maximum attempts reached. Declining request.');
447-
return { action: 'decline' };
488+
return returnResult({ action: 'decline' });
448489
});
449490

450491
transport = new StreamableHTTPClientTransport(new URL(serverUrl), {
@@ -641,6 +682,12 @@ async function callCollectInfoTool(infoType: string): Promise<void> {
641682
await callTool('collect-user-info', { infoType });
642683
}
643684

685+
async function callCollectInfoWithTask(infoType: string): Promise<void> {
686+
console.log(`\n🔄 Testing bidirectional task support with collect-user-info-task tool (${infoType})...`);
687+
console.log('This will create a task on the server, which will elicit input and create a task on the client.\n');
688+
await callToolTask('collect-user-info-task', { infoType });
689+
}
690+
644691
async function startNotifications(interval: number, count: number): Promise<void> {
645692
console.log(`Starting notification stream: interval=${interval}ms, count=${count || 'unlimited'}`);
646693
await callTool('start-notification-stream', { interval, count });

src/examples/server/simpleStreamableHttp.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { requireBearerAuth } from '../../server/auth/middleware/bearerAuth.js';
88
import { createMcpExpressApp } from '../../server/express.js';
99
import {
1010
CallToolResult,
11+
ElicitResult,
1112
ElicitResultSchema,
1213
GetPromptResult,
1314
isInitializeRequest,
@@ -280,6 +281,114 @@ const getServer = () => {
280281
}
281282
);
282283

284+
// Register a tool that demonstrates bidirectional task support:
285+
// Server creates a task, then elicits input from client using elicitInputStream
286+
// Using the experimental tasks API - WARNING: may change without notice
287+
server.experimental.tasks.registerToolTask(
288+
'collect-user-info-task',
289+
{
290+
title: 'Collect Info with Task',
291+
description: 'Collects user info via elicitation with task support using elicitInputStream',
292+
inputSchema: {
293+
infoType: z.enum(['contact', 'preferences']).describe('Type of information to collect').default('contact')
294+
}
295+
},
296+
{
297+
async createTask({ infoType }, { taskStore: createTaskStore, taskRequestedTtl }) {
298+
// Create the server-side task
299+
const task = await createTaskStore.createTask({
300+
ttl: taskRequestedTtl
301+
});
302+
303+
// Perform async work that makes a nested elicitation request using elicitInputStream
304+
(async () => {
305+
try {
306+
const message = infoType === 'contact' ? 'Please provide your contact information' : 'Please set your preferences';
307+
308+
// Define schemas with proper typing for PrimitiveSchemaDefinition
309+
const contactSchema: {
310+
type: 'object';
311+
properties: Record<string, PrimitiveSchemaDefinition>;
312+
required: string[];
313+
} = {
314+
type: 'object',
315+
properties: {
316+
name: { type: 'string', title: 'Full Name', description: 'Your full name' },
317+
email: { type: 'string', title: 'Email', description: 'Your email address' }
318+
},
319+
required: ['name', 'email']
320+
};
321+
322+
const preferencesSchema: {
323+
type: 'object';
324+
properties: Record<string, PrimitiveSchemaDefinition>;
325+
required: string[];
326+
} = {
327+
type: 'object',
328+
properties: {
329+
theme: { type: 'string', title: 'Theme', enum: ['light', 'dark', 'auto'] },
330+
notifications: { type: 'boolean', title: 'Enable Notifications', default: true }
331+
},
332+
required: ['theme']
333+
};
334+
335+
const requestedSchema = infoType === 'contact' ? contactSchema : preferencesSchema;
336+
337+
// Use elicitInputStream to elicit input from client
338+
// This demonstrates the streaming elicitation API
339+
// Access via server.server to get the underlying Server instance
340+
const stream = server.server.experimental.tasks.elicitInputStream({
341+
mode: 'form',
342+
message,
343+
requestedSchema
344+
});
345+
346+
let elicitResult: ElicitResult | undefined;
347+
for await (const msg of stream) {
348+
if (msg.type === 'result') {
349+
elicitResult = msg.result as ElicitResult;
350+
} else if (msg.type === 'error') {
351+
throw msg.error;
352+
}
353+
}
354+
355+
if (!elicitResult) {
356+
throw new Error('No result received from elicitation');
357+
}
358+
359+
let resultText: string;
360+
if (elicitResult.action === 'accept') {
361+
resultText = `Collected ${infoType} info: ${JSON.stringify(elicitResult.content, null, 2)}`;
362+
} else if (elicitResult.action === 'decline') {
363+
resultText = `User declined to provide ${infoType} information`;
364+
} else {
365+
resultText = 'User cancelled the request';
366+
}
367+
368+
await taskStore.storeTaskResult(task.taskId, 'completed', {
369+
content: [{ type: 'text', text: resultText }]
370+
});
371+
} catch (error) {
372+
console.error('Error in collect-user-info-task:', error);
373+
await taskStore.storeTaskResult(task.taskId, 'failed', {
374+
content: [{ type: 'text', text: `Error: ${error}` }],
375+
isError: true
376+
});
377+
}
378+
})();
379+
380+
return { task };
381+
},
382+
async getTask(_args, { taskId, taskStore: getTaskStore }) {
383+
return await getTaskStore.getTask(taskId);
384+
},
385+
async getTaskResult(_args, { taskId, taskStore: getResultTaskStore }) {
386+
const result = await getResultTaskStore.getTaskResult(taskId);
387+
return result as CallToolResult;
388+
}
389+
}
390+
);
391+
283392
// Register a simple prompt with title
284393
server.registerPrompt(
285394
'greeting-template',

0 commit comments

Comments
 (0)