f7da091cf84605c62857ecf44a56dd7c61873b8146e4f7eb7bcca00f43e72c251/**
2 * Stream Accumulator
3 *
4 * Per-session message buffer. When the AI is working, incoming messages
5 * accumulate here. The tool loop checkpoint reads them and injects them
6 * into the conversation. Stateless. No DB. Memory only.
7 */
8
9const buffers = new Map(); // visitorId -> { messages: [], interrupt: false }
10
11export function pushMessage(visitorId, message) {
12 if (!buffers.has(visitorId)) {
13 buffers.set(visitorId, { messages: [], interrupt: false });
14 }
15 const buf = buffers.get(visitorId);
16 buf.messages.push({ content: message, timestamp: Date.now() });
17 buf.interrupt = true;
18}
19
20export function checkInterrupt(visitorId) {
21 const buf = buffers.get(visitorId);
22 if (!buf || !buf.interrupt) return null;
23 buf.interrupt = false;
24 const messages = buf.messages.splice(0);
25 return messages;
26}
27
28export function clear(visitorId) {
29 buffers.delete(visitorId);
30}
311/**
2 * Stream
3 *
4 * Mid-flight message injection. The user sends messages while the AI
5 * is working. Instead of queueing (wait for finish) or aborting (lose work),
6 * the messages accumulate and reach the AI at the next tool loop checkpoint.
7 *
8 * Three behaviors:
9 * - Append: "also add stretching" while building exercises
10 * - Correct: "actually 3 days not 4" while building a plan
11 * - Cancel: "stop" or "cancel" terminates the tool loop
12 *
13 * Idle debounce: rapid messages coalesce within a 500ms window.
14 * "eggs and coffee" + "also had a banana" become one message, one LLM call.
15 */
16
17import log from "../../seed/log.js";
18import { pushMessage, checkInterrupt } from "./accumulator.js";
19
20const DEBOUNCE_MS = 500;
21
22export async function init(core) {
23 const debounceTimers = new Map(); // visitorId -> timer
24
25 core.websocket.registerSocketHandler("register", async ({ socket }) => {
26 const visitorId = socket.visitorId;
27 if (!visitorId) return;
28
29 // ── In-flight interception ──
30 // Called by the kernel when a message arrives while processing.
31 // Accumulates instead of queueing or aborting.
32 socket._onStreamMessage = (message, _chatMode, generation) => {
33 pushMessage(visitorId, message);
34 socket.emit("messageQueued", {
35 message,
36 status: "will be incorporated",
37 generation,
38 });
39 log.debug("Stream", `Accumulated mid-flight for ${visitorId}: "${message.slice(0, 60)}"`);
40 };
41
42 // ── Idle debounce ──
43 // Called by the kernel when session is idle. Returns true to swallow
44 // the message (accumulating, timer running). When the timer fires,
45 // drains all accumulated messages and processes them as one.
46 let _debounceBypass = false; // prevent re-entry on debounce fire
47
48 socket._onStreamIdle = (message, chatMode, generation) => {
49 // When the debounce timer fires, it re-enters the chat handler.
50 // Skip debounce on re-entry so the combined message processes normally.
51 if (_debounceBypass) {
52 _debounceBypass = false;
53 return false; // fall through to normal processing
54 }
55
56 pushMessage(visitorId, message);
57
58 const existing = debounceTimers.get(visitorId);
59 if (existing) clearTimeout(existing);
60
61 const timer = setTimeout(() => {
62 debounceTimers.delete(visitorId);
63 const pending = checkInterrupt(visitorId);
64 if (!pending || pending.length === 0) return;
65
66 const combined = pending.map(m => m.content).join("\n");
67
68 // Re-enter the chat handler with the combined message.
69 // Set bypass flag so we don't debounce our own combined message.
70 if (socket._chatHandler) {
71 _debounceBypass = true;
72 socket._chatHandler({
73 message: combined,
74 username: socket.username,
75 generation,
76 mode: chatMode,
77 });
78 }
79 }, DEBOUNCE_MS);
80
81 debounceTimers.set(visitorId, timer);
82 log.debug("Stream", `Debouncing for ${visitorId} (${DEBOUNCE_MS}ms): "${message.slice(0, 60)}"`);
83 return true; // swallow, waiting for debounce window
84 };
85
86 // ── Tool loop checkpoint ──
87 // Called by the kernel between tool iterations.
88 // Reads accumulated messages and injects them into the conversation.
89 socket._streamCheckpoint = async () => {
90 const pending = checkInterrupt(visitorId);
91 if (!pending || pending.length === 0) return null;
92
93 const lastMsg = pending[pending.length - 1].content.toLowerCase().trim();
94
95 // Cancel
96 if (/^(stop|cancel|nevermind|abort|quit)$/.test(lastMsg)) {
97 log.debug("Stream", `Cancel detected for ${visitorId}`);
98 return { abort: true };
99 }
100
101 // Inject
102 const updates = pending.map(m => m.content).join("\n");
103 log.debug("Stream", `Injecting ${pending.length} message(s) for ${visitorId}`);
104 return {
105 inject: `[User update while you were working: "${updates}". ` +
106 `Adjust your remaining work accordingly. ` +
107 `Do not restart. Continue from where you are.]`,
108 };
109 };
110 });
111
112 log.info("Stream", "Loaded. Messages reach the AI mid-flight.");
113 return {};
114}
1151export default {
2 name: "stream",
3 version: "1.0.1",
4 builtFor: "TreeOS",
5 description:
6 "Send multiple messages while the AI is working. Corrections, additions, " +
7 "and cancellations reach the AI mid-tool-loop. The AI adjusts without restarting. " +
8 "Messages coalesce during idle periods (500ms debounce).",
9
10 needs: {
11 services: ["hooks", "websocket"],
12 },
13
14 optional: {},
15
16 provides: {
17 models: {},
18 routes: false,
19 tools: false,
20 jobs: false,
21 modes: false,
22
23 hooks: {
24 fires: [],
25 listens: [],
26 },
27 },
28};
29
| Version | Published | Downloads |
|---|---|---|
| 1.0.1 | 38d ago | 0 |
| 1.0.0 | 48d ago | 0 |
treeos ext star stream
Post comments from the CLI: treeos ext comment stream "your comment"
Max 3 comments per extension. One star and one flag per user.
Loading comments...