EXTENSION for treeos-connect
gateway-x
X (Twitter) channel type for the gateway. Three modes: output-only post generation (the tree becomes a content engine), input-output conversation (threaded replies on X), and input-only listening (mentions, keywords, hashtags become cascade signals). Every X channel binds to a TreeOS user for energy, permissions, and rate limiting.
v1.0.1 by TreeOS Site 0 downloads 4 files 491 lines 15.4 KB published 38d ago
treeos ext install gateway-x
View changelog

Manifest

Provides

  • jobs

Requires

  • extensions: gateway
SHA256: ef42f1e302a2ca53c9dd66604785bfe3068d18f8fe78d72dafae454b7364b219

Dependents

1 package depends on this

Package Type Relationship
treeos-connect v1.0.3 bundle includes

Environment Variables

Key Required Description
X_API_KEY secret No X API Key (OAuth 1.0a consumer key)
X_API_SECRET secret No X API Secret (OAuth 1.0a consumer secret)
X_WEBHOOK_SECRET secret auto No Shared secret for verifying X Account Activity API webhooks

Source Code

1// X (Twitter) channel type handler.
2//
3// Three modes, one handler:
4//
5// OUTPUT: Post generation. The tree writes, the extension publishes.
6//   Cascade signals compress into post-length insights. The tree's
7//   transpiration: structured output evaporating into the cloud layer.
8//
9// INPUT-OUTPUT: Conversation. Someone replies on X. The poll job picks it up.
10//   processGatewayMessage fires. The AI responds. Posted as a threaded reply.
11//   The tree is having a public conversation. History lives as notes.
12//
13// INPUT: Listening. Mentions, keywords, hashtags. Every matching post
14//   becomes rain falling on the land. Cascade signals from the clouds.
15//
16// Identity binding: every X channel maps an X account to a TreeOS user.
17// The user's energy budget covers API calls. The user's permissions
18// determine which trees the X account can interact with. Without this
19// binding, unauthenticated replies trigger AI with no rate limiting.
20// The gateway core handles this. We just map X account ID in config.
21//
22// X API v2 with OAuth 1.0a User Context (per-user tokens).
23// Rate limits: 1500 posts/month (free), 3000 (basic), 300K (pro).
24
25import log from "../../seed/log.js";
26import crypto from "crypto";
27
28// ─────────────────────────────────────────────────────────────────────────
29// OAUTH 1.0a SIGNING
30// ─────────────────────────────────────────────────────────────────────────
31
/**
 * Percent-encode a value the way OAuth 1.0a signing expects.
 *
 * encodeURIComponent leaves the characters ! * ' ( ) untouched, so each of
 * them is rewritten to its uppercase %XX escape afterwards.
 *
 * @param {string} str - Raw value to encode (non-strings are coerced by encodeURIComponent).
 * @returns {string} The encoded value.
 */
function percentEncode(str) {
  const toHexEscape = (ch) => `%${ch.charCodeAt(0).toString(16).toUpperCase()}`;
  return encodeURIComponent(str).replace(/[!*'()]/g, toHexEscape);
}
40
/**
 * Build the OAuth 1.0a `Authorization` header value (HMAC-SHA1) for a request.
 *
 * The signature base string is made of the uppercased method, the request URL
 * WITHOUT its query string, and every signed parameter: the oauth_* values,
 * the entries of `params`, and any parameters carried in the URL's query
 * string. Query parameters are decoded before being re-encoded, so a URL such
 * as `/mentions?max_results=10` signs the same as passing `{ max_results: "10" }`
 * in `params`.
 *
 * @param {string} method - HTTP method; uppercased for the base string.
 * @param {string} url - Absolute request URL, with or without a query string.
 * @param {Object} params - Extra request parameters to sign (may be empty).
 * @param {string} consumerKey - OAuth consumer (API) key.
 * @param {string} consumerSecret - OAuth consumer (API) secret.
 * @param {string} accessToken - User access token.
 * @param {string} tokenSecret - User access token secret.
 * @returns {string} Header value starting with "OAuth ".
 * @throws {TypeError} If `url` is not an absolute URL.
 */
function generateOAuthHeader(method, url, params, consumerKey, consumerSecret, accessToken, tokenSecret) {
  const oauthParams = {
    oauth_consumer_key: consumerKey,
    oauth_nonce: crypto.randomBytes(16).toString("hex"),
    oauth_signature_method: "HMAC-SHA1",
    oauth_timestamp: Math.floor(Date.now() / 1000).toString(),
    oauth_token: accessToken,
    oauth_version: "1.0",
  };

  // The base-string URI must exclude the query and fragment; the query's
  // parameters are signed as ordinary parameters instead.
  const parsedUrl = new URL(url);
  const baseUrl = `${parsedUrl.origin}${parsedUrl.pathname}`;

  // Encode first, then sort by encoded name and (for repeated names) by value.
  const signedPairs = [
    ...Object.entries({ ...oauthParams, ...params }),
    ...parsedUrl.searchParams.entries(),
  ].map(([key, value]) => [percentEncode(key), percentEncode(value)]);

  signedPairs.sort(([keyA, valueA], [keyB, valueB]) => {
    if (keyA !== keyB) return keyA < keyB ? -1 : 1;
    if (valueA !== valueB) return valueA < valueB ? -1 : 1;
    return 0;
  });

  const paramString = signedPairs.map(([key, value]) => `${key}=${value}`).join("&");

  const baseString = `${method.toUpperCase()}&${percentEncode(baseUrl)}&${percentEncode(paramString)}`;
  const signingKey = `${percentEncode(consumerSecret)}&${percentEncode(tokenSecret)}`;
  oauthParams.oauth_signature = crypto.createHmac("sha1", signingKey).update(baseString).digest("base64");

  // Only the oauth_* values travel in the header; request parameters do not.
  const headerFields = Object.keys(oauthParams)
    .sort()
    .map((key) => `${percentEncode(key)}="${percentEncode(oauthParams[key])}"`)
    .join(", ");

  return `OAuth ${headerFields}`;
}
74
75// ─────────────────────────────────────────────────────────────────────────
76// X API CLIENT
77// ─────────────────────────────────────────────────────────────────────────
78
/**
 * Assemble the four OAuth 1.0a credentials for a channel.
 *
 * The consumer key and secret fall back to the X_API_KEY / X_API_SECRET
 * environment variables when the channel's secrets do not carry them; the
 * user token and token secret come from the channel's secrets only.
 *
 * @param {Object} secrets - Decrypted channel secrets.
 * @returns {{apiKey: string, apiSecret: string, accessToken: string, tokenSecret: string}}
 */
function getCreds(secrets) {
  const { apiKey, apiSecret, accessToken, tokenSecret } = secrets;
  const { X_API_KEY: envKey, X_API_SECRET: envSecret } = process.env;

  return {
    apiKey: apiKey || envKey,
    apiSecret: apiSecret || envSecret,
    accessToken,
    tokenSecret,
  };
}
87
/**
 * Perform one OAuth-signed JSON request against https://api.x.com.
 *
 * @param {Object} creds - Credentials as returned by getCreds.
 * @param {string} method - HTTP method.
 * @param {string} path - Path (plus optional query string) appended to the API host.
 * @param {Object} [body] - JSON payload; only sent when truthy.
 * @returns {Promise<Object>} Parsed JSON response, or {} when the body is not JSON.
 * @throws {Error} On a non-2xx response, with the most specific message the
 *   response offers (detail, first error message, title, then the status code).
 */
async function xApi(creds, method, path, body) {
  const endpoint = `https://api.x.com${path}`;
  const { apiKey, apiSecret, accessToken, tokenSecret } = creds;

  const request = {
    method,
    headers: {
      "Authorization": generateOAuthHeader(method, endpoint, {}, apiKey, apiSecret, accessToken, tokenSecret),
      "Content-Type": "application/json",
    },
  };
  if (body) {
    request.body = JSON.stringify(body);
  }

  const response = await fetch(endpoint, request);
  // An unparseable body is treated as an empty payload.
  const payload = await response.json().catch(() => ({}));

  if (response.ok) return payload;

  const reason = payload.detail || payload.errors?.[0]?.message || payload.title || `${response.status}`;
  throw new Error(`X API ${method} ${path}: ${reason}`);
}
115
116// ─────────────────────────────────────────────────────────────────────────
117// VALIDATION
118// ─────────────────────────────────────────────────────────────────────────
119
/**
 * Validate a raw X channel config, throwing on the first problem found.
 *
 * Checks, in order: the per-user access token, the per-user token secret,
 * the consumer key/secret (from the config or the X_API_KEY / X_API_SECRET
 * environment variables), and, for "input" channels only, the optional
 * listenFilter.
 *
 * @param {Object} config - Channel config supplied by the caller.
 * @param {string} direction - Channel direction ("input", "output" or "input-output").
 * @throws {Error} When a required credential is missing or listenFilter is invalid.
 */
function validateConfig(config, direction) {
  const isNonEmptyString = (value) => Boolean(value) && typeof value === "string";

  // Per-user OAuth tokens are always required (X API v2 user context).
  if (!isNonEmptyString(config.accessToken)) {
    throw new Error("X channel requires an accessToken (OAuth 1.0a user access token)");
  }
  if (!isNonEmptyString(config.tokenSecret)) {
    throw new Error("X channel requires a tokenSecret (OAuth 1.0a user token secret)");
  }

  const hasConsumerKey = Boolean(config.apiKey || process.env.X_API_KEY);
  const hasConsumerSecret = Boolean(config.apiSecret || process.env.X_API_SECRET);
  if (!(hasConsumerKey && hasConsumerSecret)) {
    throw new Error("X channel requires X_API_KEY and X_API_SECRET in .env or channel config");
  }

  // The keyword/hashtag filter is optional and only checked for input-only channels.
  if (direction !== "input" || !config.listenFilter) return;

  const filter = config.listenFilter;
  if (typeof filter !== "string" || filter.length > 500) {
    throw new Error("listenFilter must be a string under 500 characters");
  }
}
142
/**
 * Split a raw channel config into its secret part, its plain metadata and a
 * human-readable identifier.
 *
 * @param {Object} config - Channel config supplied by the caller.
 * @param {string} direction - Channel direction (not used by this function).
 * @returns {{secrets: Object, metadata: Object, displayIdentifier: string}}
 *   `secrets` always holds the user token pair and holds the consumer key or
 *   secret only when the config provides them; `metadata` holds the X account
 *   id and username (null when absent) plus listenFilter when set;
 *   `displayIdentifier` is "@username", else the account id, else "x".
 */
function buildEncryptedConfig(config, direction) {
  const { accessToken, tokenSecret, apiKey, apiSecret, xUserId, xUsername, listenFilter } = config;

  const secrets = {
    accessToken,
    tokenSecret,
    ...(apiKey ? { apiKey } : {}),
    ...(apiSecret ? { apiSecret } : {}),
  };

  const metadata = {
    xUserId: xUserId || null,
    xUsername: xUsername || null,
    // Input listening filter (keywords, hashtags, mentions), only when provided.
    ...(listenFilter ? { listenFilter } : {}),
  };

  let displayIdentifier;
  if (xUsername) {
    displayIdentifier = `@${xUsername}`;
  } else {
    displayIdentifier = xUserId || "x";
  }

  return { secrets, metadata, displayIdentifier };
}
167
168// ─────────────────────────────────────────────────────────────────────────
169// SENDER (post / reply)
170// ─────────────────────────────────────────────────────────────────────────
171
/**
 * Publish a notification as a post, or as a threaded reply when the
 * notification carries `_replyToTweetId`.
 *
 * The title is prepended (separated by a blank line) only when the combined
 * text stays under the limit; anything longer than 280 characters is cut to
 * 277 characters plus "...".
 *
 * @param {Object} secrets - Decrypted channel secrets.
 * @param {Object} metadata - Channel metadata (not used by this function).
 * @param {Object} notification - `{ content, title?, _replyToTweetId? }`.
 * @returns {Promise<Object>} The response of POST /2/tweets.
 * @throws {Error} When credentials are missing or the API call fails.
 */
async function send(secrets, metadata, notification) {
  const creds = getCreds(secrets);
  const hasUserTokens = creds.accessToken && creds.tokenSecret;
  if (!hasUserTokens) throw new Error("X OAuth credentials not configured");
  const hasConsumerPair = creds.apiKey && creds.apiSecret;
  if (!hasConsumerPair) throw new Error("X API key not configured");

  const { title, content, _replyToTweetId: replyTarget } = notification;

  // Format as a post. 280 char limit for the main body.
  let text = content || "";
  if (title && text.length + title.length + 3 < 280) {
    text = `${title}\n\n${text}`;
  }
  if (text.length > 280) {
    text = `${text.slice(0, 277)}...`;
  }

  // A reply context (set by input processing) threads the post under that tweet.
  const payload = replyTarget
    ? { text, reply: { in_reply_to_tweet_id: replyTarget } }
    : { text };

  return xApi(creds, "POST", "/2/tweets", payload);
}
194
195// ─────────────────────────────────────────────────────────────────────────
196// INPUT LIFECYCLE
197// ─────────────────────────────────────────────────────────────────────────
198
/**
 * Input-registration hook for an X channel. It only logs the registration.
 *
 * Input uses polling (see pollJob.js): the Account Activity API requires an
 * enterprise tier, while polling /2/users/:id/mentions is available on basic
 * tier.
 *
 * @param {Object} channel - Channel document; its metadata supplies the logged name.
 * @param {Object} secrets - Decrypted channel secrets (not used by this function).
 * @returns {Promise<void>}
 */
async function registerInput(channel, secrets) {
  const meta = channel.config?.metadata;
  const username = meta?.xUsername || meta?.xUserId;
  const message = `X input registered for channel ${channel._id} (@${username}). Poll job will check for new mentions and replies.`;
  log.info("GatewayX", message);
}
209
/**
 * Input-unregistration hook for an X channel. It only writes a verbose log line.
 *
 * @param {Object} channel - Channel document being unregistered.
 * @param {Object} secrets - Decrypted channel secrets (not used by this function).
 * @returns {Promise<void>}
 */
async function unregisterInput(channel, secrets) {
  const message = `X input unregistered for channel ${channel._id}`;
  log.verbose("GatewayX", message);
}
213
214// Exported for pollJob.js
215export { xApi, getCreds };
216
// Channel-type handler for X: the directions it supports plus the config,
// send and input-lifecycle hooks defined in this file.
const xChannelHandler = {
  allowedDirections: ["input", "output", "input-output"],
  validateConfig,
  buildEncryptedConfig,
  send,
  registerInput,
  unregisterInput,
};

export default xChannelHandler;
225
1import log from "../../seed/log.js";
2import handler from "./handler.js";
3import { getExtension } from "../loader.js";
4import { startupScan, stopPolling } from "./pollJob.js";
5
/**
 * Extension entry point: registers the "x" channel type with the gateway
 * extension and declares the polling job.
 *
 * @param {Object} core - Core handle passed by the loader (not used here).
 * @returns {Promise<{jobs: Array<{name: string, start: Function, stop: Function}>}>}
 * @throws {Error} When the gateway extension or its registerChannelType export is unavailable.
 */
export async function init(core) {
  const gatewayExports = getExtension("gateway")?.exports;
  if (!gatewayExports?.registerChannelType) {
    throw new Error("gateway-x requires the gateway extension to be loaded first");
  }

  gatewayExports.registerChannelType("x", handler);
  log.verbose("GatewayX", "Registered X channel type");

  // start kicks off the channel scan without waiting for it; stop halts polling.
  const pollJob = {
    name: "gateway-x-poll",
    start() {
      startupScan();
    },
    stop() {
      stopPolling();
    },
  };

  return { jobs: [pollJob] };
}
25
// Environment variables this extension declares. All are optional secrets.
const envVars = [
  { key: "X_API_KEY", required: false, secret: true, description: "X API Key (OAuth 1.0a consumer key)" },
  { key: "X_API_SECRET", required: false, secret: true, description: "X API Secret (OAuth 1.0a consumer secret)" },
  { key: "X_WEBHOOK_SECRET", required: false, secret: true, autoGenerate: true, description: "Shared secret for verifying X Account Activity API webhooks" },
];

// Extension manifest: identity, dependencies and what the extension provides.
const manifest = {
  name: "gateway-x",
  version: "1.0.1",
  builtFor: "treeos-connect",
  description: [
    "X (Twitter) channel type for the gateway. Three modes: output-only post generation",
    "(the tree becomes a content engine), input-output conversation (threaded replies on X),",
    "and input-only listening (mentions, keywords, hashtags become cascade signals).",
    "Every X channel binds to a TreeOS user for energy, permissions, and rate limiting.",
  ].join(" "),

  // Depends on the gateway extension.
  needs: {
    extensions: ["gateway"],
  },

  optional: {},

  // Jobs and environment variables are the only capabilities provided.
  provides: {
    models: {},
    routes: false,
    tools: false,
    jobs: true,
    orchestrator: false,
    energyActions: {},
    sessionTypes: {},
    env: envVars,
    cli: [],
  },
};

export default manifest;
33
1// X mention and reply poller.
2//
3// The Account Activity API (webhooks) requires enterprise tier.
4// Polling is available on all tiers. We poll:
5//   - /2/users/:id/mentions (input and input-output channels)
6//   - /2/tweets/search/recent with query (input-only listening channels)
7//
8// One shared poll timer walks every connected channel in turn. Checks every 60 seconds.
9// Stores the latest seen tweet ID per channel to avoid reprocessing.
10//
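11// Rate limits: 10 requests/15min for mentions, 60/15min for search.
12// Poll interval of 60s is 15 requests/15min per channel: within the search
12// limit, but above the mentions limit quoted above. TODO: confirm the tier's
12// limit or lengthen the interval.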
13
14import log from "../../seed/log.js";
15import { xApi, getCreds } from "./handler.js";
16
// Channels currently being polled, keyed by channel id.
// Map<channelId, { xUserId, listenFilter, creds, channelDoc, sinceId }>
const activeChannels = new Map();

// Milliseconds between poll passes (60 seconds).
const POLL_INTERVAL_MS = 60 * 1000;

// Handle of the running poll interval, or null while polling is stopped.
let pollTimer = null;
22
/**
 * Add a channel to the set of polled channels.
 *
 * A channel needs an X account id (polled via mentions) or a listenFilter
 * (polled via search); with neither it is skipped with a warning. The
 * watermark (`sinceId`) starts out as null.
 *
 * @param {string} channelId - Key under which the channel is tracked.
 * @param {Object} channel - Channel document; `config.metadata` is read.
 * @param {Object} secrets - Decrypted channel secrets, passed to getCreds.
 */
export function connectChannel(channelId, channel, secrets) {
  const creds = getCreds(secrets);
  const { xUserId, listenFilter } = channel.config?.metadata ?? {};

  const hasPollTarget = Boolean(xUserId || listenFilter);
  if (!hasPollTarget) {
    log.warn("GatewayX", `Channel ${channelId} has no xUserId or listenFilter, skipping`);
    return;
  }

  const entry = { xUserId, listenFilter, creds, channelDoc: channel, sinceId: null };
  activeChannels.set(channelId, entry);
}
44
/**
 * Remove a channel from the set of polled channels; unknown ids are ignored.
 *
 * @param {string} channelId - Key the channel was connected under.
 */
export function disconnectChannel(channelId) {
  if (activeChannels.has(channelId)) {
    activeChannels.delete(channelId);
  }
}
48
/**
 * Find every enabled X channel that takes input, connect each one for
 * polling, and start the poll timer if at least one channel is connected.
 *
 * A failure on one channel is logged as a warning and does not stop the
 * scan; any other failure is logged as an error. Nothing is thrown to the caller.
 *
 * @returns {Promise<void>}
 */
export async function startupScan() {
  try {
    const { getExtension } = await import("../loader.js");
    const ChannelModel = getExtension("gateway")?.exports?.GatewayChannel;

    const query = {
      type: "x",
      enabled: true,
      direction: { $in: ["input", "input-output"] },
    };
    const found = await ChannelModel.find(query).lean();
    if (found.length === 0) return;

    const gateway = getExtension("gateway");
    if (!gateway?.exports?.getChannelWithSecrets) return;

    for (const doc of found) {
      try {
        const withSecrets = await gateway.exports.getChannelWithSecrets(doc._id);
        if (withSecrets) {
          const decrypted = withSecrets.config?.decryptedSecrets || {};
          connectChannel(doc._id.toString(), doc, decrypted);
        }
      } catch (err) {
        log.warn("GatewayX", `Failed to connect channel ${doc._id}: ${err.message}`);
      }
    }

    if (activeChannels.size === 0) return;

    startPolling();
    log.verbose("GatewayX", `Poll job: ${activeChannels.size} channel(s) connected`);
  } catch (err) {
    log.error("GatewayX", `Startup scan failed: ${err.message}`);
  }
}
85
/**
 * Stop the poll timer (if one is running) and forget every connected channel.
 */
export function stopPolling() {
  activeChannels.clear();

  if (!pollTimer) return;
  clearInterval(pollTimer);
  pollTimer = null;
}
93
/**
 * Start the recurring poll and run one pass right away. Does nothing when a
 * timer is already running.
 */
function startPolling() {
  if (!pollTimer) {
    const timer = setInterval(pollAll, POLL_INTERVAL_MS);
    // unref (when available) stops the timer from keeping the process alive.
    if (timer.unref) timer.unref();
    pollTimer = timer;

    // First poll immediately; its promise is not awaited.
    pollAll();
  }
}
101
102// ─────────────────────────────────────────────────────────────────────────
103// POLL LOOP
104// ─────────────────────────────────────────────────────────────────────────
105
/**
 * Run one poll pass over every connected channel, one channel at a time.
 *
 * A channel with a listenFilter and no X account id is polled via search;
 * a channel with an X account id is polled via mentions; anything else is
 * skipped. An error on one channel is logged at debug level and does not
 * stop the pass.
 *
 * @returns {Promise<void>}
 */
async function pollAll() {
  for (const [channelId, info] of activeChannels) {
    try {
      const { xUserId, listenFilter } = info;

      let poller = null;
      if (listenFilter && !xUserId) {
        poller = pollSearch; // input-only listening: search recent tweets
      } else if (xUserId) {
        poller = pollMentions; // mentions (input or input-output)
      }

      if (poller) {
        await poller(channelId, info);
      }
    } catch (err) {
      log.debug("GatewayX", `Poll error for channel ${channelId}: ${err.message}`);
    }
  }
}
121
/**
 * Poll /2/users/:id/mentions for new mentions and replies.
 *
 * The first successful poll of a channel only records the newest tweet id as
 * the watermark, so mentions that predate the connection are never processed.
 * Later polls request tweets after the watermark and process them oldest
 * first. `info.hasPolled` marks that the first poll has happened, so a first
 * poll that returns nothing does not cause the next batch to be skipped.
 *
 * @param {string} channelId - Id of the channel being polled.
 * @param {Object} info - Channel poll state; `sinceId` and `hasPolled` are updated in place.
 * @returns {Promise<void>}
 * @throws {Error} When the API request fails.
 */
async function pollMentions(channelId, info) {
  // Decide this BEFORE the watermark moves; afterwards sinceId is always set.
  const isFirstPoll = !info.hasPolled && !info.sinceId;

  let path = `/2/users/${info.xUserId}/mentions?max_results=10&tweet.fields=author_id,conversation_id,in_reply_to_user_id,text,created_at`;
  if (info.sinceId) path += `&since_id=${info.sinceId}`;

  const data = await xApi(info.creds, "GET", path);
  info.hasPolled = true;

  const tweets = data.data;
  if (!tweets || tweets.length === 0) return;

  // Update since_id to newest (the first element is used as the newest tweet).
  info.sinceId = tweets[0].id;

  // On first poll, just set the watermark. Don't process old mentions.
  if (isFirstPoll) return;

  for (const tweet of [...tweets].reverse()) { // oldest first
    await processTweet(channelId, info, tweet);
  }
}
146
/**
 * Poll /2/tweets/search/recent with the channel's listenFilter as the query.
 *
 * Every returned tweet is processed, oldest first, and the first tweet's id
 * becomes the watermark for the next request.
 *
 * @param {string} channelId - Id of the channel being polled.
 * @param {Object} info - Channel poll state; `sinceId` is updated in place.
 * @returns {Promise<void>}
 * @throws {Error} When the API request fails.
 */
async function pollSearch(channelId, info) {
  const { listenFilter } = info;
  if (!listenFilter) return;

  const sinceClause = info.sinceId ? `&since_id=${info.sinceId}` : "";
  const path =
    `/2/tweets/search/recent?query=${encodeURIComponent(listenFilter)}` +
    `&max_results=10&tweet.fields=author_id,text,created_at${sinceClause}`;

  const { data: tweets } = await xApi(info.creds, "GET", path);
  if (!tweets || tweets.length === 0) return;

  // Update watermark before the list is reversed.
  info.sinceId = tweets[0].id;

  tweets.reverse();
  for (const tweet of tweets) {
    await processTweet(channelId, info, tweet);
  }
}
168
/**
 * Hand one tweet to the gateway's processGatewayMessage and, on an
 * input-output channel, post the returned reply as a threaded tweet.
 *
 * Tweets without text are ignored. The author id serves as both the sender
 * name and the sender platform id. Replies longer than 280 characters are cut
 * to 277 characters plus "...". Errors are logged and never thrown to the caller.
 *
 * @param {string} channelId - Id of the channel the tweet arrived on.
 * @param {Object} info - Channel poll state (`creds`, `channelDoc`).
 * @param {Object} tweet - Tweet object from the X API (`id`, `author_id`, `text`).
 * @returns {Promise<void>}
 */
async function processTweet(channelId, info, tweet) {
  const authorId = tweet.author_id;
  const senderName = authorId || "unknown";
  const senderPlatformId = authorId || "";

  const messageText = tweet.text?.trim();
  if (!messageText) return;

  const preview = messageText.slice(0, 80);
  log.verbose("GatewayX", `Tweet on channel ${channelId} from ${senderName}: "${preview}"`);

  try {
    const { getExtension } = await import("../loader.js");
    const gateway = getExtension("gateway");
    if (!gateway?.exports?.processGatewayMessage) return;

    const inbound = { senderName, senderPlatformId, messageText };
    const result = await gateway.exports.processGatewayMessage(channelId, inbound);

    // Reply as a threaded tweet if input-output
    const shouldReply = result.reply && info.channelDoc.direction === "input-output";
    if (!shouldReply) return;

    const { reply } = result;
    const replyText = reply.length > 280 ? `${reply.slice(0, 277)}...` : reply;

    await xApi(info.creds, "POST", "/2/tweets", {
      text: replyText,
      reply: { in_reply_to_tweet_id: tweet.id },
    });
  } catch (err) {
    log.error("GatewayX", `Error processing tweet on channel ${channelId}: ${err.message}`);
  }
}
208

Versions

Version Published Downloads
1.0.1 38d ago 0
1.0.0 48d ago 0
0 stars
0 flags
React from the CLI: treeos ext star gateway-x

Comments

Loading comments...

Post comments from the CLI: treeos ext comment gateway-x "your comment"
Max 3 comments per extension. One star and one flag per user.