1/**
2 * Mycelium Core
3 *
4 * Intelligent cross-land signal routing.
5 * Reads signal metadata + destination land profiles.
6 * Routes where signals would be useful. Ignores where they wouldn't.
7 */
8
9import log from "../../seed/log.js";
10import Node from "../../seed/models/node.js";
11import { SYSTEM_ROLE } from "../../seed/protocol.js";
12import { getLandIdentity } from "../../canopy/identity.js";
13import { parseJsonSafe } from "../../seed/orchestrators/helpers.js";
14
// Chat runner injected by the extension entry point. Stays null until
// setRunChat() is called; aiRoute() returns no routes while it is unset.
let _runChat = null;

/**
 * Install the function used to run LLM chats for AI routing.
 *
 * @param {Function} fn - Chat runner; aiRoute() awaits it and reads `answer` from the result.
 */
export function setRunChat(fn) {
  _runChat = fn;
}
17
18// ─────────────────────────────────────────────────────────────────────────
19// CONFIG
20// ─────────────────────────────────────────────────────────────────────────
21
// Fallback settings; values stored under the config node's `mycelium`
// metadata key are merged over these by getMyceliumConfig().
const DEFAULTS = {
  // Minimum score a peer must reach before a signal is delivered to it.
  routingThreshold: 0.5,
  // Signals whose hop counter has reached this value are no longer buffered.
  maxHopsPerSignal: 3,
  // Reported in status output and the startup log line.
  routingMode: "selective",
  // Milliseconds between routing cycles.
  routingInterval: 60_000,
  // Upper bound on signals drained from the buffer per cycle.
  maxSignalsPerCycle: 100,
  // Size of the rolling routing log kept on the land root.
  maxRoutingLogEntries: 200,
};
30
/**
 * Load the effective mycelium configuration.
 *
 * Reads the `mycelium` entry from the config node's metadata and layers it
 * over DEFAULTS. When no config node exists, a copy of DEFAULTS is returned.
 *
 * @returns {Promise<object>} Merged configuration object.
 */
export async function getMyceliumConfig() {
  const configNode = await Node.findOne({ systemRole: SYSTEM_ROLE.CONFIG })
    .select("metadata")
    .lean();
  if (!configNode) return { ...DEFAULTS };

  // Metadata may be exposed either as a Map or as a plain object.
  const { metadata } = configNode;
  let overrides;
  if (metadata instanceof Map) {
    overrides = metadata.get("mycelium");
  } else {
    overrides = metadata?.mycelium;
  }

  return { ...DEFAULTS, ...(overrides || {}) };
}
39
/**
 * Resolve the id of the local land.
 *
 * @returns {*} The `landId` reported by getLandIdentity(), or null when the
 *   identity lookup throws.
 */
export function getThisLandId() {
  let landId = null;
  try {
    ({ landId } = getLandIdentity());
  } catch {
    landId = null;
  }
  return landId;
}
47
48// ─────────────────────────────────────────────────────────────────────────
49// SIGNAL BUFFER
50// ─────────────────────────────────────────────────────────────────────────
51
// In-memory FIFO of signals waiting for the next routing cycle.
const _signalBuffer = [];

/**
 * Append a signal to the routing buffer.
 *
 * @param {object} signal - Signal captured from a cascade hook.
 */
export function bufferSignal(signal) {
  _signalBuffer.push(signal);
}

/**
 * Remove and return the oldest buffered signals.
 *
 * @param {number} [max=Infinity] - Maximum number of signals to remove.
 *   Omitting it drains the whole buffer; previously an omitted value was
 *   passed to splice() as `undefined`, which removes nothing.
 * @returns {object[]} The removed signals, oldest first.
 */
export function drainBuffer(max = Infinity) {
  const count = Number(max);
  // NaN, zero and negative counts remove nothing (same as splice would do).
  if (Number.isNaN(count) || count <= 0) return [];
  return _signalBuffer.splice(0, count);
}

/**
 * @returns {number} Number of signals currently buffered.
 */
export function bufferSize() {
  return _signalBuffer.length;
}
65
66// ─────────────────────────────────────────────────────────────────────────
67// PEER PROFILING
68// ─────────────────────────────────────────────────────────────────────────
69
/**
 * Derive a routing profile from a LandPeer document.
 * Performs no network calls; it only reshapes fields already on the peer.
 *
 * @param {object} peer - LandPeer document (plain object).
 * @returns {{domain: *, landId: *, baseUrl: string, extensions: Set, status: *, healthy: boolean, lastSeen: *}}
 *   `baseUrl` falls back to `https://<domain>`, `healthy` is true for the
 *   "active" and "degraded" statuses, and `lastSeen` prefers `lastSeenAt`
 *   over `lastSuccessAt`.
 */
export function buildPeerProfile(peer) {
  const { domain, landId, status } = peer;
  const isHealthy = ["active", "degraded"].includes(status);

  return {
    domain,
    landId,
    baseUrl: peer.baseUrl ? peer.baseUrl : `https://${domain}`,
    extensions: new Set(peer.extensions ? peer.extensions : []),
    status,
    healthy: isHealthy,
    lastSeen: peer.lastSeenAt ? peer.lastSeenAt : peer.lastSuccessAt,
  };
}
85
86// ─────────────────────────────────────────────────────────────────────────
87// SIGNAL SCORING
88// ─────────────────────────────────────────────────────────────────────────
89
/**
 * Collect the extension namespaces a signal payload carries data for.
 *
 * The namespaces are the top-level keys of `payload.metadata` and
 * `payload.extensionData`; keys starting with "_" are skipped.
 *
 * @param {object|null|undefined} payload - Signal payload.
 * @returns {Set<string>} Distinct namespace names (metadata keys first).
 */
function extractSignalNamespaces(payload) {
  const sections = [payload?.metadata, payload?.extensionData];
  const publicKeys = sections
    .filter((section) => section && typeof section === "object")
    .flatMap((section) => Object.keys(section))
    .filter((key) => !key.startsWith("_"));
  return new Set(publicKeys);
}
108
/**
 * Score how relevant a signal is for a destination peer.
 *
 * The result always has the same shape. Earlier the two rejection paths
 * returned the bare number 0, so callers that destructure `{ score, reasons }`
 * saw `score === undefined`, which is never below the routing threshold, and
 * the signal was delivered anyway (including back to its source land).
 *
 * Scoring components:
 *  - extension match: share of signal namespaces the peer has, weighted 0.3
 *  - tag match: share of signal tags that name a peer extension, weighted 0.2
 *  - gap match: 0.1 per namespace the peer lacks (capped at 0.3), counted
 *    only when one of the matches above already scored
 *
 * @param {object} signal - Buffered signal; `signal.payload` supplies namespaces and tags.
 * @param {object} peerProfile - Profile as built by buildPeerProfile() (`extensions` is a Set).
 * @param {*} sourceLandId - Land the signal came from.
 * @returns {{score: number, reasons: string[]}} `score` is between 0.0 and 1.0.
 */
export function scoreSignalForPeer(signal, peerProfile, sourceLandId) {
  // Never route back to source
  if (peerProfile.landId === sourceLandId) {
    return { score: 0, reasons: ["source land"] };
  }

  // Dead or unreachable peers get nothing
  if (!peerProfile.healthy) {
    return { score: 0, reasons: ["peer unhealthy"] };
  }

  let score = 0;
  const reasons = [];

  const signalNamespaces = extractSignalNamespaces(signal.payload);
  const signalTags = new Set(signal.payload?.tags || []);
  const peerExtensions = peerProfile.extensions;

  // Extension match: does the peer have extensions that would process this signal?
  if (signalNamespaces.size > 0 && peerExtensions.size > 0) {
    let matches = 0;
    for (const ns of signalNamespaces) {
      if (peerExtensions.has(ns)) matches++;
    }
    if (matches > 0) {
      score += (matches / signalNamespaces.size) * 0.3;
      reasons.push(`ext match ${matches}/${signalNamespaces.size}`);
    }
  }

  // Tag match: do signal tags intersect with the peer's extension names?
  // Extensions often match tag categories (fitness extension cares about fitness tags).
  if (signalTags.size > 0 && peerExtensions.size > 0) {
    let tagMatches = 0;
    for (const tag of signalTags) {
      if (peerExtensions.has(tag)) tagMatches++;
    }
    if (tagMatches > 0) {
      score += (tagMatches / signalTags.size) * 0.2;
      reasons.push(`tag match ${tagMatches}/${signalTags.size}`);
    }
  }

  // Gap match: the signal carries data for extensions the peer is missing.
  if (signalNamespaces.size > 0) {
    let gapMatches = 0;
    for (const ns of signalNamespaces) {
      if (!peerExtensions.has(ns)) gapMatches++;
    }
    // Only count gaps if the peer has SOME relevant extensions (not totally unrelated).
    if (gapMatches > 0 && score > 0) {
      score += Math.min(gapMatches * 0.1, 0.3);
      reasons.push(`gap signal ${gapMatches} namespaces`);
    }
  }

  return { score: Math.min(score, 1.0), reasons };
}
169
170// ─────────────────────────────────────────────────────────────────────────
171// ROUTING
172// ─────────────────────────────────────────────────────────────────────────
173
/**
 * Route a batch of signals to qualifying peers.
 *
 * For every signal, each active or degraded peer is scored; peers at or
 * above `config.routingThreshold` receive the signal via an HTTP POST to
 * their cascade endpoint. One decision record is produced per delivery
 * attempt (successful or not).
 *
 * Returns an empty list when the canopy modules cannot be imported or when
 * there are no active/degraded peers.
 *
 * @param {object[]} signals - Buffered signals (`nodeId`, `signalId`, `payload`, `source`, `depth`).
 * @param {object} config - Mycelium config; `routingThreshold` is read here.
 * @returns {Promise<object[]>} Routing decisions for logging.
 */
export async function routeBatch(signals, config) {
  let LandPeer;
  try {
    const mod = await import("../../canopy/models/landPeer.js");
    LandPeer = mod.default;
  } catch {
    // Peer model unavailable: there is nothing to route to.
    return [];
  }

  const peers = await LandPeer.find({ status: { $in: ["active", "degraded"] } }).lean();
  if (peers.length === 0) return [];

  const profiles = peers.map(buildPeerProfile);
  const thisLandId = getThisLandId();
  const decisions = [];

  let signCanopyToken;
  let getPeerBaseUrl;
  try {
    signCanopyToken = (await import("../../canopy/identity.js")).signCanopyToken;
    getPeerBaseUrl = (await import("../../canopy/peers.js")).getPeerBaseUrl;
  } catch {
    return [];
  }

  for (const signal of signals) {
    // Guard once so every later payload access is safe.
    const payload = signal.payload ?? {};
    const sourceLandId = payload._sourceLandId || signal.source;
    const shortId = String(signal.signalId).slice(0, 8);

    for (const profile of profiles) {
      // Never route back to the source, whatever the threshold is.
      if (profile.landId === sourceLandId) continue;

      // Accept both result shapes: `{ score, reasons }` and a bare number.
      const verdict = scoreSignalForPeer(signal, profile, sourceLandId);
      const score = typeof verdict === "number" ? verdict : verdict?.score;
      const reasons = Array.isArray(verdict?.reasons) ? verdict.reasons : [];

      // A missing/NaN score must not slip past the threshold comparison.
      if (!Number.isFinite(score) || score < config.routingThreshold) continue;

      const decision = {
        signalId: signal.signalId,
        destination: profile.domain,
        score: Math.round(score * 100) / 100,
        reasons,
      };

      // Deliver. Only the delivery itself is inside the try block so that a
      // failure while recording the outcome cannot add a second decision.
      let outcome;
      try {
        const token = await signCanopyToken("system", profile.domain);
        const baseUrl = getPeerBaseUrl({ baseUrl: profile.baseUrl, domain: profile.domain });
        const url = `${baseUrl}/api/v1/node/${signal.nodeId}/cascade`;

        // Build routed payload with hop tracking + loop prevention
        const alreadyRouted = Array.isArray(payload._myceliumRouted) ? payload._myceliumRouted : [];
        const routedPayload = {
          ...payload,
          _myceliumHops: (Number(payload._myceliumHops) || 0) + 1,
          _myceliumRouted: [...alreadyRouted, thisLandId],
          _sourceLandId: sourceLandId || thisLandId,
        };

        const res = await fetch(url, {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
            "Authorization": `Bearer ${token}`,
          },
          body: JSON.stringify({
            signalId: signal.signalId,
            payload: routedPayload,
            source: signal.nodeId,
            depth: (signal.depth || 0) + 1,
          }),
          signal: AbortSignal.timeout(15000),
        });

        // The body is never read; cancel it so the connection is released.
        res.body?.cancel().catch(() => {});

        outcome = { delivered: res.ok, httpStatus: res.status };
      } catch (err) {
        outcome = { delivered: false, error: err.message };
      }

      decisions.push({ ...decision, ...outcome, timestamp: new Date().toISOString() });

      if (outcome.delivered) {
        log.debug("Mycelium", `Routed ${shortId} to ${profile.domain} (score: ${score.toFixed(2)})`);
      }
    }
  }

  return decisions;
}
269
/**
 * AI routing for ambiguous signals (selective mode).
 * Sends a summary of the batch and the connected lands to the chat runner
 * installed via setRunChat() and parses its JSON answer.
 *
 * @param {object[]} signals - Signals to route.
 * @param {object[]} profiles - Peer profiles as built by buildPeerProfile().
 * @param {string} [userId] - User on whose behalf the chat runs; defaults to "system".
 * @returns {Promise<object[]>} Parsed routing suggestions
 *   (`{ signalId, destinations, reasoning }` as requested in the prompt).
 *   Always an array: empty when no runner is installed, when there is
 *   nothing to route, when the call fails, or when the answer is not a
 *   JSON array.
 */
export async function aiRoute(signals, profiles, userId) {
  if (!_runChat || signals.length === 0 || profiles.length === 0) return [];

  const signalSummary = signals.map((s) => ({
    signalId: s.signalId,
    tags: s.payload?.tags || [],
    namespaces: [...extractSignalNamespaces(s.payload)],
  }));

  const peerSummary = profiles.map((p) => ({
    domain: p.domain,
    // Cap the list to keep the prompt small.
    extensions: [...p.extensions].slice(0, 20),
    healthy: p.healthy,
  }));

  const prompt =
    `You are a mycelium routing node deciding which signals go to which lands.\n\n` +
    `Signals:\n${JSON.stringify(signalSummary, null, 0)}\n\n` +
    `Connected lands:\n${JSON.stringify(peerSummary, null, 0)}\n\n` +
    `For each signal, which lands should receive it? Return JSON array:\n` +
    `[{ "signalId": "...", "destinations": ["domain1"], "reasoning": "brief" }]\n` +
    `Only route where the destination has extensions or context to process the signal.`;

  try {
    const { answer } = await _runChat({
      userId: userId || "system",
      username: "mycelium",
      message: prompt,
      mode: "home:default",
      slot: "mycelium",
    });
    if (!answer) return [];

    // The model may answer with an object or scalar; callers expect a list.
    const parsed = parseJsonSafe(answer);
    return Array.isArray(parsed) ? parsed : [];
  } catch (err) {
    // Best effort: AI routing is optional, but leave a trace of the failure.
    log.debug("Mycelium", `AI routing failed: ${err?.message}`);
    return [];
  }
}
311
312// ─────────────────────────────────────────────────────────────────────────
313// ROUTING LOG
314// ─────────────────────────────────────────────────────────────────────────
315
/**
 * Persist routing decisions to the rolling mycelium routing log.
 *
 * The log lives on the land root node under `metadata.mycelium.routingLog`
 * and is trimmed to the newest `maxRoutingLogEntries` entries. The same
 * update stamps `lastRoutingCycle` and adds the number of delivered
 * decisions to `totalRouted`. Does nothing for an empty list or when no
 * land root node exists.
 *
 * @param {object[]} decisions - Decisions produced by routeBatch().
 * @returns {Promise<void>}
 */
export async function logDecisions(decisions) {
  if (decisions.length === 0) return;

  const { maxRoutingLogEntries } = await getMyceliumConfig();

  const landRoot = await Node.findOne({ systemRole: SYSTEM_ROLE.LAND_ROOT })
    .select("_id")
    .lean();
  if (!landRoot) return;

  const rollingLogPush = {
    $each: decisions,
    // Negative slice keeps the last N entries.
    $slice: -(maxRoutingLogEntries),
  };
  const cycleStamp = new Date().toISOString();
  const deliveredCount = decisions.reduce(
    (count, decision) => (decision.delivered ? count + 1 : count),
    0,
  );

  await Node.findByIdAndUpdate(landRoot._id, {
    $push: { "metadata.mycelium.routingLog": rollingLogPush },
    $set: { "metadata.mycelium.lastRoutingCycle": cycleStamp },
    $inc: { "metadata.mycelium.totalRouted": deliveredCount },
  });
}
342
343// ─────────────────────────────────────────────────────────────────────────
344// STATUS
345// ─────────────────────────────────────────────────────────────────────────
346
/**
 * Summarise the current routing state.
 *
 * @returns {Promise<object>} `{ peers: 0, mode: "unknown" }` when the peer
 *   model cannot be imported; otherwise the active/degraded peer count, the
 *   routing settings from the config, the current buffer size, and the
 *   counters stored on the land root (`totalRouted`, `lastCycle`).
 */
export async function getMyceliumStatus() {
  let LandPeer;
  try {
    ({ default: LandPeer } = await import("../../canopy/models/landPeer.js"));
  } catch {
    return { peers: 0, mode: "unknown" };
  }

  const { routingMode, routingThreshold, maxHopsPerSignal } = await getMyceliumConfig();
  const peers = await LandPeer.countDocuments({ status: { $in: ["active", "degraded"] } });

  const landRoot = await Node.findOne({ systemRole: SYSTEM_ROLE.LAND_ROOT })
    .select("metadata")
    .lean();

  // Metadata may be exposed either as a Map or as a plain object.
  const metadata = landRoot?.metadata;
  const stored =
    (metadata instanceof Map ? metadata.get("mycelium") : metadata?.mycelium) || {};

  return {
    peers,
    routingMode,
    routingThreshold,
    maxHopsPerSignal,
    signalsBuffered: bufferSize(),
    totalRouted: stored.totalRouted || 0,
    lastCycle: stored.lastRoutingCycle || null,
  };
}
373
/**
 * Return the most recent routing decisions, newest first.
 *
 * @param {number} [limit=50] - Maximum number of entries. Non-numeric values
 *   fall back to 50; zero or negative values yield an empty list. (Passing
 *   the raw value to `slice(-limit)` returned the whole log for 0/NaN and
 *   dropped the newest entries for negative numbers.)
 * @returns {Promise<object[]>} Up to `limit` decisions from
 *   `metadata.mycelium.routingLog` on the land root node.
 */
export async function getRoutingLog(limit = 50) {
  const requested = Number(limit);
  const count = Number.isNaN(requested) ? 50 : Math.trunc(requested);
  if (count <= 0) return [];

  const landRoot = await Node.findOne({ systemRole: SYSTEM_ROLE.LAND_ROOT }).select("metadata").lean();
  // Metadata may be exposed either as a Map or as a plain object.
  const meta = landRoot?.metadata instanceof Map
    ? landRoot.metadata.get("mycelium") || {}
    : landRoot?.metadata?.mycelium || {};

  const entries = Array.isArray(meta.routingLog) ? meta.routingLog : [];
  return entries.slice(-count).reverse();
}
381
1import log from "../../seed/log.js";
2import tools from "./tools.js";
3import {
4 setRunChat,
5 getMyceliumConfig,
6 getThisLandId,
7 bufferSignal,
8 drainBuffer,
9 routeBatch,
10 logDecisions,
11 getMyceliumStatus,
12} from "./core.js";
13
/**
 * Extension entry point.
 *
 * Wires the chat runner used for AI routing, registers the `onCascade` hook
 * that buffers signals and the `enrichContext` hook that exposes routing
 * status, and returns the router, tools, background routing job and exports.
 *
 * The configuration is read once here; the hooks and the job use that snapshot.
 *
 * @param {object} core - Host services; `core.llm` and `core.hooks` are used.
 * @returns {Promise<{router: object, tools: object[], jobs: object[], exports: object}>}
 */
export async function init(core) {
  const BG = core.llm.LLM_PRIORITY.BACKGROUND;

  core.llm.registerRootLlmSlot?.("mycelium");

  setRunChat(async (opts) => {
    // The system caller skips the per-user LLM check. The id is compared
    // case-insensitively because aiRoute() defaults to the lowercase "system".
    const isSystemCaller =
      typeof opts.userId === "string" && opts.userId.toLowerCase() === "system";
    if (opts.userId && !isSystemCaller && !await core.llm.userHasLlm(opts.userId)) return { answer: null };
    return core.llm.runChat({ ...opts, llmPriority: BG });
  });

  const config = await getMyceliumConfig();
  const thisLandId = getThisLandId();

  // ── onCascade: buffer incoming signals for batch routing ───────────
  core.hooks.register("onCascade", async (hookData) => {
    const { nodeId, signalId, payload, source, depth } = hookData;
    if (!payload || !signalId) return;

    // Loop prevention: if this land already routed this signal, skip
    if (Array.isArray(payload._myceliumRouted) && payload._myceliumRouted.includes(thisLandId)) return;

    // Hop limit: if too many hops, stop
    if ((payload._myceliumHops || 0) >= config.maxHopsPerSignal) return;

    // Buffer for batch routing
    bufferSignal({ nodeId, signalId, payload, source, depth: depth || 0 });
  }, "mycelium");

  // ── Background job: batch routing every interval ───────────────────
  let routingTimer = null;

  async function routingCycle() {
    try {
      const signals = drainBuffer(config.maxSignalsPerCycle);
      if (signals.length === 0) return;

      const decisions = await routeBatch(signals, config);
      await logDecisions(decisions);

      const routed = decisions.filter((d) => d.delivered).length;
      if (routed > 0) {
        log.verbose("Mycelium", `Routing cycle: ${routed}/${decisions.length} delivered from ${signals.length} signals`);
      }
    } catch (err) {
      log.error("Mycelium", `Routing cycle failed: ${err.message}`);
    }
  }

  // ── enrichContext: inject mycelium status at land root ──────────────
  core.hooks.register("enrichContext", async ({ context, node }) => {
    // Intended for the land root only.
    // NOTE(review): this passes for any node that has a systemRole.
    if (!node?.systemRole) return;
    try {
      const status = await getMyceliumStatus();
      if (status.totalRouted > 0 || status.peers > 0) {
        context.mycelium = {
          peers: status.peers,
          totalRouted: status.totalRouted,
          mode: status.routingMode,
          buffered: status.signalsBuffered,
        };
      }
    } catch (err) {
      log.debug("Mycelium", "enrichContext status injection failed:", err.message);
    }
  }, "mycelium");

  const { default: router } = await import("./routes.js");

  return {
    router,
    tools,
    jobs: [
      {
        name: "mycelium-routing",
        start: () => {
          // A repeated start must not leave an earlier interval running.
          if (routingTimer) clearInterval(routingTimer);
          routingTimer = setInterval(routingCycle, config.routingInterval);
          log.info("Mycelium", `Routing started (interval: ${config.routingInterval / 1000}s, mode: ${config.routingMode}, threshold: ${config.routingThreshold})`);
        },
        stop: () => {
          if (routingTimer) clearInterval(routingTimer);
          routingTimer = null;
          log.info("Mycelium", "Routing stopped");
        },
      },
    ],
    exports: {
      getMyceliumStatus,
    },
  };
}
103
// Extension manifest for mycelium.
export default {
  name: "mycelium",
  version: "1.0.1",
  builtFor: "seed",
  scope: "confined",
  description:
    "The intelligent underground network. Routes cascade signals between peered lands based on " +
    "observed need. Not a server. An extension any land can install to become a routing node. " +
    "Reads signal metadata and destination land profiles (extension lists from heartbeat, gap " +
    "detection data, evolution patterns) and makes intelligent routing decisions. A nutrition " +
    "signal from Land B routes to Land A because Land A has been flagging missing nutrition data. " +
    "It does not route to Land C because Land C has no relevant context. The router pays for its " +
    "own routing intelligence. The source pays for producing the signal. The destination pays for " +
    "processing it. Three levels: personal (your own trees), community (a lab or team), public " +
    "(infrastructure for the network). The most connected node knows the most about the network. " +
    ".flow is the water table. Canopy is trees reaching out. Mycelium is the forest underground.",

  needs: {
    services: ["llm", "hooks"],
    extensions: ["propagation"],
  },

  optional: {
    extensions: ["gap-detection", "evolution", "pulse", "perspective-filter", "codebook"],
  },

  provides: {
    models: {},
    routes: "./routes.js",
    tools: true,
    jobs: true,
    orchestrator: false,
    energyActions: {},
    sessionTypes: {},
    env: [],

    cli: [
      {
        command: "mycelium [action]",
        description: "Mycelium routing status. Actions: routes, peers, health.",
        method: "GET",
        endpoint: "/mycelium",
        subcommands: {
          "routes": {
            method: "GET",
            endpoint: "/mycelium/routes",
            description: "Recent routing decisions with reasoning",
          },
          "peers": {
            method: "GET",
            endpoint: "/mycelium/peers",
            description: "Connected lands with profiles",
          },
          "health": {
            method: "GET",
            endpoint: "/mycelium/health",
            description: "Per-peer health assessment",
          },
        },
      },
    ],

    hooks: {
      fires: [],
      // init() registers handlers for both of these hooks.
      listens: ["onCascade", "enrichContext"],
    },
  },
};
69
1import express from "express";
2import authenticate from "../../seed/middleware/authenticate.js";
3import { sendOk, sendError, ERR } from "../../seed/protocol.js";
4import { getMyceliumStatus, getRoutingLog, buildPeerProfile } from "./core.js";
5
const router = express.Router();

/**
 * GET /mycelium - status overview.
 * Responds with the object returned by getMyceliumStatus().
 */
async function handleStatus(req, res) {
  try {
    sendOk(res, await getMyceliumStatus());
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
}

router.get("/mycelium", authenticate, handleStatus);
17
// GET /mycelium/routes - recent routing decisions
// Query: ?limit=N (1..200, default 50). Missing, non-numeric, zero or
// negative values use the default; a NaN or 0 limit would otherwise make
// the log lookup return every stored entry.
router.get("/mycelium/routes", authenticate, async (req, res) => {
  try {
    const requested = Number.parseInt(req.query.limit, 10);
    const limit = Number.isNaN(requested) || requested < 1 ? 50 : Math.min(requested, 200);
    const decisions = await getRoutingLog(limit);
    sendOk(res, { count: decisions.length, decisions });
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
});
28
// GET /mycelium/peers - connected lands with profiles
// Lists active and degraded peers; responds with an empty list when the
// peer model cannot be imported.
router.get("/mycelium/peers", authenticate, async (req, res) => {
  try {
    let LandPeer;
    try {
      ({ default: LandPeer } = await import("../../canopy/models/landPeer.js"));
    } catch {
      return sendOk(res, { peers: [] });
    }

    const connected = await LandPeer.find({ status: { $in: ["active", "degraded"] } }).lean();

    const profiles = connected.map((peer) => {
      const { domain, extensions, status, healthy, lastSeen } = buildPeerProfile(peer);
      // The extension Set is expanded into an array for the JSON response.
      return { domain, extensions: [...extensions], status, healthy, lastSeen };
    });

    sendOk(res, { count: profiles.length, peers: profiles });
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
});
56
// GET /mycelium/health - per-peer health
// Covers every stored peer regardless of status and adds per-status totals.
router.get("/mycelium/health", authenticate, async (req, res) => {
  try {
    let LandPeer;
    try {
      ({ default: LandPeer } = await import("../../canopy/models/landPeer.js"));
    } catch {
      return sendOk(res, { peers: [] });
    }

    const allPeers = await LandPeer.find().lean();

    const health = allPeers.map((peer) => {
      const failures = peer.consecutiveFailures || 0;
      const extensionList = peer.extensions || [];
      return {
        domain: peer.domain,
        status: peer.status,
        consecutiveFailures: failures,
        lastSuccess: peer.lastSuccessAt,
        lastSeen: peer.lastSeenAt,
        extensionCount: extensionList.length,
      };
    });

    const countWithStatus = (wanted) =>
      health.reduce((count, entry) => (entry.status === wanted ? count + 1 : count), 0);

    sendOk(res, {
      total: health.length,
      active: countWithStatus("active"),
      degraded: countWithStatus("degraded"),
      unreachable: countWithStatus("unreachable"),
      peers: health,
    });
  } catch (err) {
    sendError(res, 500, ERR.INTERNAL, err.message);
  }
});

export default router;
90
1import { z } from "zod";
2import { getMyceliumStatus, getRoutingLog } from "./core.js";
3
// Schema fields the server injects into every tool call.
const injectedFields = () => ({
  userId: z.string().describe("Injected by server. Ignore."),
  chatId: z.string().nullable().optional().describe("Injected by server. Ignore."),
  sessionId: z.string().nullable().optional().describe("Injected by server. Ignore."),
});

// Both tools only read state.
const readOnlyAnnotations = () => ({
  readOnlyHint: true,
  destructiveHint: false,
  idempotentHint: true,
  openWorldHint: false,
});

// Wrap a string in the text-content envelope the tool handlers return.
const textResult = (text) => ({ content: [{ type: "text", text }] });

export default [
  {
    name: "mycelium-status",
    description: "Mycelium routing status. Connected peers, signals routed, routing mode, buffer size.",
    schema: injectedFields(),
    annotations: readOnlyAnnotations(),
    // Returns the status object as pretty-printed JSON.
    handler: async () => {
      try {
        const status = await getMyceliumStatus();
        return textResult(JSON.stringify(status, null, 2));
      } catch (err) {
        return textResult(`Failed: ${err.message}`);
      }
    },
  },
  {
    name: "mycelium-routes",
    description: "Recent routing decisions. Which signals went where and why.",
    schema: {
      limit: z.number().optional().default(20).describe("Max decisions to show."),
      ...injectedFields(),
    },
    annotations: readOnlyAnnotations(),
    // Returns the newest decisions as pretty-printed JSON, or a short
    // notice when the log is empty.
    handler: async ({ limit }) => {
      try {
        const decisions = await getRoutingLog(limit || 20);
        if (decisions.length === 0) {
          return textResult("No routing decisions yet.");
        }
        const report = { count: decisions.length, decisions };
        return textResult(JSON.stringify(report, null, 2));
      } catch (err) {
        return textResult(`Failed: ${err.message}`);
      }
    },
  },
];
46
Loading comments...