1/**
2 * Pulse Core
3 *
4 * Queries .flow partitions, counts cascade results by status,
5 * tracks failure sources, calculates rates, and builds a health summary.
6 */
7
8import log from "../../seed/log.js";
9import Node from "../../seed/models/node.js";
10import Note from "../../seed/models/note.js";
11import { SYSTEM_ROLE, SYSTEM_OWNER, CASCADE, CONTENT_TYPE } from "../../seed/protocol.js";
12import { hooks } from "../../seed/hooks.js";
13import { v4 as uuidv4 } from "uuid";
14
15// ─────────────────────────────────────────────────────────────────────────
16// PULSE NODE
17// ─────────────────────────────────────────────────────────────────────────
18
// Cached id of the .pulse node; stays null until ensurePulseNode() resolves it.
let pulseNodeId = null;
20
/**
 * Find or create the .pulse node under the land root and cache its id.
 *
 * Lookup order:
 *   1. the cached id, if the node it points at still exists;
 *   2. an existing child of the land root named ".pulse";
 *   3. a newly created node (createSystemNode is called with a name, a parent
 *      and pulse metadata only, no systemRole).
 *
 * @returns {Promise<string>} id of the .pulse node, always as a string
 * @throws {Error} "Land root not found" when no land root node exists
 */
export async function ensurePulseNode() {
  // Trust the cache only while the node is still in the database.
  if (pulseNodeId) {
    const cached = await Node.findById(pulseNodeId).select("_id").lean();
    if (cached) return pulseNodeId;
  }

  const landRoot = await Node.findOne({ systemRole: SYSTEM_ROLE.LAND_ROOT })
    .select("_id children")
    .lean();
  if (!landRoot) throw new Error("Land root not found");

  // One query for the whole child list instead of one findById per child.
  const childIds = landRoot.children || [];
  if (childIds.length > 0) {
    const existing = await Node.findOne({ _id: { $in: childIds }, name: ".pulse" })
      .select("_id")
      .lean();
    if (existing) {
      // Stringify so both branches of this function return the same type.
      pulseNodeId = String(existing._id);
      return pulseNodeId;
    }
  }

  // Imported lazily, as in the rest of this function's history, so the tree
  // module is only loaded when a node actually has to be created.
  const { createSystemNode } = await import("../../seed/tree/treeManagement.js");
  const pulseNode = await createSystemNode({
    name: ".pulse",
    parentId: landRoot._id,
    metadata: new Map([["pulse", { installedAt: new Date().toISOString() }]]),
  });

  pulseNodeId = String(pulseNode._id);
  log.info("Pulse", `Created .pulse node: ${pulseNodeId}`);
  return pulseNodeId;
}
55
/**
 * Synchronous accessor for the cached .pulse node id.
 *
 * @returns {*} the cached id, or null when ensurePulseNode() has not set one yet
 */
export function getPulseNodeId() {
  const cachedId = pulseNodeId;
  return cachedId;
}
59
60// ─────────────────────────────────────────────────────────────────────────
61// PULSE CONFIG (stored on .config metadata.pulse)
62// ─────────────────────────────────────────────────────────────────────────
63
/**
 * Load the pulse settings from the config node's metadata.pulse entry.
 *
 * Every setting falls back to defaults() when it is missing or is not a usable
 * number, so a bad stored value cannot reach setInterval() or a query limit.
 * Numeric strings are accepted and converted to numbers.
 *
 * @returns {Promise<{intervalMs: number, maxNotesRetained: number, failureRateThreshold: number}>}
 */
export async function getPulseConfig() {
  const fallback = defaults();

  const configNode = await Node.findOne({ systemRole: SYSTEM_ROLE.CONFIG })
    .select("metadata")
    .lean();
  if (!configNode) return fallback;

  // metadata is handled both as a Map and as a plain object.
  const stored = configNode.metadata instanceof Map
    ? configNode.metadata.get("pulse")
    : configNode.metadata?.pulse;
  const meta = stored || {};

  return {
    // The interval must be at least 1 ms.
    intervalMs: numericSetting(meta.intervalMs, fallback.intervalMs, 1),
    // Used as a document count, so it is floored to a whole number.
    maxNotesRetained: Math.floor(
      numericSetting(meta.maxNotesRetained, fallback.maxNotesRetained, 0),
    ),
    failureRateThreshold: numericSetting(
      meta.failureRateThreshold,
      fallback.failureRateThreshold,
      0,
    ),
  };
}

// Return `value` as a finite number >= min, otherwise `fallback`.
function numericSetting(value, fallback, min) {
  const parsed = typeof value === "string" && value.trim() !== "" ? Number(value) : value;
  const usable = typeof parsed === "number" && Number.isFinite(parsed) && parsed >= min;
  return usable ? parsed : fallback;
}
78
/**
 * Built-in pulse settings, used when no stored configuration is available.
 * A fresh object is returned on every call.
 *
 * @returns {{intervalMs: number, maxNotesRetained: number, failureRateThreshold: number}}
 */
function defaults() {
  const intervalMs = 60000;
  const maxNotesRetained = 100;
  const failureRateThreshold = 0.3;
  return { intervalMs, maxNotesRetained, failureRateThreshold };
}
86
87// ─────────────────────────────────────────────────────────────────────────
88// HEALTH QUERY
89// ─────────────────────────────────────────────────────────────────────────
90
// Epoch-millisecond time of the previous health check, so each check only
// counts newer results. Null before the first check, which is then unfiltered.
let lastCheckTime = null;
93
/**
 * Tally cascade results stored on today's and yesterday's .flow partitions.
 *
 * @param {number|null} since Epoch milliseconds. When truthy, entries whose
 *   timestamp is older are skipped; entries without a timestamp are always
 *   counted. When falsy, nothing is filtered.
 * @returns {Promise<{
 *   counts: Object<string, number>,
 *   failureSources: Object<string, number>,
 *   peerResults: Object<string, {ok: number, fail: number}>,
 *   totalSignals: number,
 *   totalResults: number
 * }>} counts per known status, failures per source, ok/fail per peer, the
 *   number of distinct signals with at least one counted result, and the
 *   number of counted results. An empty snapshot when there is no flow node.
 */
export async function queryFlowSince(since) {
  const flowNode = await Node.findOne({ systemRole: SYSTEM_ROLE.FLOW }).select("_id").lean();
  if (!flowNode) return emptySnapshot();

  // Partitions are looked up by YYYY-MM-DD name; loading yesterday as well
  // covers a window that straddles the date change.
  const today = new Date().toISOString().slice(0, 10);
  const yesterday = new Date(Date.now() - 86400000).toISOString().slice(0, 10);

  const partitions = await Node.find({
    parent: flowNode._id,
    name: { $in: [today, yesterday] },
  }).select("metadata").lean();

  const { counts } = emptySnapshot();
  // Maps keep arbitrary source/peer strings (e.g. "constructor") from
  // colliding with inherited object properties while tallying.
  const failureSources = new Map();
  const peerResults = new Map();
  // A Set so a signal present in both partitions is counted once, and only
  // when it contributed at least one result to this window.
  const activeSignals = new Set();
  let totalResults = 0;

  for (const partition of partitions) {
    const stored = partition.metadata instanceof Map
      ? partition.metadata.get("results")
      : partition.metadata?.results;

    for (const [signalId, entries] of Object.entries(stored || {})) {
      if (!Array.isArray(entries)) continue;

      for (const entry of entries) {
        // Skip malformed entries instead of throwing on property access.
        if (!entry || typeof entry !== "object") continue;

        if (since && entry.timestamp) {
          const ts = new Date(entry.timestamp).getTime();
          if (ts < since) continue;
        }

        totalResults++;
        activeSignals.add(signalId);

        // Own-property check: an unknown status must never touch the tally.
        const { status } = entry;
        if (status && Object.prototype.hasOwnProperty.call(counts, status)) {
          counts[status]++;
        }

        if (status === CASCADE.FAILED || status === CASCADE.REJECTED) {
          const src = entry.source || "unknown";
          failureSources.set(src, (failureSources.get(src) || 0) + 1);
        }

        // Cross-land results: anything other than SUCCEEDED counts as a fail.
        const peer = entry.payload?.peer;
        if (peer) {
          const stats = peerResults.get(peer) || { ok: 0, fail: 0 };
          if (status === CASCADE.SUCCEEDED) {
            stats.ok++;
          } else {
            stats.fail++;
          }
          peerResults.set(peer, stats);
        }
      }
    }
  }

  return {
    counts,
    failureSources: Object.fromEntries(failureSources),
    peerResults: Object.fromEntries(peerResults),
    totalSignals: activeSignals.size,
    totalResults,
  };
}
170
/**
 * Zeroed query result, returned when there is no flow node to read from.
 *
 * @returns {{counts: Object<string, number>, failureSources: Object,
 *   peerResults: Object, totalSignals: number, totalResults: number}}
 */
function emptySnapshot() {
  const statuses = [
    CASCADE.SUCCEEDED,
    CASCADE.FAILED,
    CASCADE.REJECTED,
    CASCADE.QUEUED,
    CASCADE.PARTIAL,
    CASCADE.AWAITING,
  ];
  const counts = Object.fromEntries(statuses.map((status) => [status, 0]));

  return {
    counts,
    failureSources: {},
    peerResults: {},
    totalSignals: 0,
    totalResults: 0,
  };
}
187
188// ─────────────────────────────────────────────────────────────────────────
189// HEALTH SNAPSHOT
190// ─────────────────────────────────────────────────────────────────────────
191
/**
 * Build a health snapshot from .flow data recorded since the last check.
 *
 * The check time is advanced up front so overlapping calls do not share a
 * window, and is rolled back when the query or config load fails so that
 * window's results are picked up by the next call instead of being lost.
 *
 * @returns {Promise<object>} snapshot with timestamp, window ("full" on the
 *   first check, otherwise the window length in whole seconds), signal and
 *   result totals, per-status counts, failureRate rounded to three decimals,
 *   the configured threshold, the elevated flag, the ten most frequent
 *   failure sources and per-peer health
 * @throws whatever queryFlowSince() or getPulseConfig() rejects with
 */
export async function buildHealthSnapshot() {
  const since = lastCheckTime;
  // One clock reading drives the checkpoint, the timestamp and the window.
  const now = Date.now();
  lastCheckTime = now;

  let data;
  let config;
  try {
    [data, config] = await Promise.all([queryFlowSince(since), getPulseConfig()]);
  } catch (err) {
    // Roll back only if no other call has moved the checkpoint meanwhile.
    if (lastCheckTime === now) lastCheckTime = since;
    throw err;
  }

  const total = data.totalResults;
  const failures = data.counts[CASCADE.FAILED] + data.counts[CASCADE.REJECTED];
  const failureRate = total > 0 ? failures / total : 0;

  // Most frequent failure sources first, capped at ten.
  const topFailureSources = Object.entries(data.failureSources)
    .sort((a, b) => b[1] - a[1])
    .slice(0, 10)
    .map(([nodeId, count]) => ({ nodeId, count }));

  // No failures: healthy. More failures than successes: degraded. Else mixed.
  const peers = Object.entries(data.peerResults).map(([domain, stats]) => ({
    domain,
    ok: stats.ok,
    fail: stats.fail,
    status: stats.fail === 0 ? "healthy" : stats.fail > stats.ok ? "degraded" : "mixed",
  }));

  return {
    timestamp: new Date(now).toISOString(),
    window: since ? `${Math.round((now - since) / 1000)}s` : "full",
    signals: data.totalSignals,
    results: data.totalResults,
    counts: data.counts,
    failureRate: Math.round(failureRate * 1000) / 1000,
    failureRateThreshold: config.failureRateThreshold,
    // Compared before rounding so the flag reflects the exact rate.
    elevated: failureRate > config.failureRateThreshold,
    topFailureSources,
    peers,
  };
}
234
235// ─────────────────────────────────────────────────────────────────────────
236// SUMMARY WRITER
237// ─────────────────────────────────────────────────────────────────────────
238
/**
 * Render a health snapshot as newline-separated plain text.
 *
 * Statuses with a zero count are left out. The warning line, the failure
 * source list and the peer list appear only when there is something to show.
 *
 * @param {object} snapshot a snapshot as produced by buildHealthSnapshot()
 * @returns {string} the summary text
 */
export function formatSummary(snapshot) {
  const asPercent = (ratio, digits) => (ratio * 100).toFixed(digits);

  const out = [
    `Land Health Pulse (${snapshot.timestamp})`,
    `Window: ${snapshot.window}`,
    `Signals: ${snapshot.signals}, Results: ${snapshot.results}`,
    "",
    "Status Counts:",
  ];

  Object.entries(snapshot.counts).forEach(([status, count]) => {
    if (count > 0) out.push(` ${status}: ${count}`);
  });

  out.push("", `Failure Rate: ${asPercent(snapshot.failureRate, 1)}%`);
  if (snapshot.elevated) {
    const threshold = asPercent(snapshot.failureRateThreshold, 0);
    out.push(`WARNING: Failure rate exceeds threshold (${threshold}%)`);
  }

  const { topFailureSources } = snapshot;
  if (topFailureSources.length > 0) {
    out.push(
      "",
      "Top Failure Sources:",
      ...topFailureSources.map(({ nodeId, count }) => ` ${nodeId}: ${count} failures`),
    );
  }

  const { peers } = snapshot;
  if (peers.length > 0) {
    out.push(
      "",
      "Peer Connections:",
      ...peers.map(({ domain, status, ok, fail }) => ` ${domain}: ${status} (${ok} ok, ${fail} fail)`),
    );
  }

  return out.join("\n");
}
278
/**
 * Persist a health snapshot on the .pulse node.
 *
 * Steps, in order:
 *   1. store the structured snapshot under metadata.pulse for fast reads;
 *   2. save the formatted summary as a text note on the node;
 *   3. fire the afterNote hook without waiting for it; listener failures are
 *      logged and never fail the write;
 *   4. delete the oldest notes beyond config.maxNotesRetained.
 *
 * @param {object} snapshot a snapshot as produced by buildHealthSnapshot()
 * @returns {Promise<{snapshot: object, noteId: string}>} the snapshot passed
 *   in and the id of the note that was created
 */
export async function writeSnapshot(snapshot) {
  const nodeId = await ensurePulseNode();
  const summary = formatSummary(snapshot);
  const config = await getPulseConfig();

  await Node.findByIdAndUpdate(nodeId, {
    $set: {
      "metadata.pulse.latestSnapshot": snapshot,
      "metadata.pulse.lastUpdated": snapshot.timestamp,
    },
  });

  const note = new Note({
    _id: uuidv4(),
    contentType: CONTENT_TYPE.TEXT,
    content: summary,
    userId: SYSTEM_OWNER,
    nodeId,
    metadata: new Map([
      ["source", "pulse"],
      ["elevated", snapshot.elevated],
      ["failureRate", snapshot.failureRate],
    ]),
  });
  await note.save();

  // Deliberately not awaited: listeners must not delay or break the write.
  // Failures are logged rather than dropped so a broken listener is visible.
  hooks
    .run("afterNote", {
      note,
      nodeId,
      userId: SYSTEM_OWNER,
      contentType: CONTENT_TYPE.TEXT,
      sizeKB: Math.ceil(Buffer.byteLength(summary, "utf8") / 1024),
      action: "create",
    })
    .catch((err) => {
      log.error("Pulse", `afterNote hook failed: ${err?.message ?? err}`);
    });

  // Prune the oldest notes once the node holds more than the retention limit.
  const noteCount = await Note.countDocuments({ nodeId });
  const excess = noteCount - config.maxNotesRetained;
  if (excess > 0) {
    // lean(): only the ids are needed, so skip document hydration.
    const oldest = await Note.find({ nodeId })
      .sort({ createdAt: 1 })
      .limit(excess)
      .select("_id")
      .lean();
    await Note.deleteMany({ _id: { $in: oldest.map((n) => n._id) } });
  }

  return { snapshot, noteId: note._id };
}
334
/**
 * Read the most recently stored snapshot from the .pulse node's metadata.
 * Does not query .flow; ensurePulseNode() is called first to resolve the node.
 *
 * @returns {Promise<object|null>} the stored snapshot, or null when the node
 *   or the snapshot is missing
 */
export async function getLatestSnapshot() {
  const pulseId = await ensurePulseNode();
  const pulseDoc = await Node.findById(pulseId).select("metadata").lean();
  if (!pulseDoc) return null;

  // metadata is handled both as a Map and as a plain object.
  let pulseMeta;
  if (pulseDoc.metadata instanceof Map) {
    pulseMeta = pulseDoc.metadata.get("pulse");
  } else {
    pulseMeta = pulseDoc.metadata?.pulse;
  }

  const { latestSnapshot } = pulseMeta || {};
  return latestSnapshot || null;
}
349
1import log from "../../seed/log.js";
2import { ensurePulseNode, getLatestSnapshot, getPulseNodeId } from "./core.js";
3import { startPulseJob, stopPulseJob } from "./job.js";
4
/**
 * Extension entry point.
 *
 * Ensures the .pulse node exists, registers an enrichContext listener that
 * copies the latest health figures onto the context, and returns the router,
 * the background job and the functions exposed to other extensions.
 *
 * @param {object} core host object; only core.hooks.register is used here
 * @returns {Promise<{router: object, jobs: object[], exports: object}>}
 */
export async function init(core) {
  // Create .pulse under the land root if it doesn't exist yet.
  await ensurePulseNode();

  // NOTE(review): this runs for every enrichContext call; nothing here limits
  // it to the land root. Add a check on the hook's node if that is intended.
  core.hooks.register("enrichContext", async ({ context }) => {
    if (!getPulseNodeId()) return;

    const snapshot = await getLatestSnapshot();
    if (!snapshot) return;

    context.landHealth = {
      failureRate: snapshot.failureRate,
      elevated: snapshot.elevated,
      signals: snapshot.signals,
      results: snapshot.results,
      lastUpdated: snapshot.timestamp,
    };
  }, "pulse");

  const { default: router } = await import("./routes.js");

  return {
    router,
    jobs: [
      {
        name: "pulse-health-check",
        start: () => {
          // startPulseJob() is async; catch here so a failed config load is
          // logged instead of becoming an unhandled rejection.
          startPulseJob().catch((err) => {
            log.error("Pulse", `Failed to start health check: ${err?.message ?? err}`);
          });
        },
        stop: () => {
          stopPulseJob();
        },
      },
    ],
    exports: {
      getLatestSnapshot,
      getPulseNodeId,
    },
  };
}
44
1/**
2 * Pulse Background Job
3 *
4 * Runs on a configurable interval (default 60 seconds).
5 * Queries .flow for all results since the last check,
6 * builds a health snapshot, and writes it to .pulse.
7 */
8
9import log from "../../seed/log.js";
10import { buildHealthSnapshot, writeSnapshot, getPulseConfig } from "./core.js";
11
// Handle of the running setInterval timer, or null while the job is stopped.
let jobTimer = null;
13
/**
 * Run one health check: build a snapshot and store it when there is anything
 * to report. Errors are logged and never propagate to the timer.
 *
 * @returns {Promise<void>}
 */
async function tick() {
  try {
    const snapshot = await buildHealthSnapshot();

    // Quiet window: skip the write so idle lands don't fill up with empty notes.
    const isQuiet = !(snapshot.results > 0) && !snapshot.elevated;
    if (isQuiet) return;

    await writeSnapshot(snapshot);
  } catch (err) {
    log.error("Pulse", `Health check failed: ${err.message}`);
  }
}
26
/**
 * Start (or restart) the periodic health check using the configured interval,
 * and run one check immediately.
 *
 * @returns {Promise<object>} the timer handle returned by setInterval()
 * @throws whatever getPulseConfig() rejects with; a timer that was already
 *   running is left untouched in that case
 */
export async function startPulseJob() {
  const { intervalMs } = await getPulseConfig();

  // Clear after the await, not before it: two overlapping starts would
  // otherwise both see "no timer", both create one, and leak the first.
  if (jobTimer) clearInterval(jobTimer);

  jobTimer = setInterval(tick, intervalMs);
  log.info("Pulse", `Health check started (interval: ${intervalMs / 1000}s)`);

  // Run once right away; not awaited because tick() handles its own errors.
  tick();

  return jobTimer;
}
41
/**
 * Stop the periodic health check. Does nothing when no timer is running.
 */
export function stopPulseJob() {
  if (!jobTimer) return;

  clearInterval(jobTimer);
  jobTimer = null;
  log.info("Pulse", "Health check stopped");
}
49
// Extension manifest for "pulse": identity, requirements, and what it provides.
export default {
  name: "pulse",
  version: "1.0.1",
  builtFor: "treeos-cascade",
  description:
    "How the land knows its own health. Reads .flow on a timer, counts results by status, " +
    "tracks rates over time, and writes a health summary to a .pulse node under the land root. " +
    "The background job runs on a configurable interval (default 60 seconds), queries .flow for " +
    "all results since the last check, counts succeeded/failed/rejected/queued/partial/awaiting, " +
    "calculates failure rate, tracks which nodes produce the most failures, and tracks which " +
    "cross-land connections are healthy or degraded. The summary writes to .pulse as a note so " +
    "the AI can read it through enrichContext when asked about land health. The AI does not need " +
    "a dashboard. It reads the pulse node and tells you what is happening in natural language. " +
    "Also fires afterNote on the .pulse node so other extensions can react to health changes. " +
    "If failure rate spikes, another extension could pause cascade, alert the operator, or trigger " +
    "a compression run on overloaded trees.",

  needs: {
    // Both models are imported by the extension's core and routes modules.
    models: ["Node", "Note"],
  },

  optional: {},

  provides: {
    models: {},
    routes: "./routes.js",
    tools: false,
    jobs: true,
    orchestrator: false,
    energyActions: {},
    sessionTypes: {},
    env: [],

    cli: [
      {
        command: "pulse [action]",
        scope: ["tree", "land"],
        description: "Land health. No action shows latest snapshot. Actions: history, peers.",
        method: "GET",
        endpoint: "/pulse",
        subcommands: {
          "history": {
            method: "GET",
            endpoint: "/pulse/history",
            description: "Last 10 health snapshots. Trend view.",
          },
          "peers": {
            method: "GET",
            endpoint: "/pulse/peers",
            description: "Peer-specific health. Healthy, degraded, mixed.",
          },
        },
      },
    ],

    hooks: {
      // afterNote is run each time a snapshot note is written to .pulse.
      fires: ["afterNote"],
      listens: ["enrichContext"],
    },
  },
};
61
import express from "express";
import authenticate from "../../seed/middleware/authenticate.js";
import Node from "../../seed/models/node.js";
import Note from "../../seed/models/note.js";
import { sendOk, sendError, ERR, CONTENT_TYPE } from "../../seed/protocol.js";
import { getLatestSnapshot, ensurePulseNode } from "./core.js";
7
const router = express.Router();

// Read one key from a note's metadata, handled as either a Map or a plain object.
const readNoteMeta = (note, key) =>
  note.metadata instanceof Map ? note.metadata.get(key) : note.metadata?.[key];

// GET /pulse - latest health snapshot (CLI endpoint)
router.get("/pulse", authenticate, async (req, res) => {
  try {
    const snapshot = await getLatestSnapshot();
    if (!snapshot) {
      return sendOk(res, { message: "No pulse data yet. Health check has not run." });
    }
    sendOk(res, snapshot);
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
});

// GET /pulse/history - the 10 most recent snapshot notes, newest first
router.get("/pulse/history", authenticate, async (req, res) => {
  try {
    const nodeId = await ensurePulseNode();
    // Same constant the snapshot writer stores, so the filter cannot drift
    // from the value written on the notes.
    const notes = await Note.find({ nodeId, contentType: CONTENT_TYPE.TEXT })
      .sort({ createdAt: -1 })
      .limit(10)
      .select("content createdAt metadata")
      .lean();

    const history = notes.map((note) => ({
      timestamp: note.createdAt,
      elevated: readNoteMeta(note, "elevated"),
      failureRate: readNoteMeta(note, "failureRate"),
      summary: note.content,
    }));

    sendOk(res, { count: history.length, history });
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
});

// GET /pulse/peers - peer health from the latest snapshot (empty list if none)
router.get("/pulse/peers", authenticate, async (req, res) => {
  try {
    const snapshot = await getLatestSnapshot();
    if (!snapshot) return sendOk(res, { peers: [] });
    sendOk(res, { peers: snapshot.peers || [] });
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
});

export default router;
58
Loading comments...