EXTENSION for treeos-cascade
pulse
How the land knows its own health. Reads .flow on a timer, counts results by status, tracks rates over time, and writes a health summary to a .pulse node under the land root. The background job runs on a configurable interval (default 60 seconds), queries .flow for all results since the last check, counts succeeded/failed/rejected/queued/partial/awaiting, calculates failure rate, tracks which nodes produce the most failures, and tracks which cross-land connections are healthy or degraded. The summary writes to .pulse as a note so the AI can read it through enrichContext when asked about land health. The AI does not need a dashboard. It reads the pulse node and tells you what is happening in natural language. Also fires afterNote on the .pulse node so other extensions can react to health changes. If failure rate spikes, another extension could pause cascade, alert the operator, or trigger a compression run on overloaded trees.
v1.0.1 by TreeOS Site 0 downloads 5 files 561 lines 16.7 KB published 38d ago
treeos ext install pulse
View changelog

Manifest

Provides

  • routes
  • jobs
  • 1 CLI command

Requires

  • models: Node
SHA256: c6ec8e2a41493eac9c94a358399fa136b515e95d132d1cd042130bbc29c6e209

Dependents

1 package depends on this

Package | Type | Relationship
treeos-cascade v1.0.1 | bundle | includes

CLI Commands

Command | Method | Description
pulse | GET | Land health. No action shows latest snapshot. Actions: history, peers.
pulse history | GET | Last 10 health snapshots. Trend view.
pulse peers | GET | Peer-specific health. Healthy, degraded, mixed.

Hooks

Listens To

  • enrichContext

Source Code

1/**
2 * Pulse Core
3 *
4 * Queries .flow partitions, counts cascade results by status,
5 * tracks failure sources, calculates rates, and builds a health summary.
6 */
7
8import log from "../../seed/log.js";
9import Node from "../../seed/models/node.js";
10import Note from "../../seed/models/note.js";
11import { SYSTEM_ROLE, SYSTEM_OWNER, CASCADE, CONTENT_TYPE } from "../../seed/protocol.js";
12import { hooks } from "../../seed/hooks.js";
13import { v4 as uuidv4 } from "uuid";
14
15// ─────────────────────────────────────────────────────────────────────────
16// PULSE NODE
17// ─────────────────────────────────────────────────────────────────────────
18
// Cached id of the .pulse node, always stored as a string (null until resolved).
let pulseNodeId = null;

/**
 * Find or create the .pulse node under the land root.
 *
 * The resolved id is cached in module state. A cached id is re-checked
 * against the database on every call; if the node no longer exists the
 * cache is cleared and the node is looked up (or created) again.
 *
 * @returns {Promise<string>} id of the .pulse node, as a string
 * @throws {Error} "Land root not found" when no land root node exists
 */
export async function ensurePulseNode() {
  if (pulseNodeId) {
    const exists = await Node.findById(pulseNodeId).select("_id").lean();
    if (exists) return pulseNodeId;
    // The cached node is gone; do not keep handing out a dead id.
    pulseNodeId = null;
  }

  const landRoot = await Node.findOne({ systemRole: SYSTEM_ROLE.LAND_ROOT }).select("_id children").lean();
  if (!landRoot) throw new Error("Land root not found");

  // Check if .pulse already exists as a child. A single $in query replaces
  // one findById round trip per child of the land root.
  const childIds = landRoot.children || [];
  if (childIds.length > 0) {
    const existing = await Node.findOne({ _id: { $in: childIds }, name: ".pulse" })
      .select("_id")
      .lean();
    if (existing) {
      // Stringify so the cache has the same type on the lookup and create paths.
      pulseNodeId = String(existing._id);
      return pulseNodeId;
    }
  }

  // Create .pulse under land root
  const { createSystemNode } = await import("../../seed/tree/treeManagement.js");
  const pulseNode = await createSystemNode({
    name: ".pulse",
    parentId: landRoot._id,
    metadata: new Map([["pulse", { installedAt: new Date().toISOString() }]]),
  });

  pulseNodeId = String(pulseNode._id);
  log.info("Pulse", `Created .pulse node: ${pulseNodeId}`);
  return pulseNodeId;
}
55
/**
 * Synchronous accessor for the cached .pulse node id.
 *
 * Performs no lookup of its own: it only reports what ensurePulseNode()
 * last stored in module state.
 *
 * @returns {*} the cached id, or null when nothing has been resolved yet
 */
export function getPulseNodeId() {
  const cachedId = pulseNodeId;
  return cachedId;
}
59
60// ─────────────────────────────────────────────────────────────────────────
61// PULSE CONFIG (stored on .config metadata.pulse)
62// ─────────────────────────────────────────────────────────────────────────
63
/**
 * Load pulse settings from `metadata.pulse` on the config node.
 *
 * Every setting falls back to the value from defaults() when it is missing
 * or unusable (not a finite number, or outside the accepted range), so a
 * bad config entry cannot produce a zero-delay timer or a negative
 * retention limit.
 *
 * @returns {Promise<{intervalMs: number, maxNotesRetained: number, failureRateThreshold: number}>}
 */
export async function getPulseConfig() {
  const fallback = defaults();

  const configNode = await Node.findOne({ systemRole: SYSTEM_ROLE.CONFIG }).select("metadata").lean();
  if (!configNode) return fallback;

  // metadata may arrive as a Map or as a plain object
  const meta = (configNode.metadata instanceof Map
    ? configNode.metadata.get("pulse")
    : configNode.metadata?.pulse) || {};

  return {
    // Must be strictly positive: setInterval treats 0/negative/NaN as ~1ms.
    intervalMs: pickNumber(meta.intervalMs, fallback.intervalMs, (n) => n > 0),
    // Used as a document count, so it has to be a non-negative integer.
    maxNotesRetained: pickNumber(
      meta.maxNotesRetained,
      fallback.maxNotesRetained,
      (n) => Number.isInteger(n) && n >= 0,
    ),
    // Negative thresholds would flag every snapshot (even empty ones) as elevated.
    failureRateThreshold: pickNumber(
      meta.failureRateThreshold,
      fallback.failureRateThreshold,
      (n) => n >= 0,
    ),
  };
}

/**
 * Return `value` as a number when it is finite and passes `isValid`,
 * otherwise `fallback`. Non-empty numeric strings are accepted.
 */
function pickNumber(value, fallback, isValid) {
  const candidate = typeof value === "string" && value.trim() !== "" ? Number(value) : value;
  if (typeof candidate !== "number" || !Number.isFinite(candidate)) return fallback;
  return isValid(candidate) ? candidate : fallback;
}

/**
 * Built-in pulse settings used when the config node or a setting is absent.
 */
function defaults() {
  return {
    intervalMs: 60000,
    maxNotesRetained: 100,
    failureRateThreshold: 0.3,
  };
}
86
87// ─────────────────────────────────────────────────────────────────────────
88// HEALTH QUERY
89// ─────────────────────────────────────────────────────────────────────────
90
// Track the last check time so we only count new results
let lastCheckTime = null;

/**
 * Query .flow for results since the last check.
 *
 * Reads the partitions named after today's and yesterday's UTC date under
 * the flow node and tallies every result entry inside the window.
 *
 * @param {number|null} since - epoch milliseconds; entries whose timestamp is
 *   older are skipped. Falsy means "no lower bound". Entries without a
 *   timestamp are always counted.
 * @returns {Promise<{counts: Object, failureSources: Object, peerResults: Object, totalSignals: number, totalResults: number}>}
 *   `counts` is keyed by CASCADE status; `failureSources` maps entry.source
 *   to its failed/rejected count; `peerResults` maps entry.payload.peer to
 *   `{ ok, fail }`; `totalSignals` is the number of distinct signals that
 *   contributed at least one counted result.
 */
export async function queryFlowSince(since) {
  const flowNode = await Node.findOne({ systemRole: SYSTEM_ROLE.FLOW }).select("_id").lean();
  if (!flowNode) return emptySnapshot();

  // Load today's and yesterday's partitions (covers the boundary)
  const today = new Date().toISOString().slice(0, 10);
  const yesterday = new Date(Date.now() - 86400000).toISOString().slice(0, 10);

  const partitions = await Node.find({
    parent: flowNode._id,
    name: { $in: [today, yesterday] },
  }).select("metadata").lean();

  const snapshot = emptySnapshot();
  const { counts, failureSources, peerResults } = snapshot;

  // Signals are de-duplicated across partitions and only counted when they
  // have at least one result inside the window, so "signals" describes the
  // same window as "results".
  const activeSignals = new Set();

  for (const partition of partitions) {
    const results = (partition.metadata instanceof Map
      ? partition.metadata.get("results")
      : partition.metadata?.results) || {};

    for (const [signalId, entries] of Object.entries(results)) {
      if (!Array.isArray(entries)) continue;

      for (const entry of entries) {
        // Tolerate holes / malformed entries instead of throwing mid-scan
        if (entry == null || typeof entry !== "object") continue;

        // Filter by time if since is set
        if (since && entry.timestamp) {
          const ts = new Date(entry.timestamp).getTime();
          if (ts < since) continue;
        }

        snapshot.totalResults++;
        activeSignals.add(signalId);

        // Own-property check: a status such as "constructor" must not match
        // something inherited from Object.prototype.
        if (entry.status && Object.prototype.hasOwnProperty.call(counts, entry.status)) {
          counts[entry.status]++;
        }

        // Track failure sources
        if (entry.status === CASCADE.FAILED || entry.status === CASCADE.REJECTED) {
          const src = entry.source || "unknown";
          failureSources[src] = (failureSources[src] || 0) + 1;
        }

        // Track cross-land peer results. Any status other than SUCCEEDED is
        // tallied as a failure for the peer.
        if (entry.payload?.peer) {
          const domain = entry.payload.peer;
          if (!peerResults[domain]) peerResults[domain] = { ok: 0, fail: 0 };
          if (entry.status === CASCADE.SUCCEEDED) {
            peerResults[domain].ok++;
          } else {
            peerResults[domain].fail++;
          }
        }
      }
    }
  }

  snapshot.totalSignals = activeSignals.size;
  return snapshot;
}

/**
 * Build a zeroed result structure with one counter per CASCADE status.
 * Returned when there is no flow node, and used as the accumulator otherwise.
 */
function emptySnapshot() {
  return {
    counts: {
      [CASCADE.SUCCEEDED]: 0,
      [CASCADE.FAILED]: 0,
      [CASCADE.REJECTED]: 0,
      [CASCADE.QUEUED]: 0,
      [CASCADE.PARTIAL]: 0,
      [CASCADE.AWAITING]: 0,
    },
    failureSources: {},
    peerResults: {},
    totalSignals: 0,
    totalResults: 0,
  };
}
187
188// ─────────────────────────────────────────────────────────────────────────
189// HEALTH SNAPSHOT
190// ─────────────────────────────────────────────────────────────────────────
191
/**
 * Build a health snapshot from .flow data since the last check.
 *
 * Advances lastCheckTime to the start of this call. If the flow query or the
 * config lookup fails, the previous lastCheckTime is restored so the results
 * of the failed window are picked up by the next call instead of being lost.
 *
 * @returns {Promise<Object>} snapshot with timestamp, window, signals,
 *   results, counts, failureRate (rounded to 3 decimals),
 *   failureRateThreshold, elevated, topFailureSources (max 10) and peers
 * @throws whatever queryFlowSince() or getPulseConfig() rejects with
 */
export async function buildHealthSnapshot() {
  const since = lastCheckTime;
  const checkedAt = Date.now();
  // Advance synchronously (before any await) so overlapping calls never
  // query the same window twice.
  lastCheckTime = checkedAt;

  let data;
  let config;
  try {
    [data, config] = await Promise.all([queryFlowSince(since), getPulseConfig()]);
  } catch (err) {
    // Roll back only if no later call has moved the marker in the meantime.
    if (lastCheckTime === checkedAt) lastCheckTime = since;
    throw err;
  }

  const total = data.totalResults;
  const failures = data.counts[CASCADE.FAILED] + data.counts[CASCADE.REJECTED];
  const failureRate = total > 0 ? failures / total : 0;

  // Top failure sources (sorted by count, top 10)
  const topFailures = Object.entries(data.failureSources)
    .sort((a, b) => b[1] - a[1])
    .slice(0, 10)
    .map(([nodeId, count]) => ({ nodeId, count }));

  // Peer health: no failures -> healthy, more failures than successes ->
  // degraded, anything in between -> mixed
  const peers = Object.entries(data.peerResults).map(([domain, stats]) => ({
    domain,
    ok: stats.ok,
    fail: stats.fail,
    status: stats.fail === 0 ? "healthy" : stats.fail > stats.ok ? "degraded" : "mixed",
  }));

  return {
    timestamp: new Date().toISOString(),
    window: since ? `${Math.round((checkedAt - since) / 1000)}s` : "full",
    signals: data.totalSignals,
    results: data.totalResults,
    counts: data.counts,
    failureRate: Math.round(failureRate * 1000) / 1000,
    failureRateThreshold: config.failureRateThreshold,
    // Compared on the unrounded rate
    elevated: failureRate > config.failureRateThreshold,
    topFailureSources: topFailures,
    peers,
  };
}
234
235// ─────────────────────────────────────────────────────────────────────────
236// SUMMARY WRITER
237// ─────────────────────────────────────────────────────────────────────────
238
/**
 * Render a health snapshot as a plain-text report.
 *
 * Sections, in order: header (timestamp, window, totals), non-zero status
 * counts, failure rate (with a warning line when the snapshot is elevated),
 * and, only when present, top failure sources and peer connections.
 *
 * @param {Object} snapshot - snapshot as produced by buildHealthSnapshot()
 * @returns {string} newline-joined report
 */
export function formatSummary(snapshot) {
  const asPercent = (ratio, digits) => (ratio * 100).toFixed(digits);

  const header = [
    `Land Health Pulse (${snapshot.timestamp})`,
    `Window: ${snapshot.window}`,
    `Signals: ${snapshot.signals}, Results: ${snapshot.results}`,
    "",
  ];

  // Statuses that never occurred are left out of the report.
  const statusSection = [
    "Status Counts:",
    ...Object.entries(snapshot.counts)
      .filter(([, count]) => count > 0)
      .map(([status, count]) => `  ${status}: ${count}`),
    "",
  ];

  const rateSection = [`Failure Rate: ${asPercent(snapshot.failureRate, 1)}%`];
  if (snapshot.elevated) {
    rateSection.push(
      `WARNING: Failure rate exceeds threshold (${asPercent(snapshot.failureRateThreshold, 0)}%)`,
    );
  }

  const failureSection = snapshot.topFailureSources.length > 0
    ? [
        "",
        "Top Failure Sources:",
        ...snapshot.topFailureSources.map(({ nodeId, count }) => `  ${nodeId}: ${count} failures`),
      ]
    : [];

  const peerSection = snapshot.peers.length > 0
    ? [
        "",
        "Peer Connections:",
        ...snapshot.peers.map(
          ({ domain, status, ok, fail }) => `  ${domain}: ${status} (${ok} ok, ${fail} fail)`,
        ),
      ]
    : [];

  return [...header, ...statusSection, ...rateSection, ...failureSection, ...peerSection].join("\n");
}
278
/**
 * Write the health snapshot to .pulse as a note and update metadata.
 *
 * Steps: store the structured snapshot under `metadata.pulse` on the .pulse
 * node, save a text note containing the formatted summary, run the
 * `afterNote` hook without waiting for it, then delete the oldest notes on
 * the node once the count exceeds `maxNotesRetained`.
 *
 * @param {Object} snapshot - snapshot as produced by buildHealthSnapshot()
 * @returns {Promise<{snapshot: Object, noteId: *}>} the snapshot and the id of the new note
 */
export async function writeSnapshot(snapshot) {
  const nodeId = await ensurePulseNode();
  const summary = formatSummary(snapshot);
  const config = await getPulseConfig();

  // Write structured data to .pulse metadata for fast access
  await Node.findByIdAndUpdate(nodeId, {
    $set: {
      "metadata.pulse.latestSnapshot": snapshot,
      "metadata.pulse.lastUpdated": snapshot.timestamp,
    },
  });

  // Write summary as a note so the AI can read it through enrichContext
  const note = new Note({
    _id: uuidv4(),
    contentType: CONTENT_TYPE.TEXT,
    content: summary,
    userId: SYSTEM_OWNER,
    nodeId,
    metadata: new Map([
      ["source", "pulse"],
      ["elevated", snapshot.elevated],
      ["failureRate", snapshot.failureRate],
    ]),
  });
  await note.save();

  // Fire afterNote so other extensions can react. Deliberately not awaited:
  // a slow or failing listener must not block the health write. Failures are
  // logged rather than silently discarded so a broken listener is visible.
  hooks.run("afterNote", {
    note,
    nodeId,
    userId: SYSTEM_OWNER,
    contentType: CONTENT_TYPE.TEXT,
    sizeKB: Math.ceil(Buffer.byteLength(summary, "utf8") / 1024),
    action: "create",
  }).catch((err) => {
    log.error("Pulse", `afterNote hook failed: ${err?.message ?? err}`);
  });

  // Prune old pulse notes if over retention limit
  const noteCount = await Note.countDocuments({ nodeId });
  const excess = noteCount - config.maxNotesRetained;
  if (excess > 0) {
    // Only ids are needed, so skip document hydration
    const oldest = await Note.find({ nodeId })
      .sort({ createdAt: 1 })
      .limit(excess)
      .select("_id")
      .lean();
    await Note.deleteMany({ _id: { $in: oldest.map((n) => n._id) } });
  }

  return { snapshot, noteId: note._id };
}
334
/**
 * Read the most recently stored snapshot from the .pulse node's
 * `metadata.pulse.latestSnapshot`. Does not query .flow.
 *
 * @returns {Promise<Object|null>} the stored snapshot, or null when the node
 *   or the snapshot is missing
 */
export async function getLatestSnapshot() {
  const pulseId = await ensurePulseNode();
  const pulseDoc = await Node.findById(pulseId).select("metadata").lean();
  if (!pulseDoc) return null;

  // metadata may arrive as a Map or as a plain object
  const { metadata } = pulseDoc;
  const pulseMeta = metadata instanceof Map ? metadata.get("pulse") : metadata?.pulse;

  return pulseMeta?.latestSnapshot || null;
}
349
1import log from "../../seed/log.js";
2import { ensurePulseNode, getLatestSnapshot, getPulseNodeId } from "./core.js";
3import { startPulseJob, stopPulseJob } from "./job.js";
4
/**
 * Extension entry point.
 *
 * Ensures the .pulse node exists, registers an `enrichContext` listener that
 * attaches a compact `landHealth` object to the context, and returns the
 * router, the background job descriptor and the public exports.
 *
 * @param {Object} core - host object; only `core.hooks.register` is used here
 * @returns {Promise<{router: *, jobs: Array, exports: Object}>}
 */
export async function init(core) {
  // Create .pulse node under land root if it doesn't exist
  await ensurePulseNode();

  // Inject health data into AI context.
  // NOTE(review): the callback does not look at which node is being
  // enriched, so landHealth is attached on every enrichContext call, not
  // only at the land root. Confirm whether that is intended.
  core.hooks.register("enrichContext", async ({ context }) => {
    const pulseNodeId = getPulseNodeId();
    if (!pulseNodeId) return;

    const snapshot = await getLatestSnapshot();
    if (!snapshot) return;

    context.landHealth = {
      failureRate: snapshot.failureRate,
      elevated: snapshot.elevated,
      signals: snapshot.signals,
      results: snapshot.results,
      lastUpdated: snapshot.timestamp,
    };
  }, "pulse");

  const { default: router } = await import("./routes.js");

  return {
    router,
    jobs: [
      {
        name: "pulse-health-check",
        start: () => {
          // startPulseJob() is async (it loads config first). Handle its
          // rejection here so a failed start is logged instead of surfacing
          // as an unhandled promise rejection.
          startPulseJob().catch((err) => {
            log.error("Pulse", `Failed to start health check: ${err?.message ?? err}`);
          });
        },
        stop: () => { stopPulseJob(); },
      },
    ],
    exports: {
      getLatestSnapshot,
      getPulseNodeId,
    },
  };
}
44
1/**
2 * Pulse Background Job
3 *
4 * Runs on a configurable interval (default 60 seconds).
5 * Queries .flow for all results since the last check,
6 * builds a health snapshot, and writes it to .pulse.
7 */
8
9import log from "../../seed/log.js";
10import { buildHealthSnapshot, writeSnapshot, getPulseConfig } from "./core.js";
11
// Handle of the active setInterval, or null while the job is not running.
let jobTimer = null;

/**
 * Run one health-check pass: build a snapshot and persist it when it is
 * worth recording. Never rejects; any failure is logged.
 */
async function tick() {
  try {
    const health = await buildHealthSnapshot();

    // Skip the write on quiet lands so .pulse is not filled with empty notes.
    const isQuiet = !(health.results > 0) && !health.elevated;
    if (isQuiet) return;

    await writeSnapshot(health);
  } catch (failure) {
    log.error("Pulse", `Health check failed: ${failure.message}`);
  }
}
26
// Incremented by every start and stop request. A start that is still waiting
// for its config compares against this to detect that it has been superseded.
let jobGeneration = 0;

/**
 * Start (or restart) the periodic health check.
 *
 * Clears any running timer, loads the interval from config, schedules
 * tick() on that interval and runs one tick immediately. If stopPulseJob()
 * or another startPulseJob() is called while the config is loading, this
 * call does nothing further, so a stop can never be overridden by an
 * in-flight start and no timer is leaked.
 *
 * @returns {Promise<*>} the active interval handle (null when this start was
 *   superseded by a stop)
 */
export async function startPulseJob() {
  const generation = ++jobGeneration;

  if (jobTimer) {
    clearInterval(jobTimer);
    jobTimer = null;
  }

  const config = await getPulseConfig();

  // Someone stopped or restarted the job while we were awaiting the config.
  if (generation !== jobGeneration) return jobTimer;

  // setInterval treats 0, negative and NaN delays as ~1ms; fall back to the
  // documented default instead of hot-looping on a bad setting.
  const requested = Number(config.intervalMs);
  const interval = Number.isFinite(requested) && requested > 0 ? requested : 60000;

  jobTimer = setInterval(tick, interval);
  log.info("Pulse", `Health check started (interval: ${interval / 1000}s)`);

  // Run once immediately on startup (tick handles its own errors)
  tick();

  return jobTimer;
}

/**
 * Stop the periodic health check and cancel any start that is still pending.
 * Safe to call when the job is not running.
 */
export function stopPulseJob() {
  jobGeneration++;

  if (jobTimer) {
    clearInterval(jobTimer);
    jobTimer = null;
    log.info("Pulse", "Health check stopped");
  }
}
49
/**
 * Extension manifest for pulse: identity, what it needs from the host,
 * and what it provides (routes, a background job, one CLI command, hooks).
 */
export default {
  name: "pulse",
  version: "1.0.1",
  builtFor: "treeos-cascade",
  description:
    "How the land knows its own health. Reads .flow on a timer, counts results by status, " +
    "tracks rates over time, and writes a health summary to a .pulse node under the land root. " +
    "The background job runs on a configurable interval (default 60 seconds), queries .flow for " +
    "all results since the last check, counts succeeded/failed/rejected/queued/partial/awaiting, " +
    "calculates failure rate, tracks which nodes produce the most failures, and tracks which " +
    "cross-land connections are healthy or degraded. The summary writes to .pulse as a note so " +
    "the AI can read it through enrichContext when asked about land health. The AI does not need " +
    "a dashboard. It reads the pulse node and tells you what is happening in natural language. " +
    "Also fires afterNote on the .pulse node so other extensions can react to health changes. " +
    "If failure rate spikes, another extension could pause cascade, alert the operator, or trigger " +
    "a compression run on overloaded trees.",

  needs: {
    models: ["Node"],
  },

  optional: {},

  provides: {
    models: {},
    routes: "./routes.js",
    tools: false,
    jobs: true,
    orchestrator: false,
    energyActions: {},
    sessionTypes: {},
    env: [],

    cli: [
      {
        command: "pulse [action]", scope: ["tree","land"],
        description: "Land health. No action shows latest snapshot. Actions: history, peers.",
        method: "GET",
        endpoint: "/pulse",
        subcommands: {
          "history": {
            method: "GET",
            endpoint: "/pulse/history",
            description: "Last 10 health snapshots. Trend view.",
          },
          "peers": {
            method: "GET",
            endpoint: "/pulse/peers",
            description: "Peer-specific health. Healthy, degraded, mixed.",
          },
        },
      },
    ],

    hooks: {
      // The snapshot writer runs the afterNote hook after saving each pulse
      // note (as the description above states), so it is declared here.
      fires: ["afterNote"],
      listens: ["enrichContext"],
    },
  },
};
61
1import express from "express";
2import authenticate from "../../seed/middleware/authenticate.js";
3import { sendOk, sendError, ERR } from "../../seed/protocol.js";
4import { getLatestSnapshot, ensurePulseNode } from "./core.js";
5import Node from "../../seed/models/node.js";
6import Note from "../../seed/models/note.js";
7
const router = express.Router();

/**
 * Wrap an async route body so that anything it throws is answered with a
 * 500 / ERR.INTERNAL response carrying the error message.
 */
const guarded = (routeBody) => async (req, res) => {
  try {
    await routeBody(req, res);
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
};

/** Read one key from note metadata that may be a Map or a plain object. */
const readMeta = (metadata, key) =>
  metadata instanceof Map ? metadata.get(key) : metadata?.[key];

// GET /pulse - latest health snapshot (CLI endpoint)
router.get("/pulse", authenticate, guarded(async (req, res) => {
  const latest = await getLatestSnapshot();
  if (latest) {
    sendOk(res, latest);
    return;
  }
  sendOk(res, { message: "No pulse data yet. Health check has not run." });
}));

// GET /pulse/history - the 10 most recent pulse notes, newest first
router.get("/pulse/history", authenticate, guarded(async (req, res) => {
  const nodeId = await ensurePulseNode();
  const recentNotes = await Note.find({ nodeId, contentType: "text" })
    .sort({ createdAt: -1 })
    .limit(10)
    .select("content createdAt metadata")
    .lean();

  const history = recentNotes.map(({ createdAt, metadata, content }) => ({
    timestamp: createdAt,
    elevated: readMeta(metadata, "elevated"),
    failureRate: readMeta(metadata, "failureRate"),
    summary: content,
  }));

  sendOk(res, { count: history.length, history });
}));

// GET /pulse/peers - peer list taken from the latest snapshot
router.get("/pulse/peers", authenticate, guarded(async (req, res) => {
  const latest = await getLatestSnapshot();
  const peers = latest ? latest.peers || [] : [];
  sendOk(res, { peers });
}));

export default router;
58

Versions

Version Published Downloads
1.0.1 38d ago 0
1.0.0 48d ago 0
0 stars
0 flags
React from the CLI: treeos ext star pulse

Comments

Loading comments...

Post comments from the CLI: treeos ext comment pulse "your comment"
Max 3 comments per extension. One star and one flag per user.