Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions hn-monitor-telegram/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# hn-monitor-telegram

<img src="./banner.png" alt="HN Monitor Telegram">

Instantly launch this agent on Agent Relay

[![Launch Agent](https://agentrelay.com/launch-agent_small.svg)](https://agentrelay.com/cloud/deploy?persona=https://github.com/AgentWorkforce/agents/blob/main/hn-monitor-telegram/persona.ts)

The **Telegram** sibling of [`hn-monitor`](../hn-monitor) — scans Hacker News a few times a day for your topics and posts a digest to a Telegram chat, **threaded under a compact count header**. Message the bot to ask about what it recently posted.

## How it works

- **Schedule:** cron scan → fetch the HN front page → keep stories matching `TOPICS` → drop already-posted (durable memory) → summarize with `ctx.llm` → post a count header, then thread the digest under it via **native Telegram `reply_to_message_id`** (no header/parentRef dance).
- **Trigger:** telegram `message` → Q&A over the last ~30 days of posted digests.
- Uses the shared Telegram transport (`../shared/telegram.ts`). Idempotency: claims "seen" before posting; if a header lands but the body send fails, it stores a pending threaded body and retries that body without reposting the header.

## Inputs

| input | required | purpose |
|---|---|---|
| `TOPICS` | no | Comma-separated keywords matched against story titles (default: `agents,ai,typescript,developer tools`). |
| `TELEGRAM_CHAT` | yes | Telegram chat id to post the digest to and answer Q&A in. No chat picker yet — enter the numeric chat id. |

## Auth

A **Telegram Nango connection** (bot token from `@BotFather`). `useSubscription` resolves the deployer's Anthropic credential for `ctx.llm` (summaries + Q&A).

> **Note:** telegram trigger/scope catalogs are a pending cutover (relayfile-adapters#222 / workforce#249); the deploy target must have the telegram adapter registered.
353 changes: 353 additions & 0 deletions hn-monitor-telegram/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
/**
* hn-monitor-telegram handler.
*
* cron tick
* → fetch the HN front page
* → keep stories whose title matches one of your TOPICS
* → drop ones already posted (durable memory)
* → summarize with ctx.llm
* → post a compact count header to Telegram, then thread the digest under it
*
* telegram message
* → answer questions about what's been posted (recall ~30 days of digests)
*
* The Telegram sibling of hn-monitor: same scan/summarize/Q&A, Telegram instead
* of Slack. Threading is native (reply_to_message_id) — simpler than slack's
* header+parentRef dance. Uses the shared Telegram transport (../shared/telegram.ts).
*/
import {
defineAgent,
isCronTickEvent,
type AgentEvent,
type WorkforceCtx
} from '@agentworkforce/runtime';
import {
readTelegramMessage,
skipReason,
replyToMessage,
defaultTelegram,
bareChatId,
type TelegramSender
} from '../shared/telegram.js';

export interface Story {
id: number;
title: string;
url: string;
points: number;
}

export interface PostRecord {
postedAt: string;
digest: string;
stories: Array<{ title: string; url: string; points: number }>;
}

interface PendingThreadBody {
chat: string;
header: string;
body: string;
replyToMessageId?: number;
createdAt: string;
stories: Array<{ title: string; url: string; points: number }>;
}

export default defineAgent({
schedules: [{ name: 'scan', cron: '0 9,17 * * *', tz: 'America/New_York' }],
triggers: {
// Q&A: message the bot to ask about recently posted digests.
telegram: [{ on: 'message' }]
},
handler: async (ctx, event) => {
if (typeof event.type === 'string' && event.type.startsWith('telegram.')) {
await handleTelegramMessage(ctx, event as unknown as AgentEvent);
return;
}
if (!isCronTickEvent(event as unknown as AgentEvent)) return;

const chat = input(ctx, 'TELEGRAM_CHAT');
if (!chat) throw new Error('TELEGRAM_CHAT is required');
if (await retryPendingThreadBody(ctx, chat)) return;

const topics = list(input(ctx, 'TOPICS')).map((t) => t.toLowerCase());

const stories = await fetchFrontPage();
ctx.log?.('info', 'hn-monitor-telegram.fetched', { stories: stories.length });
const matches = stories.filter((s) => topics.some((t) => s.title.toLowerCase().includes(t)));
ctx.log?.('info', 'hn-monitor-telegram.matched', { matched: matches.length });

const seen = await loadSeen(ctx);
const fresh = matches.filter((s) => !seen.includes(s.id));
ctx.log?.('info', 'hn-monitor-telegram.fresh', { fresh: fresh.length });
if (fresh.length === 0) {
ctx.log?.('info', 'hn-monitor-telegram.nothing-new', { matched: matches.length });
return;
}

await postFreshStories(ctx, chat, seen, fresh);
}
});

/** Q&A path: recall recent digests and answer the user's question over Telegram. */
export async function handleTelegramMessage(
ctx: WorkforceCtx,
event: AgentEvent,
deps: { complete?: (prompt: string) => Promise<string>; telegram?: TelegramSender } = {}
): Promise<void> {
const msg = readTelegramMessage((await event.expand('full')).data);
if (!msg) {
ctx.log?.('info', 'hn-monitor-telegram.inbox.unparseable');
return;
}
const reason = skipReason(msg, input(ctx, 'TELEGRAM_CHAT'));
if (reason) {
ctx.log?.('info', `hn-monitor-telegram.inbox.skip reason=${reason.replace(/\s+/g, '-')}`);
return;
}

const posts = await loadPosts(ctx);
ctx.log?.('info', 'hn-monitor-telegram.inbox.recalled', { posts: posts.length });

const context = posts.length
? posts.map((p) => `### Posted ${p.postedAt ?? 'Unknown'}\n${p.digest ?? ''}`).join('\n\n')
: 'No Hacker News digests have been posted yet.';

const prompt = [
"You are a Hacker News monitor. Answer the user's question using ONLY the recently posted digests below.",
'Do not invent stories or facts that are not present in the posts. If the posts do not cover the question, say so.',
'Be concise.',
'',
'## Recently posted digests (most recent ~30 days)',
context,
'',
'## User question',
msg.text.trim()
].join('\n');

const complete = deps.complete ?? ((p: string) => ctx.llm.complete(p, { maxTokens: 1024 }));
let answer: string;
try {
answer = await withTimeout(complete(prompt), 45_000, 'ctx.llm.complete');
} catch (error) {
ctx.log?.('warn', 'hn-monitor-telegram.llm-fallback', { error: String(error) });
const titles = posts
.flatMap((p) => (p.stories ?? []).map((s) => `- ${s.title ?? 'Untitled'} ${s.url ?? ''}`))
.slice(0, 15)
.join('\n');
answer = titles
? `I couldn't generate an answer right now; here are the recent post titles:\n${titles}`
: "I couldn't generate an answer right now, and I don't have any recent posts to show.";
}

const tg = deps.telegram ?? defaultTelegram();
await replyToMessage(ctx, tg, msg, answer.trim() || 'No answer available.');
}

export async function postFreshStories(
ctx: WorkforceCtx,
chat: string,
seen: number[],
fresh: Story[],
deps: { complete?: (prompt: string) => Promise<string>; telegram?: TelegramSender } = {}
): Promise<void> {
// Claim the stories as seen BEFORE the post (at-least-once concurrency guard;
// a cron tick can re-invoke this handler — cloud#1990).
await saveSeen(ctx, [...seen, ...fresh.map((s) => s.id)].slice(-200));
// Once the header posts, a thrown handler is retried by the runtime and would
// re-post a duplicate header — so only release the claim + rethrow while
// nothing has posted yet.
let headerPosted = false;
let pending: PendingThreadBody | null = null;
try {
ctx.log?.('info', 'hn-monitor-telegram.summarizing', { fresh: fresh.length });
const { header, body } = await summarize(ctx, fresh, deps.complete);
const tg = deps.telegram ?? defaultTelegram();
pending = {
chat: bareChatId(chat),
header,
body,
createdAt: new Date().toISOString(),
stories: fresh.map((s) => ({ title: s.title, url: s.url, points: s.points }))
};

// Native Telegram threading: post the header, then post the digest with
// reply_to_message_id = the header's delivered message id. ok:false means the
// writeback got no receipt (silent drop) — treat it as a loud failure.
const head = await tg.send(bareChatId(chat), header);
if (!head.ok) throw new Error(`Telegram header post to ${bareChatId(chat)} got no writeback receipt (silent drop)`);
headerPosted = true;
ctx.log?.('info', 'hn-monitor-telegram.header-posted', { messageId: head.messageId });
const replyToMessageId = head.messageId ? Number(head.messageId) || undefined : undefined;
pending.replyToMessageId = replyToMessageId;
const reply = await tg.send(bareChatId(chat), body, { replyToMessageId });
if (!reply.ok) throw new Error(`Telegram threaded digest to ${bareChatId(chat)} got no writeback receipt (silent drop)`);
ctx.log?.('info', 'hn-monitor-telegram.posted', { messageId: head.messageId });

await savePost(ctx, {
postedAt: new Date().toISOString(),
digest: `${header}\n${body}`,
stories: fresh.map((s) => ({ title: s.title, url: s.url, points: s.points }))
});
} catch (err) {
if (!headerPosted) {
// Nothing landed yet — release the provisional claim so the next tick
// retries this digest, then rethrow.
await saveSeen(ctx, seen).catch(() => {});
throw err;
}
if (pending) {
await savePendingThreadBody(ctx, pending)
.catch((error) => ctx.log?.('error', 'hn-monitor-telegram.pending-save-failed', { error: String(error) }));
}
// The header already posted; releasing + rethrowing would duplicate it on
// the runtime's retry. Keep the claim and let the next scan retry the body.
ctx.log?.('error', 'hn-monitor-telegram.thread-incomplete', { error: err instanceof Error ? err.message : String(err) });
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

export async function retryPendingThreadBody(
ctx: WorkforceCtx,
chat: string,
deps: { telegram?: TelegramSender } = {}
): Promise<boolean> {
const pending = await loadPendingThreadBody(ctx);
if (!pending) return false;
const targetChat = bareChatId(chat);
if (bareChatId(pending.chat) !== targetChat) return false;

const tg = deps.telegram ?? defaultTelegram();
const reply = await tg.send(targetChat, pending.body, { replyToMessageId: pending.replyToMessageId });
if (!reply.ok) {
ctx.log?.('error', 'hn-monitor-telegram.pending-body-retry-failed', { chat: targetChat });
return true;
}

await savePost(ctx, {
postedAt: new Date().toISOString(),
digest: `${pending.header}\n${pending.body}`,
stories: pending.stories
});
await clearPendingThreadBody(ctx);
ctx.log?.('info', 'hn-monitor-telegram.pending-body-posted', { chat: targetChat });
return true;
}

/** Top ~30 front-page stories via the public HN Algolia API. Returns [] on any
* network/parse failure so a transient outage doesn't crash the run. */
async function fetchFrontPage(): Promise<Story[]> {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 8_000);
try {
const res = await fetch('https://hn.algolia.com/api/v1/search?tags=front_page&hitsPerPage=30', {
signal: controller.signal
});
if (!res.ok) return [];
const data = (await res.json()) as { hits: Array<{ objectID: string; title: string; url: string | null; points: number }> };
return data.hits.map((h) => ({
id: Number(h.objectID),
title: h.title,
url: h.url ?? `https://news.ycombinator.com/item?id=${h.objectID}`,
points: h.points
}));
} catch {
return [];
} finally {
clearTimeout(timeout);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/** Split into the count `header` (parent) and the `body` (threaded digest).
* summarize() must ALWAYS return a postable body — on timeout/error it falls
* back to a plain bulleted digest from the story lines. */
async function summarize(
ctx: WorkforceCtx,
stories: Story[],
complete?: (prompt: string) => Promise<string>
): Promise<{ header: string; body: string }> {
const lines = stories.map((s) => `- ${s.title} (${s.points} pts) ${s.url}`).join('\n');
const header = `📰 Hacker News — ${stories.length} new match(es)`;
const run = complete ?? ((p: string) => ctx.llm.complete(p, { maxTokens: 500 }));
try {
const digest = await withTimeout(
run(`Write a tight digest (one bullet per story, lead with why it matters):\n\n${lines}`),
45_000,
'ctx.llm.complete'
);
return { header, body: digest.trim() };
} catch (error) {
ctx.log?.('warn', 'hn-monitor-telegram.llm-fallback', { error: String(error) });
return { header, body: lines };
}
}

/** Race a promise against a timeout so a hung LLM can't stall the run. */
async function withTimeout<T>(p: Promise<T>, ms: number, label: string): Promise<T> {
let timer: NodeJS.Timeout;
const timeout = new Promise<never>((_, rej) => {
timer = setTimeout(() => rej(new Error(`${label} timed out after ${ms}ms`)), ms);
});
try {
return await Promise.race([p, timeout]);
} finally {
clearTimeout(timer!);
}
}

// ── tiny helpers ────────────────────────────────────────────────────────────
function list(raw: string | undefined): string[] {
return (raw ?? '').split(',').map((s) => s.trim()).filter(Boolean);
}
function input(ctx: WorkforceCtx, name: string): string | undefined {
const spec = ctx.persona.inputSpecs?.[name];
const v = process.env[spec?.env ?? name] ?? ctx.persona.inputs?.[name] ?? spec?.default;
return v && v.trim() ? v : undefined;
}
async function loadSeen(ctx: WorkforceCtx): Promise<number[]> {
const [item] = await ctx.memory.recall('hn-monitor seen', { tags: ['hn-monitor-telegram:seen'], limit: 1 });
try {
return item ? (JSON.parse(item.content) as number[]) : [];
} catch {
return [];
}
}
async function saveSeen(ctx: WorkforceCtx, ids: number[]): Promise<void> {
await ctx.memory.save(JSON.stringify(ids), { tags: ['hn-monitor-telegram:seen'], scope: 'workspace' });
}
async function savePost(ctx: WorkforceCtx, record: PostRecord): Promise<void> {
await ctx.memory.save(JSON.stringify(record), { tags: ['hn-monitor-telegram:post'], scope: 'workspace' });
}
async function loadPendingThreadBody(ctx: WorkforceCtx): Promise<PendingThreadBody | null> {
const [item] = await ctx.memory.recall('hn-monitor pending telegram body', {
tags: ['hn-monitor-telegram:pending-thread-body'],
limit: 1
});
if (!item?.content) return null;
try {
return JSON.parse(item.content) as PendingThreadBody | null;
} catch {
return null;
}
}
async function savePendingThreadBody(ctx: WorkforceCtx, pending: PendingThreadBody): Promise<void> {
await ctx.memory.save(JSON.stringify(pending), { tags: ['hn-monitor-telegram:pending-thread-body'], scope: 'workspace' });
}
async function clearPendingThreadBody(ctx: WorkforceCtx): Promise<void> {
await ctx.memory.save('null', { tags: ['hn-monitor-telegram:pending-thread-body'], scope: 'workspace' });
}
/** Recalls recent posted digests, newest first, dropping any malformed record. */
async function loadPosts(ctx: WorkforceCtx): Promise<PostRecord[]> {
const items = await ctx.memory.recall('hn-monitor posted digest', {
tags: ['hn-monitor-telegram:post'],
scope: 'workspace',
limit: 60
});
const posts: PostRecord[] = [];
for (const item of items) {
try {
posts.push(JSON.parse(item.content) as PostRecord);
} catch {
// skip records that aren't valid JSON
}
}
return posts.sort((a, b) => (b.postedAt ?? '').localeCompare(a.postedAt ?? ''));
}
Binary file added hn-monitor-telegram/avatar.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added hn-monitor-telegram/banner.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added hn-monitor-telegram/card.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading