1// Matrix channel type handler.
2// Uses the Matrix Client-Server API directly (no SDK, just fetch).
3// Output: sends messages to a Matrix room via PUT /_matrix/client/v3/rooms/:roomId/send.
4// Input: polls /sync for new messages (long-polling, same pattern as the Matrix spec recommends for bots).
5// Config: homeserver, accessToken, botUserId, roomId. From env or per-channel.
6
7import log from "../../seed/log.js";
8
9function validateConfig(config, direction) {
10 const homeserver = config.homeserver || process.env.MATRIX_HOMESERVER;
11 const accessToken = config.accessToken || process.env.MATRIX_ACCESS_TOKEN;
12
13 if (!homeserver) {
14 throw new Error("Matrix requires a homeserver URL in config or MATRIX_HOMESERVER env var");
15 }
16
17 if (!accessToken) {
18 throw new Error("Matrix requires an accessToken in config or MATRIX_ACCESS_TOKEN env var");
19 }
20
21 if (!config.roomId || typeof config.roomId !== "string") {
22 throw new Error("Matrix requires a roomId (e.g., !roomid:yourdomain.com)");
23 }
24}
25
26function buildEncryptedConfig(config, direction) {
27 const secrets = {};
28 const metadata = {};
29
30 if (config.homeserver) secrets.homeserver = config.homeserver;
31 if (config.accessToken) secrets.accessToken = config.accessToken;
32
33 metadata.roomId = config.roomId;
34 metadata.botUserId = config.botUserId || process.env.MATRIX_BOT_USER_ID || null;
35
36 return {
37 secrets,
38 metadata,
39 displayIdentifier: config.roomId,
40 };
41}
42
43function getMatrixCreds(secrets) {
44 return {
45 homeserver: (secrets.homeserver || process.env.MATRIX_HOMESERVER || "").replace(/\/$/, ""),
46 accessToken: secrets.accessToken || process.env.MATRIX_ACCESS_TOKEN,
47 botUserId: process.env.MATRIX_BOT_USER_ID || null,
48 };
49}
50
51async function matrixApi(creds, method, path, body) {
52 const url = `${creds.homeserver}${path}`;
53 const opts = {
54 method,
55 headers: {
56 "Authorization": `Bearer ${creds.accessToken}`,
57 "Content-Type": "application/json",
58 },
59 };
60 if (body) opts.body = JSON.stringify(body);
61
62 const res = await fetch(url, opts);
63 const data = await res.json().catch(() => ({}));
64
65 if (!res.ok) {
66 throw new Error(`Matrix API ${method} ${path}: ${data.error || res.status}`);
67 }
68 return data;
69}
70
71async function send(secrets, metadata, notification) {
72 const creds = getMatrixCreds(secrets);
73 if (!creds.homeserver || !creds.accessToken) throw new Error("Matrix credentials not configured");
74 if (!metadata.roomId) throw new Error("Matrix roomId not configured");
75
76 const text = notification.title
77 ? `**${notification.title}**\n\n${notification.content}`
78 : notification.content;
79
80 // Matrix event ID must be unique per request
81 const txnId = `tree_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
82
83 await matrixApi(creds, "PUT",
84 `/_matrix/client/v3/rooms/${encodeURIComponent(metadata.roomId)}/send/m.room.message/${txnId}`,
85 {
86 msgtype: "m.text",
87 body: text,
88 // Markdown-formatted version
89 format: "org.matrix.custom.html",
90 formatted_body: text.replace(/\*\*(.*?)\*\*/g, "<strong>$1</strong>").replace(/\n/g, "<br>"),
91 },
92 );
93}
94
95async function registerInput(channel, secrets) {
96 log.info("GatewayMatrix",
97 `Matrix input registered for channel ${channel._id} in room ${channel.config?.metadata?.roomId}. ` +
98 `Sync polling will start with the background job.`,
99 );
100}
101
102async function unregisterInput(channel, secrets) {
103 log.verbose("GatewayMatrix", `Matrix input unregistered for channel ${channel._id}`);
104}
105
106// Exported for syncJob.js
107export { matrixApi, getMatrixCreds };
108
109export default {
110 allowedDirections: ["input", "output", "input-output"],
111 validateConfig,
112 buildEncryptedConfig,
113 send,
114 registerInput,
115 unregisterInput,
116};
117
1import log from "../../seed/log.js";
2import handler from "./handler.js";
3import { getExtension } from "../loader.js";
4import { startupScan, stopAllSyncLoops } from "./syncJob.js";
5
6export async function init(core) {
7 const gateway = getExtension("gateway");
8 if (!gateway?.exports?.registerChannelType) {
9 throw new Error("gateway-matrix requires the gateway extension to be loaded first");
10 }
11
12 gateway.exports.registerChannelType("matrix", handler);
13 log.verbose("GatewayMatrix", "Registered Matrix channel type");
14
15 return {
16 jobs: [
17 {
18 name: "gateway-matrix-sync",
19 start: () => { startupScan(); },
20 stop: () => { stopAllSyncLoops(); },
21 },
22 ],
23 };
24}
25
1export default {
2 name: "gateway-matrix",
3 version: "1.0.1",
4 builtFor: "treeos-connect",
5 description:
6 "Registers the Matrix channel type with the gateway core, enabling trees to communicate " +
7 "through the Matrix open federation protocol. Output channels send notifications to a " +
8 "Matrix room using the Client-Server API (PUT to /_matrix/client/v3/rooms/:roomId/send). " +
9 "Messages are sent as m.room.message events with both plain text and org.matrix.custom.html " +
10 "formatting. No SDK dependency: the handler uses the Matrix REST API directly via fetch." +
11 "\n\n" +
12 "Input channels poll for new messages using the Matrix /sync endpoint with long-polling, " +
13 "the same mechanism the Matrix spec recommends for bots. One sync loop runs per unique " +
14 "homeserver and access token combination. Multiple channels sharing the same credentials " +
15 "share a single sync loop. On startup, the extension scans the database for enabled " +
16 "Matrix input channels, decrypts their access tokens, and starts sync loops automatically. " +
17 "The initial sync fetches the since token without processing old messages, so only new " +
18 "messages after boot are handled. Events are filtered to m.room.message with m.text " +
19 "msgtype. The bot's own messages are ignored using the configured MATRIX_BOT_USER_ID to " +
20 "prevent loops." +
21 "\n\n" +
22 "Input-output channels close the loop: when an inbound message produces a reply from " +
23 "the tree orchestrator, the bot posts the response back to the same Matrix room. " +
24 "Credentials can be set globally via environment variables (MATRIX_HOMESERVER, " +
25 "MATRIX_ACCESS_TOKEN, MATRIX_BOT_USER_ID) or per channel for multi-server deployments. " +
26 "Because Matrix is an open, self-hostable protocol, this channel type is the most " +
27 "aligned with TreeOS's federation philosophy: your land, your Matrix homeserver, your " +
28 "tree, your data. No third-party platform controls the connection. The sync loop retries " +
29 "gracefully on errors with a 10-second backoff and aborts cleanly on shutdown.",
30
31 needs: {
32 extensions: ["gateway"],
33 },
34
35 optional: {},
36
37 provides: {
38 models: {},
39 routes: false,
40 tools: false,
41 jobs: true,
42 orchestrator: false,
43 energyActions: {},
44 sessionTypes: {},
45 env: [
46 { key: "MATRIX_HOMESERVER", required: false, description: "Matrix homeserver URL (e.g., https://matrix.yourdomain.com)" },
47 { key: "MATRIX_ACCESS_TOKEN", required: false, secret: true, description: "Matrix bot access token" },
48 { key: "MATRIX_BOT_USER_ID", required: false, description: "Matrix bot user ID (e.g., @treebot:yourdomain.com)" },
49 ],
50 cli: [],
51 },
52};
53
1// Matrix sync poller.
2// Long-polls the /sync endpoint to receive new messages.
3// One poller per unique homeserver+token pair (like Discord's one bot per token).
4// When a message arrives in a room that matches a gateway channel, processes it.
5
6import log from "../../seed/log.js";
7import { matrixApi, getMatrixCreds } from "./handler.js";
8
9// Map<channelId, { roomId, creds, channelDoc }>
10const activeChannels = new Map();
11
12// Map<credKey, { nextBatch, abortController, running }>
13const syncLoops = new Map();
14
15function credKey(creds) {
16 // One sync loop per homeserver+token combo
17 return `${creds.homeserver}::${creds.accessToken.slice(-8)}`;
18}
19
20/**
21 * Start tracking a channel for incoming messages.
22 */
23export function connectChannel(channelId, channel, secrets) {
24 const creds = getMatrixCreds(secrets);
25 const roomId = channel.config?.metadata?.roomId;
26 if (!roomId) return;
27
28 activeChannels.set(channelId, { roomId, creds, channelDoc: channel });
29
30 const key = credKey(creds);
31 if (!syncLoops.has(key)) {
32 startSyncLoop(key, creds);
33 }
34}
35
36/**
37 * Stop tracking a channel.
38 */
39export function disconnectChannel(channelId) {
40 activeChannels.delete(channelId);
41
42 // If no more channels use this sync loop, stop it
43 // (check if any remaining channel shares the same credKey)
44 // For simplicity, leave the loop running. It's cheap when idle.
45}
46
47/**
48 * Scan all enabled Matrix input channels and connect them.
49 */
50export async function startupScan() {
51 try {
52 const { getExtension } = await import("../loader.js");
53 const GatewayChannel = getExtension("gateway")?.exports?.GatewayChannel;
54 const channels = await GatewayChannel.find({
55 type: "matrix",
56 enabled: true,
57 direction: { $in: ["input", "input-output"] },
58 }).lean();
59
60 if (channels.length === 0) return;
61
62 const gateway = getExtension("gateway");
63 if (!gateway?.exports?.getChannelWithSecrets) return;
64
65 for (const ch of channels) {
66 try {
67 const full = await gateway.exports.getChannelWithSecrets(ch._id);
68 if (!full) continue;
69 connectChannel(ch._id.toString(), ch, full.config?.decryptedSecrets || {});
70 } catch (err) {
71 log.warn("GatewayMatrix", `Failed to connect channel ${ch._id}: ${err.message}`);
72 }
73 }
74
75 log.verbose("GatewayMatrix", `Sync poller: ${activeChannels.size} channel(s) connected`);
76 } catch (err) {
77 log.error("GatewayMatrix", `Startup scan failed: ${err.message}`);
78 }
79}
80
81export function stopAllSyncLoops() {
82 for (const [key, loop] of syncLoops) {
83 loop.running = false;
84 if (loop.abortController) loop.abortController.abort();
85 }
86 syncLoops.clear();
87 activeChannels.clear();
88}
89
90// ─────────────────────────────────────────────────────────────────────────
91// SYNC LOOP
92// ─────────────────────────────────────────────────────────────────────────
93
94function startSyncLoop(key, creds) {
95 const state = { nextBatch: null, abortController: null, running: true };
96 syncLoops.set(key, state);
97
98 (async () => {
99 // Initial sync (get the since token without processing old messages)
100 try {
101 const initial = await matrixApi(creds, "GET",
102 "/_matrix/client/v3/sync?timeout=0&filter=" + encodeURIComponent(JSON.stringify({
103 room: { timeline: { limit: 0 } },
104 })),
105 );
106 state.nextBatch = initial.next_batch;
107 } catch (err) {
108 log.error("GatewayMatrix", `Initial sync failed: ${err.message}`);
109 syncLoops.delete(key);
110 return;
111 }
112
113 log.verbose("GatewayMatrix", `Sync loop started for ${creds.homeserver}`);
114
115 while (state.running) {
116 try {
117 state.abortController = new AbortController();
118 const timeout = 30000; // 30s long poll
119
120 const filter = JSON.stringify({
121 room: {
122 timeline: { limit: 10 },
123 // Only care about message events
124 types: ["m.room.message"],
125 },
126 });
127
128 const url = `/_matrix/client/v3/sync?timeout=${timeout}&since=${state.nextBatch}&filter=${encodeURIComponent(filter)}`;
129
130 const res = await fetch(`${creds.homeserver}${url}`, {
131 headers: { "Authorization": `Bearer ${creds.accessToken}` },
132 signal: state.abortController.signal,
133 });
134
135 if (!res.ok) {
136 log.warn("GatewayMatrix", `Sync error ${res.status}, retrying in 10s`);
137 await sleep(10000);
138 continue;
139 }
140
141 const data = await res.json();
142 state.nextBatch = data.next_batch;
143
144 // Process room events
145 const rooms = data.rooms?.join || {};
146 for (const [roomId, roomData] of Object.entries(rooms)) {
147 const events = roomData.timeline?.events || [];
148 for (const event of events) {
149 await handleMatrixEvent(roomId, event, creds);
150 }
151 }
152 } catch (err) {
153 if (err.name === "AbortError") break;
154 log.warn("GatewayMatrix", `Sync error: ${err.message}. Retrying in 10s.`);
155 await sleep(10000);
156 }
157 }
158
159 syncLoops.delete(key);
160 log.verbose("GatewayMatrix", `Sync loop stopped for ${creds.homeserver}`);
161 })();
162}
163
164async function handleMatrixEvent(roomId, event, creds) {
165 // Only process m.room.message with m.text msgtype
166 if (event.type !== "m.room.message") return;
167 if (event.content?.msgtype !== "m.text") return;
168
169 // Ignore bot's own messages
170 const botUserId = creds.botUserId || process.env.MATRIX_BOT_USER_ID;
171 if (botUserId && event.sender === botUserId) return;
172
173 const text = event.content.body?.trim();
174 if (!text) return;
175
176 // Find the channel(s) watching this room
177 for (const [channelId, info] of activeChannels) {
178 if (info.roomId !== roomId) continue;
179
180 const senderName = event.sender?.split(":")[0]?.replace("@", "") || "unknown";
181 const senderPlatformId = event.sender || "";
182
183 log.verbose("GatewayMatrix",
184 `Matrix message in ${roomId} from ${senderName}: "${text.slice(0, 80)}"`,
185 );
186
187 try {
188 const { getExtension } = await import("../loader.js");
189 const gateway = getExtension("gateway");
190 if (!gateway?.exports?.processGatewayMessage) return;
191
192 const result = await gateway.exports.processGatewayMessage(channelId, {
193 senderName,
194 senderPlatformId,
195 messageText: text,
196 });
197
198 // Reply in the room if input-output
199 const channel = info.channelDoc;
200 if (result.reply && channel.direction === "input-output") {
201 const txnId = `tree_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
202 await matrixApi(creds, "PUT",
203 `/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`,
204 { msgtype: "m.text", body: result.reply },
205 );
206 }
207 } catch (err) {
208 log.error("GatewayMatrix", `Error processing Matrix message in ${roomId}: ${err.message}`);
209 }
210 }
211}
212
213function sleep(ms) {
214 return new Promise(resolve => setTimeout(resolve, ms));
215}
216
Loading comments...