EXTENSION for TreeOS
stream
Send multiple messages while the AI is working. Corrections, additions, and cancellations reach the AI mid-tool-loop. The AI adjusts without restarting. Messages coalesce during idle periods (500ms debounce).
v1.0.1 by TreeOS Site 0 downloads 3 files 175 lines 5.5 KB published 38d ago
treeos ext install stream
View changelog

Manifest

Requires

  • services: hooks, websocket
SHA256: f7da091cf84605c62857ecf44a56dd7c61873b8146e4f7eb7bcca00f43e72c25

Source Code

1/**
2 * Stream Accumulator
3 *
4 * Per-session message buffer. When the AI is working, incoming messages
5 * accumulate here. The tool loop checkpoint reads them and injects them
6 * into the conversation. Stateless. No DB. Memory only.
7 */
8
9const buffers = new Map(); // visitorId -> { messages: [], interrupt: false }
10
11export function pushMessage(visitorId, message) {
12  if (!buffers.has(visitorId)) {
13    buffers.set(visitorId, { messages: [], interrupt: false });
14  }
15  const buf = buffers.get(visitorId);
16  buf.messages.push({ content: message, timestamp: Date.now() });
17  buf.interrupt = true;
18}
19
20export function checkInterrupt(visitorId) {
21  const buf = buffers.get(visitorId);
22  if (!buf || !buf.interrupt) return null;
23  buf.interrupt = false;
24  const messages = buf.messages.splice(0);
25  return messages;
26}
27
28export function clear(visitorId) {
29  buffers.delete(visitorId);
30}
31
1/**
2 * Stream
3 *
4 * Mid-flight message injection. The user sends messages while the AI
5 * is working. Instead of queueing (wait for finish) or aborting (lose work),
6 * the messages accumulate and reach the AI at the next tool loop checkpoint.
7 *
8 * Three behaviors:
9 * - Append: "also add stretching" while building exercises
10 * - Correct: "actually 3 days not 4" while building a plan
11 * - Cancel: "stop" or "cancel" terminates the tool loop
12 *
13 * Idle debounce: rapid messages coalesce within a 500ms window.
14 * "eggs and coffee" + "also had a banana" become one message, one LLM call.
15 */
16
17import log from "../../seed/log.js";
18import { pushMessage, checkInterrupt } from "./accumulator.js";
19
20const DEBOUNCE_MS = 500;
21
22export async function init(core) {
23  const debounceTimers = new Map(); // visitorId -> timer
24
25  core.websocket.registerSocketHandler("register", async ({ socket }) => {
26    const visitorId = socket.visitorId;
27    if (!visitorId) return;
28
29    // ── In-flight interception ──
30    // Called by the kernel when a message arrives while processing.
31    // Accumulates instead of queueing or aborting.
32    socket._onStreamMessage = (message, _chatMode, generation) => {
33      pushMessage(visitorId, message);
34      socket.emit("messageQueued", {
35        message,
36        status: "will be incorporated",
37        generation,
38      });
39      log.debug("Stream", `Accumulated mid-flight for ${visitorId}: "${message.slice(0, 60)}"`);
40    };
41
42    // ── Idle debounce ──
43    // Called by the kernel when session is idle. Returns true to swallow
44    // the message (accumulating, timer running). When the timer fires,
45    // drains all accumulated messages and processes them as one.
46    let _debounceBypass = false; // prevent re-entry on debounce fire
47
48    socket._onStreamIdle = (message, chatMode, generation) => {
49      // When the debounce timer fires, it re-enters the chat handler.
50      // Skip debounce on re-entry so the combined message processes normally.
51      if (_debounceBypass) {
52        _debounceBypass = false;
53        return false; // fall through to normal processing
54      }
55
56      pushMessage(visitorId, message);
57
58      const existing = debounceTimers.get(visitorId);
59      if (existing) clearTimeout(existing);
60
61      const timer = setTimeout(() => {
62        debounceTimers.delete(visitorId);
63        const pending = checkInterrupt(visitorId);
64        if (!pending || pending.length === 0) return;
65
66        const combined = pending.map(m => m.content).join("\n");
67
68        // Re-enter the chat handler with the combined message.
69        // Set bypass flag so we don't debounce our own combined message.
70        if (socket._chatHandler) {
71          _debounceBypass = true;
72          socket._chatHandler({
73            message: combined,
74            username: socket.username,
75            generation,
76            mode: chatMode,
77          });
78        }
79      }, DEBOUNCE_MS);
80
81      debounceTimers.set(visitorId, timer);
82      log.debug("Stream", `Debouncing for ${visitorId} (${DEBOUNCE_MS}ms): "${message.slice(0, 60)}"`);
83      return true; // swallow, waiting for debounce window
84    };
85
86    // ── Tool loop checkpoint ──
87    // Called by the kernel between tool iterations.
88    // Reads accumulated messages and injects them into the conversation.
89    socket._streamCheckpoint = async () => {
90      const pending = checkInterrupt(visitorId);
91      if (!pending || pending.length === 0) return null;
92
93      const lastMsg = pending[pending.length - 1].content.toLowerCase().trim();
94
95      // Cancel
96      if (/^(stop|cancel|nevermind|abort|quit)$/.test(lastMsg)) {
97        log.debug("Stream", `Cancel detected for ${visitorId}`);
98        return { abort: true };
99      }
100
101      // Inject
102      const updates = pending.map(m => m.content).join("\n");
103      log.debug("Stream", `Injecting ${pending.length} message(s) for ${visitorId}`);
104      return {
105        inject: `[User update while you were working: "${updates}". ` +
106                `Adjust your remaining work accordingly. ` +
107                `Do not restart. Continue from where you are.]`,
108      };
109    };
110  });
111
112  log.info("Stream", "Loaded. Messages reach the AI mid-flight.");
113  return {};
114}
115
1export default {
2  name: "stream",
3  version: "1.0.1",
4  builtFor: "TreeOS",
5  description:
6    "Send multiple messages while the AI is working. Corrections, additions, " +
7    "and cancellations reach the AI mid-tool-loop. The AI adjusts without restarting. " +
8    "Messages coalesce during idle periods (500ms debounce).",
9
10  needs: {
11    services: ["hooks", "websocket"],
12  },
13
14  optional: {},
15
16  provides: {
17    models: {},
18    routes: false,
19    tools: false,
20    jobs: false,
21    modes: false,
22
23    hooks: {
24      fires: [],
25      listens: [],
26    },
27  },
28};
29

Versions

Version Published Downloads
1.0.1 38d ago 0
1.0.0 48d ago 0
0 stars
0 flags
React from the CLI: treeos ext star stream

Comments

Loading comments...

Post comments from the CLI: treeos ext comment stream "your comment"
Max 3 comments per extension. One star and one flag per user.