EXTENSION for treeos-connect
gateway-matrix
Registers the Matrix channel type with the gateway core, enabling trees to communicate through the Matrix open federation protocol. Output channels send notifications to a Matrix room using the Client-Server API (PUT to /_matrix/client/v3/rooms/:roomId/send). Messages are sent as m.room.message events with both plain text and org.matrix.custom.html formatting. No SDK dependency: the handler uses the Matrix REST API directly via fetch. Input channels poll for new messages using the Matrix /sync endpoint with long-polling, the same mechanism the Matrix spec recommends for bots. One sync loop runs per unique homeserver and access token combination. Multiple channels sharing the same credentials share a single sync loop. On startup, the extension scans the database for enabled Matrix input channels, decrypts their access tokens, and starts sync loops automatically. The initial sync fetches the since token without processing old messages, so only new messages after boot are handled. Events are filtered to m.room.message with m.text msgtype. The bot's own messages are ignored using the configured MATRIX_BOT_USER_ID to prevent loops. Input-output channels close the loop: when an inbound message produces a reply from the tree orchestrator, the bot posts the response back to the same Matrix room. Credentials can be set globally via environment variables (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN, MATRIX_BOT_USER_ID) or per channel for multi-server deployments. Because Matrix is an open, self-hostable protocol, this channel type is the most aligned with TreeOS's federation philosophy: your land, your Matrix homeserver, your tree, your data. No third-party platform controls the connection. The sync loop retries gracefully on errors with a 10-second backoff and aborts cleanly on shutdown.
v1.0.1 by TreeOS Site 0 downloads 4 files 411 lines 13.8 KB published 38d ago
treeos ext install gateway-matrix
View changelog

Manifest

Provides

  • jobs

Requires

  • extensions: gateway
SHA256: f257a67e6e299188c6ee36ac1db71911a4448c7de4cb84d55c12b6efea902691

Dependents

1 package depend on this

PackageTypeRelationship
treeos-connect v1.0.3bundleincludes

Environment Variables

KeyRequiredDescription
MATRIX_HOMESERVER No Matrix homeserver URL (e.g., https://matrix.yourdomain.com)
MATRIX_ACCESS_TOKEN secret No Matrix bot access token
MATRIX_BOT_USER_ID No Matrix bot user ID (e.g., @treebot:yourdomain.com)

Source Code

1// Matrix channel type handler.
2// Uses the Matrix Client-Server API directly (no SDK, just fetch).
3// Output: sends messages to a Matrix room via PUT /_matrix/client/v3/rooms/:roomId/send.
4// Input: polls /sync for new messages (long-polling, same pattern as the Matrix spec recommends for bots).
5// Config: homeserver, accessToken, botUserId, roomId. From env or per-channel.
6
7import log from "../../seed/log.js";
8
9function validateConfig(config, direction) {
10  const homeserver = config.homeserver || process.env.MATRIX_HOMESERVER;
11  const accessToken = config.accessToken || process.env.MATRIX_ACCESS_TOKEN;
12
13  if (!homeserver) {
14    throw new Error("Matrix requires a homeserver URL in config or MATRIX_HOMESERVER env var");
15  }
16
17  if (!accessToken) {
18    throw new Error("Matrix requires an accessToken in config or MATRIX_ACCESS_TOKEN env var");
19  }
20
21  if (!config.roomId || typeof config.roomId !== "string") {
22    throw new Error("Matrix requires a roomId (e.g., !roomid:yourdomain.com)");
23  }
24}
25
26function buildEncryptedConfig(config, direction) {
27  const secrets = {};
28  const metadata = {};
29
30  if (config.homeserver) secrets.homeserver = config.homeserver;
31  if (config.accessToken) secrets.accessToken = config.accessToken;
32
33  metadata.roomId = config.roomId;
34  metadata.botUserId = config.botUserId || process.env.MATRIX_BOT_USER_ID || null;
35
36  return {
37    secrets,
38    metadata,
39    displayIdentifier: config.roomId,
40  };
41}
42
43function getMatrixCreds(secrets) {
44  return {
45    homeserver: (secrets.homeserver || process.env.MATRIX_HOMESERVER || "").replace(/\/$/, ""),
46    accessToken: secrets.accessToken || process.env.MATRIX_ACCESS_TOKEN,
47    botUserId: process.env.MATRIX_BOT_USER_ID || null,
48  };
49}
50
51async function matrixApi(creds, method, path, body) {
52  const url = `${creds.homeserver}${path}`;
53  const opts = {
54    method,
55    headers: {
56      "Authorization": `Bearer ${creds.accessToken}`,
57      "Content-Type": "application/json",
58    },
59  };
60  if (body) opts.body = JSON.stringify(body);
61
62  const res = await fetch(url, opts);
63  const data = await res.json().catch(() => ({}));
64
65  if (!res.ok) {
66    throw new Error(`Matrix API ${method} ${path}: ${data.error || res.status}`);
67  }
68  return data;
69}
70
71async function send(secrets, metadata, notification) {
72  const creds = getMatrixCreds(secrets);
73  if (!creds.homeserver || !creds.accessToken) throw new Error("Matrix credentials not configured");
74  if (!metadata.roomId) throw new Error("Matrix roomId not configured");
75
76  const text = notification.title
77    ? `**${notification.title}**\n\n${notification.content}`
78    : notification.content;
79
80  // Matrix event ID must be unique per request
81  const txnId = `tree_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
82
83  await matrixApi(creds, "PUT",
84    `/_matrix/client/v3/rooms/${encodeURIComponent(metadata.roomId)}/send/m.room.message/${txnId}`,
85    {
86      msgtype: "m.text",
87      body: text,
88      // Markdown-formatted version
89      format: "org.matrix.custom.html",
90      formatted_body: text.replace(/\*\*(.*?)\*\*/g, "<strong>$1</strong>").replace(/\n/g, "<br>"),
91    },
92  );
93}
94
95async function registerInput(channel, secrets) {
96  log.info("GatewayMatrix",
97    `Matrix input registered for channel ${channel._id} in room ${channel.config?.metadata?.roomId}. ` +
98    `Sync polling will start with the background job.`,
99  );
100}
101
102async function unregisterInput(channel, secrets) {
103  log.verbose("GatewayMatrix", `Matrix input unregistered for channel ${channel._id}`);
104}
105
106// Exported for syncJob.js
107export { matrixApi, getMatrixCreds };
108
109export default {
110  allowedDirections: ["input", "output", "input-output"],
111  validateConfig,
112  buildEncryptedConfig,
113  send,
114  registerInput,
115  unregisterInput,
116};
117
1import log from "../../seed/log.js";
2import handler from "./handler.js";
3import { getExtension } from "../loader.js";
4import { startupScan, stopAllSyncLoops } from "./syncJob.js";
5
6export async function init(core) {
7  const gateway = getExtension("gateway");
8  if (!gateway?.exports?.registerChannelType) {
9    throw new Error("gateway-matrix requires the gateway extension to be loaded first");
10  }
11
12  gateway.exports.registerChannelType("matrix", handler);
13  log.verbose("GatewayMatrix", "Registered Matrix channel type");
14
15  return {
16    jobs: [
17      {
18        name: "gateway-matrix-sync",
19        start: () => { startupScan(); },
20        stop: () => { stopAllSyncLoops(); },
21      },
22    ],
23  };
24}
25
1export default {
2  name: "gateway-matrix",
3  version: "1.0.1",
4  builtFor: "treeos-connect",
5  description:
6    "Registers the Matrix channel type with the gateway core, enabling trees to communicate " +
7    "through the Matrix open federation protocol. Output channels send notifications to a " +
8    "Matrix room using the Client-Server API (PUT to /_matrix/client/v3/rooms/:roomId/send). " +
9    "Messages are sent as m.room.message events with both plain text and org.matrix.custom.html " +
10    "formatting. No SDK dependency: the handler uses the Matrix REST API directly via fetch." +
11    "\n\n" +
12    "Input channels poll for new messages using the Matrix /sync endpoint with long-polling, " +
13    "the same mechanism the Matrix spec recommends for bots. One sync loop runs per unique " +
14    "homeserver and access token combination. Multiple channels sharing the same credentials " +
15    "share a single sync loop. On startup, the extension scans the database for enabled " +
16    "Matrix input channels, decrypts their access tokens, and starts sync loops automatically. " +
17    "The initial sync fetches the since token without processing old messages, so only new " +
18    "messages after boot are handled. Events are filtered to m.room.message with m.text " +
19    "msgtype. The bot's own messages are ignored using the configured MATRIX_BOT_USER_ID to " +
20    "prevent loops." +
21    "\n\n" +
22    "Input-output channels close the loop: when an inbound message produces a reply from " +
23    "the tree orchestrator, the bot posts the response back to the same Matrix room. " +
24    "Credentials can be set globally via environment variables (MATRIX_HOMESERVER, " +
25    "MATRIX_ACCESS_TOKEN, MATRIX_BOT_USER_ID) or per channel for multi-server deployments. " +
26    "Because Matrix is an open, self-hostable protocol, this channel type is the most " +
27    "aligned with TreeOS's federation philosophy: your land, your Matrix homeserver, your " +
28    "tree, your data. No third-party platform controls the connection. The sync loop retries " +
29    "gracefully on errors with a 10-second backoff and aborts cleanly on shutdown.",
30
31  needs: {
32    extensions: ["gateway"],
33  },
34
35  optional: {},
36
37  provides: {
38    models: {},
39    routes: false,
40    tools: false,
41    jobs: true,
42    orchestrator: false,
43    energyActions: {},
44    sessionTypes: {},
45    env: [
46      { key: "MATRIX_HOMESERVER", required: false, description: "Matrix homeserver URL (e.g., https://matrix.yourdomain.com)" },
47      { key: "MATRIX_ACCESS_TOKEN", required: false, secret: true, description: "Matrix bot access token" },
48      { key: "MATRIX_BOT_USER_ID", required: false, description: "Matrix bot user ID (e.g., @treebot:yourdomain.com)" },
49    ],
50    cli: [],
51  },
52};
53
1// Matrix sync poller.
2// Long-polls the /sync endpoint to receive new messages.
3// One poller per unique homeserver+token pair (like Discord's one bot per token).
4// When a message arrives in a room that matches a gateway channel, processes it.
5
6import log from "../../seed/log.js";
7import { matrixApi, getMatrixCreds } from "./handler.js";
8
9// Map<channelId, { roomId, creds, channelDoc }>
10const activeChannels = new Map();
11
12// Map<credKey, { nextBatch, abortController, running }>
13const syncLoops = new Map();
14
15function credKey(creds) {
16  // One sync loop per homeserver+token combo
17  return `${creds.homeserver}::${creds.accessToken.slice(-8)}`;
18}
19
20/**
21 * Start tracking a channel for incoming messages.
22 */
23export function connectChannel(channelId, channel, secrets) {
24  const creds = getMatrixCreds(secrets);
25  const roomId = channel.config?.metadata?.roomId;
26  if (!roomId) return;
27
28  activeChannels.set(channelId, { roomId, creds, channelDoc: channel });
29
30  const key = credKey(creds);
31  if (!syncLoops.has(key)) {
32    startSyncLoop(key, creds);
33  }
34}
35
36/**
37 * Stop tracking a channel.
38 */
39export function disconnectChannel(channelId) {
40  activeChannels.delete(channelId);
41
42  // If no more channels use this sync loop, stop it
43  // (check if any remaining channel shares the same credKey)
44  // For simplicity, leave the loop running. It's cheap when idle.
45}
46
47/**
48 * Scan all enabled Matrix input channels and connect them.
49 */
50export async function startupScan() {
51  try {
52    const { getExtension } = await import("../loader.js");
53    const GatewayChannel = getExtension("gateway")?.exports?.GatewayChannel;
54    const channels = await GatewayChannel.find({
55      type: "matrix",
56      enabled: true,
57      direction: { $in: ["input", "input-output"] },
58    }).lean();
59
60    if (channels.length === 0) return;
61
62    const gateway = getExtension("gateway");
63    if (!gateway?.exports?.getChannelWithSecrets) return;
64
65    for (const ch of channels) {
66      try {
67        const full = await gateway.exports.getChannelWithSecrets(ch._id);
68        if (!full) continue;
69        connectChannel(ch._id.toString(), ch, full.config?.decryptedSecrets || {});
70      } catch (err) {
71        log.warn("GatewayMatrix", `Failed to connect channel ${ch._id}: ${err.message}`);
72      }
73    }
74
75    log.verbose("GatewayMatrix", `Sync poller: ${activeChannels.size} channel(s) connected`);
76  } catch (err) {
77    log.error("GatewayMatrix", `Startup scan failed: ${err.message}`);
78  }
79}
80
81export function stopAllSyncLoops() {
82  for (const [key, loop] of syncLoops) {
83    loop.running = false;
84    if (loop.abortController) loop.abortController.abort();
85  }
86  syncLoops.clear();
87  activeChannels.clear();
88}
89
90// ─────────────────────────────────────────────────────────────────────────
91// SYNC LOOP
92// ─────────────────────────────────────────────────────────────────────────
93
94function startSyncLoop(key, creds) {
95  const state = { nextBatch: null, abortController: null, running: true };
96  syncLoops.set(key, state);
97
98  (async () => {
99    // Initial sync (get the since token without processing old messages)
100    try {
101      const initial = await matrixApi(creds, "GET",
102        "/_matrix/client/v3/sync?timeout=0&filter=" + encodeURIComponent(JSON.stringify({
103          room: { timeline: { limit: 0 } },
104        })),
105      );
106      state.nextBatch = initial.next_batch;
107    } catch (err) {
108      log.error("GatewayMatrix", `Initial sync failed: ${err.message}`);
109      syncLoops.delete(key);
110      return;
111    }
112
113    log.verbose("GatewayMatrix", `Sync loop started for ${creds.homeserver}`);
114
115    while (state.running) {
116      try {
117        state.abortController = new AbortController();
118        const timeout = 30000; // 30s long poll
119
120        const filter = JSON.stringify({
121          room: {
122            timeline: { limit: 10 },
123            // Only care about message events
124            types: ["m.room.message"],
125          },
126        });
127
128        const url = `/_matrix/client/v3/sync?timeout=${timeout}&since=${state.nextBatch}&filter=${encodeURIComponent(filter)}`;
129
130        const res = await fetch(`${creds.homeserver}${url}`, {
131          headers: { "Authorization": `Bearer ${creds.accessToken}` },
132          signal: state.abortController.signal,
133        });
134
135        if (!res.ok) {
136          log.warn("GatewayMatrix", `Sync error ${res.status}, retrying in 10s`);
137          await sleep(10000);
138          continue;
139        }
140
141        const data = await res.json();
142        state.nextBatch = data.next_batch;
143
144        // Process room events
145        const rooms = data.rooms?.join || {};
146        for (const [roomId, roomData] of Object.entries(rooms)) {
147          const events = roomData.timeline?.events || [];
148          for (const event of events) {
149            await handleMatrixEvent(roomId, event, creds);
150          }
151        }
152      } catch (err) {
153        if (err.name === "AbortError") break;
154        log.warn("GatewayMatrix", `Sync error: ${err.message}. Retrying in 10s.`);
155        await sleep(10000);
156      }
157    }
158
159    syncLoops.delete(key);
160    log.verbose("GatewayMatrix", `Sync loop stopped for ${creds.homeserver}`);
161  })();
162}
163
164async function handleMatrixEvent(roomId, event, creds) {
165  // Only process m.room.message with m.text msgtype
166  if (event.type !== "m.room.message") return;
167  if (event.content?.msgtype !== "m.text") return;
168
169  // Ignore bot's own messages
170  const botUserId = creds.botUserId || process.env.MATRIX_BOT_USER_ID;
171  if (botUserId && event.sender === botUserId) return;
172
173  const text = event.content.body?.trim();
174  if (!text) return;
175
176  // Find the channel(s) watching this room
177  for (const [channelId, info] of activeChannels) {
178    if (info.roomId !== roomId) continue;
179
180    const senderName = event.sender?.split(":")[0]?.replace("@", "") || "unknown";
181    const senderPlatformId = event.sender || "";
182
183    log.verbose("GatewayMatrix",
184      `Matrix message in ${roomId} from ${senderName}: "${text.slice(0, 80)}"`,
185    );
186
187    try {
188      const { getExtension } = await import("../loader.js");
189      const gateway = getExtension("gateway");
190      if (!gateway?.exports?.processGatewayMessage) return;
191
192      const result = await gateway.exports.processGatewayMessage(channelId, {
193        senderName,
194        senderPlatformId,
195        messageText: text,
196      });
197
198      // Reply in the room if input-output
199      const channel = info.channelDoc;
200      if (result.reply && channel.direction === "input-output") {
201        const txnId = `tree_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
202        await matrixApi(creds, "PUT",
203          `/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${txnId}`,
204          { msgtype: "m.text", body: result.reply },
205        );
206      }
207    } catch (err) {
208      log.error("GatewayMatrix", `Error processing Matrix message in ${roomId}: ${err.message}`);
209    }
210  }
211}
212
213function sleep(ms) {
214  return new Promise(resolve => setTimeout(resolve, ms));
215}
216

Versions

Version Published Downloads
1.0.1 38d ago 0
1.0.0 48d ago 0
0 stars
0 flags
React from the CLI: treeos ext star gateway-matrix

Comments

Loading comments...

Post comments from the CLI: treeos ext comment gateway-matrix "your comment"
Max 3 comments per extension. One star and one flag per user.