1// X (Twitter) channel type handler.
2//
3// Three modes, one handler:
4//
5// OUTPUT: Post generation. The tree writes, the extension publishes.
6// Cascade signals compress into post-length insights. The tree's
7// transpiration: structured output evaporating into the cloud layer.
8//
9// INPUT-OUTPUT: Conversation. Someone replies on X. The webhook catches it.
10// processGatewayMessage fires. The AI responds. Posted as a threaded reply.
11// The tree is having a public conversation. History lives as notes.
12//
13// INPUT: Listening. Mentions, keywords, hashtags. Every matching post
14// becomes rain falling on the land. Cascade signals from the clouds.
15//
16// Identity binding: every X channel maps an X account to a TreeOS user.
17// The user's energy budget covers API calls. The user's permissions
18// determine which trees the X account can interact with. Without this
19// binding, unauthenticated replies trigger AI with no rate limiting.
20// The gateway core handles this. We just map X account ID in config.
21//
22// X API v2 with OAuth 1.0a User Context (per-user tokens).
23// Rate limits: 1500 posts/month (free), 3000 (basic), 300K (pro).
24
25import log from "../../seed/log.js";
26import crypto from "crypto";
27
28// ─────────────────────────────────────────────────────────────────────────
29// OAUTH 1.0a SIGNING
30// ─────────────────────────────────────────────────────────────────────────
31
32function percentEncode(str) {
33 return encodeURIComponent(str)
34 .replace(/!/g, "%21")
35 .replace(/\*/g, "%2A")
36 .replace(/'/g, "%27")
37 .replace(/\(/g, "%28")
38 .replace(/\)/g, "%29");
39}
40
41function generateOAuthHeader(method, url, params, consumerKey, consumerSecret, accessToken, tokenSecret) {
42 const nonce = crypto.randomBytes(16).toString("hex");
43 const timestamp = Math.floor(Date.now() / 1000).toString();
44
45 const oauthParams = {
46 oauth_consumer_key: consumerKey,
47 oauth_nonce: nonce,
48 oauth_signature_method: "HMAC-SHA1",
49 oauth_timestamp: timestamp,
50 oauth_token: accessToken,
51 oauth_version: "1.0",
52 };
53
54 // Combine oauth params and request params, sort, encode
55 const allParams = { ...oauthParams, ...params };
56 const paramString = Object.keys(allParams)
57 .sort()
58 .map(k => `${percentEncode(k)}=${percentEncode(allParams[k])}`)
59 .join("&");
60
61 const baseString = `${method.toUpperCase()}&${percentEncode(url)}&${percentEncode(paramString)}`;
62 const signingKey = `${percentEncode(consumerSecret)}&${percentEncode(tokenSecret)}`;
63 const signature = crypto.createHmac("sha1", signingKey).update(baseString).digest("base64");
64
65 oauthParams.oauth_signature = signature;
66
67 const header = "OAuth " + Object.keys(oauthParams)
68 .sort()
69 .map(k => `${percentEncode(k)}="${percentEncode(oauthParams[k])}"`)
70 .join(", ");
71
72 return header;
73}
74
75// ─────────────────────────────────────────────────────────────────────────
76// X API CLIENT
77// ─────────────────────────────────────────────────────────────────────────
78
79function getCreds(secrets) {
80 return {
81 apiKey: secrets.apiKey || process.env.X_API_KEY,
82 apiSecret: secrets.apiSecret || process.env.X_API_SECRET,
83 accessToken: secrets.accessToken,
84 tokenSecret: secrets.tokenSecret,
85 };
86}
87
88async function xApi(creds, method, path, body) {
89 const url = `https://api.x.com${path}`;
90
91 const authHeader = generateOAuthHeader(
92 method, url, {},
93 creds.apiKey, creds.apiSecret,
94 creds.accessToken, creds.tokenSecret,
95 );
96
97 const opts = {
98 method,
99 headers: {
100 "Authorization": authHeader,
101 "Content-Type": "application/json",
102 },
103 };
104 if (body) opts.body = JSON.stringify(body);
105
106 const res = await fetch(url, opts);
107 const data = await res.json().catch(() => ({}));
108
109 if (!res.ok) {
110 const detail = data.detail || data.errors?.[0]?.message || data.title || `${res.status}`;
111 throw new Error(`X API ${method} ${path}: ${detail}`);
112 }
113 return data;
114}
115
116// ─────────────────────────────────────────────────────────────────────────
117// VALIDATION
118// ─────────────────────────────────────────────────────────────────────────
119
120function validateConfig(config, direction) {
121 // Per-user OAuth tokens are always required (X API v2 user context)
122 if (!config.accessToken || typeof config.accessToken !== "string") {
123 throw new Error("X channel requires an accessToken (OAuth 1.0a user access token)");
124 }
125 if (!config.tokenSecret || typeof config.tokenSecret !== "string") {
126 throw new Error("X channel requires a tokenSecret (OAuth 1.0a user token secret)");
127 }
128
129 const apiKey = config.apiKey || process.env.X_API_KEY;
130 const apiSecret = config.apiSecret || process.env.X_API_SECRET;
131 if (!apiKey || !apiSecret) {
132 throw new Error("X channel requires X_API_KEY and X_API_SECRET in .env or channel config");
133 }
134
135 // Input-only listening can optionally filter by keywords/hashtags
136 if (direction === "input" && config.listenFilter) {
137 if (typeof config.listenFilter !== "string" || config.listenFilter.length > 500) {
138 throw new Error("listenFilter must be a string under 500 characters");
139 }
140 }
141}
142
143function buildEncryptedConfig(config, direction) {
144 const secrets = {
145 accessToken: config.accessToken,
146 tokenSecret: config.tokenSecret,
147 };
148 if (config.apiKey) secrets.apiKey = config.apiKey;
149 if (config.apiSecret) secrets.apiSecret = config.apiSecret;
150
151 const metadata = {
152 xUserId: config.xUserId || null,
153 xUsername: config.xUsername || null,
154 };
155
156 // Input listening filter (keywords, hashtags, mentions)
157 if (config.listenFilter) {
158 metadata.listenFilter = config.listenFilter;
159 }
160
161 return {
162 secrets,
163 metadata,
164 displayIdentifier: config.xUsername ? `@${config.xUsername}` : config.xUserId || "x",
165 };
166}
167
168// ─────────────────────────────────────────────────────────────────────────
169// SENDER (post / reply)
170// ─────────────────────────────────────────────────────────────────────────
171
172async function send(secrets, metadata, notification) {
173 const creds = getCreds(secrets);
174 if (!creds.accessToken || !creds.tokenSecret) throw new Error("X OAuth credentials not configured");
175 if (!creds.apiKey || !creds.apiSecret) throw new Error("X API key not configured");
176
177 // Format as a post. 280 char limit for the main body.
178 let text = notification.content || "";
179 if (notification.title && text.length + notification.title.length + 3 < 280) {
180 text = `${notification.title}\n\n${text}`;
181 }
182 if (text.length > 280) text = text.slice(0, 277) + "...";
183
184 const body = { text };
185
186 // If this notification has a reply context (set by input processing), thread it
187 if (notification._replyToTweetId) {
188 body.reply = { in_reply_to_tweet_id: notification._replyToTweetId };
189 }
190
191 const result = await xApi(creds, "POST", "/2/tweets", body);
192 return result;
193}
194
195// ─────────────────────────────────────────────────────────────────────────
196// INPUT LIFECYCLE
197// ─────────────────────────────────────────────────────────────────────────
198
199async function registerInput(channel, secrets) {
200 // Input uses polling (see pollJob.js). The Account Activity API
201 // requires an enterprise tier. Polling /2/users/:id/mentions is
202 // available on basic tier. The poll job handles this.
203 const username = channel.config?.metadata?.xUsername || channel.config?.metadata?.xUserId;
204 log.info("GatewayX",
205 `X input registered for channel ${channel._id} (@${username}). ` +
206 `Poll job will check for new mentions and replies.`,
207 );
208}
209
210async function unregisterInput(channel, secrets) {
211 log.verbose("GatewayX", `X input unregistered for channel ${channel._id}`);
212}
213
214// Exported for pollJob.js
215export { xApi, getCreds };
216
217export default {
218 allowedDirections: ["input", "output", "input-output"],
219 validateConfig,
220 buildEncryptedConfig,
221 send,
222 registerInput,
223 unregisterInput,
224};
225
1import log from "../../seed/log.js";
2import handler from "./handler.js";
3import { getExtension } from "../loader.js";
4import { startupScan, stopPolling } from "./pollJob.js";
5
6export async function init(core) {
7 const gateway = getExtension("gateway");
8 if (!gateway?.exports?.registerChannelType) {
9 throw new Error("gateway-x requires the gateway extension to be loaded first");
10 }
11
12 gateway.exports.registerChannelType("x", handler);
13 log.verbose("GatewayX", "Registered X channel type");
14
15 return {
16 jobs: [
17 {
18 name: "gateway-x-poll",
19 start: () => { startupScan(); },
20 stop: () => { stopPolling(); },
21 },
22 ],
23 };
24}
25
1export default {
2 name: "gateway-x",
3 version: "1.0.1",
4 builtFor: "treeos-connect",
5 description:
6 "X (Twitter) channel type for the gateway. Three modes: output-only post generation " +
7 "(the tree becomes a content engine), input-output conversation (threaded replies on X), " +
8 "and input-only listening (mentions, keywords, hashtags become cascade signals). " +
9 "Every X channel binds to a TreeOS user for energy, permissions, and rate limiting.",
10
11 needs: {
12 extensions: ["gateway"],
13 },
14
15 optional: {},
16
17 provides: {
18 models: {},
19 routes: false,
20 tools: false,
21 jobs: true,
22 orchestrator: false,
23 energyActions: {},
24 sessionTypes: {},
25 env: [
26 { key: "X_API_KEY", required: false, secret: true, description: "X API Key (OAuth 1.0a consumer key)" },
27 { key: "X_API_SECRET", required: false, secret: true, description: "X API Secret (OAuth 1.0a consumer secret)" },
28 { key: "X_WEBHOOK_SECRET", required: false, secret: true, autoGenerate: true, description: "Shared secret for verifying X Account Activity API webhooks" },
29 ],
30 cli: [],
31 },
32};
33
1// X mention and reply poller.
2//
3// The Account Activity API (webhooks) requires enterprise tier.
4// Polling is available on all tiers. We poll:
5// - /2/users/:id/mentions (input and input-output channels)
6// - /2/tweets/search/recent with query (input-only listening channels)
7//
8// One poll loop per unique X user. Checks every 60 seconds.
9// Stores the latest seen tweet ID per channel to avoid reprocessing.
10//
11// Rate limits: 10 requests/15min for mentions, 60/15min for search.
12// Poll interval of 60s stays well within limits.
13
14import log from "../../seed/log.js";
15import { xApi, getCreds } from "./handler.js";
16
17// Map<channelId, { xUserId, creds, channelDoc, sinceId }>
18const activeChannels = new Map();
19
20let pollTimer = null;
21const POLL_INTERVAL_MS = 60000; // 60 seconds
22
23/**
24 * Connect a channel for polling.
25 */
26export function connectChannel(channelId, channel, secrets) {
27 const creds = getCreds(secrets);
28 const xUserId = channel.config?.metadata?.xUserId;
29 const listenFilter = channel.config?.metadata?.listenFilter;
30
31 if (!xUserId && !listenFilter) {
32 log.warn("GatewayX", `Channel ${channelId} has no xUserId or listenFilter, skipping`);
33 return;
34 }
35
36 activeChannels.set(channelId, {
37 xUserId,
38 listenFilter,
39 creds,
40 channelDoc: channel,
41 sinceId: null,
42 });
43}
44
45export function disconnectChannel(channelId) {
46 activeChannels.delete(channelId);
47}
48
49/**
50 * Scan all enabled X input channels and connect them.
51 */
52export async function startupScan() {
53 try {
54 const { getExtension } = await import("../loader.js");
55 const GatewayChannel = getExtension("gateway")?.exports?.GatewayChannel;
56 const channels = await GatewayChannel.find({
57 type: "x",
58 enabled: true,
59 direction: { $in: ["input", "input-output"] },
60 }).lean();
61
62 if (channels.length === 0) return;
63
64 const gateway = getExtension("gateway");
65 if (!gateway?.exports?.getChannelWithSecrets) return;
66
67 for (const ch of channels) {
68 try {
69 const full = await gateway.exports.getChannelWithSecrets(ch._id);
70 if (!full) continue;
71 connectChannel(ch._id.toString(), ch, full.config?.decryptedSecrets || {});
72 } catch (err) {
73 log.warn("GatewayX", `Failed to connect channel ${ch._id}: ${err.message}`);
74 }
75 }
76
77 if (activeChannels.size > 0) {
78 startPolling();
79 log.verbose("GatewayX", `Poll job: ${activeChannels.size} channel(s) connected`);
80 }
81 } catch (err) {
82 log.error("GatewayX", `Startup scan failed: ${err.message}`);
83 }
84}
85
86export function stopPolling() {
87 if (pollTimer) {
88 clearInterval(pollTimer);
89 pollTimer = null;
90 }
91 activeChannels.clear();
92}
93
94function startPolling() {
95 if (pollTimer) return;
96 pollTimer = setInterval(pollAll, POLL_INTERVAL_MS);
97 if (pollTimer.unref) pollTimer.unref();
98 // First poll immediately
99 pollAll();
100}
101
102// ─────────────────────────────────────────────────────────────────────────
103// POLL LOOP
104// ─────────────────────────────────────────────────────────────────────────
105
106async function pollAll() {
107 for (const [channelId, info] of activeChannels) {
108 try {
109 if (info.listenFilter && !info.xUserId) {
110 // Input-only listening: search recent tweets
111 await pollSearch(channelId, info);
112 } else if (info.xUserId) {
113 // Mentions (input or input-output)
114 await pollMentions(channelId, info);
115 }
116 } catch (err) {
117 log.debug("GatewayX", `Poll error for channel ${channelId}: ${err.message}`);
118 }
119 }
120}
121
122/**
123 * Poll /2/users/:id/mentions for new mentions and replies.
124 */
125async function pollMentions(channelId, info) {
126 let path = `/2/users/${info.xUserId}/mentions?max_results=10&tweet.fields=author_id,conversation_id,in_reply_to_user_id,text,created_at`;
127 if (info.sinceId) path += `&since_id=${info.sinceId}`;
128
129 const data = await xApi(info.creds, "GET", path);
130 const tweets = data.data;
131 if (!tweets || tweets.length === 0) return;
132
133 // Update since_id to newest
134 info.sinceId = tweets[0].id;
135
136 // On first poll, just set the watermark. Don't process old mentions.
137 if (!info.sinceId && tweets.length > 0) {
138 info.sinceId = tweets[0].id;
139 return;
140 }
141
142 for (const tweet of tweets.reverse()) { // oldest first
143 await processTweet(channelId, info, tweet);
144 }
145}
146
147/**
148 * Poll /2/tweets/search/recent for keyword/hashtag listening.
149 */
150async function pollSearch(channelId, info) {
151 const query = info.listenFilter;
152 if (!query) return;
153
154 let path = `/2/tweets/search/recent?query=${encodeURIComponent(query)}&max_results=10&tweet.fields=author_id,text,created_at`;
155 if (info.sinceId) path += `&since_id=${info.sinceId}`;
156
157 const data = await xApi(info.creds, "GET", path);
158 const tweets = data.data;
159 if (!tweets || tweets.length === 0) return;
160
161 // Update watermark
162 info.sinceId = tweets[0].id;
163
164 for (const tweet of tweets.reverse()) {
165 await processTweet(channelId, info, tweet);
166 }
167}
168
169/**
170 * Process a single tweet: send it through processGatewayMessage.
171 */
172async function processTweet(channelId, info, tweet) {
173 const senderName = tweet.author_id || "unknown";
174 const senderPlatformId = tweet.author_id || "";
175 const messageText = tweet.text?.trim();
176 if (!messageText) return;
177
178 log.verbose("GatewayX",
179 `Tweet on channel ${channelId} from ${senderName}: "${messageText.slice(0, 80)}"`,
180 );
181
182 try {
183 const { getExtension } = await import("../loader.js");
184 const gateway = getExtension("gateway");
185 if (!gateway?.exports?.processGatewayMessage) return;
186
187 const result = await gateway.exports.processGatewayMessage(channelId, {
188 senderName,
189 senderPlatformId,
190 messageText,
191 });
192
193 // Reply as a threaded tweet if input-output
194 const channel = info.channelDoc;
195 if (result.reply && channel.direction === "input-output") {
196 let replyText = result.reply;
197 if (replyText.length > 280) replyText = replyText.slice(0, 277) + "...";
198
199 await xApi(info.creds, "POST", "/2/tweets", {
200 text: replyText,
201 reply: { in_reply_to_tweet_id: tweet.id },
202 });
203 }
204 } catch (err) {
205 log.error("GatewayX", `Error processing tweet on channel ${channelId}: ${err.message}`);
206 }
207}
208
Loading comments...