1// Reddit channel type handler.
2//
3// Reddit API uses OAuth2 with password grant (script apps) or refresh tokens.
4// Rate limit: 100 requests per minute per OAuth client. 10 requests per minute
5// for search. Poll interval of 90 seconds keeps us well under.
6//
7// Three modes:
8// OUTPUT: Submit self-posts to a subreddit via /api/submit.
9// INPUT-OUTPUT: Poll comment replies on the bot's posts, respond in threads.
10// INPUT: Monitor subreddit/new or /search for keywords. Rain from the clouds.
11//
12// Auth: Reddit "script" app type (personal use) with username/password grant.
13// Per-channel override possible for multi-account setups.
14
15import log from "../../seed/log.js";
16
17// ─────────────────────────────────────────────────────────────────────────
18// REDDIT OAUTH2
19// ─────────────────────────────────────────────────────────────────────────
20
21// Token cache: one token per client_id (shared across channels with same creds)
22const tokenCache = new Map(); // credKey -> { token, expiresAt }
23
24function credKey(clientId, username) {
25 return `${clientId}:${username}`;
26}
27
28async function getAccessToken(creds) {
29 const key = credKey(creds.clientId, creds.username);
30 const cached = tokenCache.get(key);
31 if (cached && cached.expiresAt > Date.now() + 60000) {
32 return cached.token;
33 }
34
35 const auth = Buffer.from(`${creds.clientId}:${creds.clientSecret}`).toString("base64");
36 const params = new URLSearchParams();
37 params.append("grant_type", "password");
38 params.append("username", creds.username);
39 params.append("password", creds.password);
40
41 const res = await fetch("https://www.reddit.com/api/v1/access_token", {
42 method: "POST",
43 headers: {
44 "Authorization": `Basic ${auth}`,
45 "Content-Type": "application/x-www-form-urlencoded",
46 "User-Agent": `TreeOS-Gateway/1.0 (by /u/${creds.username})`,
47 },
48 body: params.toString(),
49 });
50
51 const data = await res.json();
52 if (!res.ok || data.error) {
53 throw new Error(`Reddit OAuth failed: ${data.error || res.status}`);
54 }
55
56 const token = data.access_token;
57 const expiresAt = Date.now() + (data.expires_in || 3600) * 1000;
58 tokenCache.set(key, { token, expiresAt });
59 return token;
60}
61
62// ─────────────────────────────────────────────────────────────────────────
63// REDDIT API CLIENT
64// ─────────────────────────────────────────────────────────────────────────
65
66function getCreds(secrets) {
67 return {
68 clientId: secrets.clientId || process.env.REDDIT_CLIENT_ID,
69 clientSecret: secrets.clientSecret || process.env.REDDIT_CLIENT_SECRET,
70 username: secrets.username || process.env.REDDIT_USERNAME,
71 password: secrets.password || process.env.REDDIT_PASSWORD,
72 };
73}
74
75async function redditApi(creds, method, path, body) {
76 const token = await getAccessToken(creds);
77 const url = `https://oauth.reddit.com${path}`;
78
79 const opts = {
80 method,
81 headers: {
82 "Authorization": `Bearer ${token}`,
83 "User-Agent": `TreeOS-Gateway/1.0 (by /u/${creds.username})`,
84 "Content-Type": "application/json",
85 },
86 };
87
88 if (body && method === "POST") {
89 // Reddit API uses form-encoded for most endpoints
90 if (body instanceof URLSearchParams) {
91 opts.headers["Content-Type"] = "application/x-www-form-urlencoded";
92 opts.body = body.toString();
93 } else {
94 opts.body = JSON.stringify(body);
95 }
96 }
97
98 const res = await fetch(url, opts);
99 const data = await res.json().catch(() => ({}));
100
101 if (!res.ok) {
102 throw new Error(`Reddit API ${method} ${path}: ${data.message || data.error || res.status}`);
103 }
104 return data;
105}
106
107// ─────────────────────────────────────────────────────────────────────────
108// VALIDATION
109// ─────────────────────────────────────────────────────────────────────────
110
111function validateConfig(config, direction) {
112 const hasOutput = direction === "output" || direction === "input-output";
113 const hasInput = direction === "input" || direction === "input-output";
114
115 const cid = config.clientId || process.env.REDDIT_CLIENT_ID;
116 const cs = config.clientSecret || process.env.REDDIT_CLIENT_SECRET;
117 const user = config.username || process.env.REDDIT_USERNAME;
118 const pass = config.password || process.env.REDDIT_PASSWORD;
119
120 if (!cid || !cs || !user || !pass) {
121 throw new Error(
122 "Reddit requires REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USERNAME, REDDIT_PASSWORD " +
123 "in .env or per-channel config"
124 );
125 }
126
127 if (hasOutput) {
128 if (!config.subreddit || typeof config.subreddit !== "string") {
129 throw new Error("Reddit output requires a subreddit name (without r/ prefix)");
130 }
131 }
132
133 if (hasInput && config.monitorFilter) {
134 if (typeof config.monitorFilter !== "string" || config.monitorFilter.length > 500) {
135 throw new Error("monitorFilter must be a string under 500 characters");
136 }
137 }
138
139 if (hasInput && !config.subreddit && !config.monitorFilter) {
140 throw new Error("Reddit input requires either a subreddit to monitor or a monitorFilter query");
141 }
142}
143
144function buildEncryptedConfig(config, direction) {
145 const secrets = {};
146 const metadata = {};
147
148 // Per-channel credential overrides
149 if (config.clientId) secrets.clientId = config.clientId;
150 if (config.clientSecret) secrets.clientSecret = config.clientSecret;
151 if (config.username) secrets.username = config.username;
152 if (config.password) secrets.password = config.password;
153
154 if (config.subreddit) metadata.subreddit = config.subreddit.replace(/^r\//, "");
155 if (config.monitorFilter) metadata.monitorFilter = config.monitorFilter;
156
157 return {
158 secrets,
159 metadata,
160 displayIdentifier: config.subreddit ? `r/${config.subreddit.replace(/^r\//, "")}` : config.monitorFilter || "reddit",
161 };
162}
163
164// ─────────────────────────────────────────────────────────────────────────
165// SENDER (submit post or reply to comment)
166// ─────────────────────────────────────────────────────────────────────────
167
168async function send(secrets, metadata, notification) {
169 const creds = getCreds(secrets);
170 if (!creds.clientId || !creds.clientSecret || !creds.username || !creds.password) {
171 throw new Error("Reddit credentials not configured");
172 }
173 const subreddit = metadata.subreddit;
174 if (!subreddit) throw new Error("No subreddit configured for output");
175
176 const title = notification.title || "Tree update";
177 const text = notification.content || "";
178
179 // If this is a reply to a comment (set by input processing)
180 if (notification._replyToCommentFullname) {
181 const params = new URLSearchParams();
182 params.append("api_type", "json");
183 params.append("thing_id", notification._replyToCommentFullname);
184 params.append("text", text.length > 10000 ? text.slice(0, 9997) + "..." : text);
185 await redditApi(creds, "POST", "/api/comment", params);
186 return;
187 }
188
189 // Submit a new self-post
190 const params = new URLSearchParams();
191 params.append("api_type", "json");
192 params.append("kind", "self");
193 params.append("sr", subreddit);
194 params.append("title", title.length > 300 ? title.slice(0, 297) + "..." : title);
195 params.append("text", text.length > 40000 ? text.slice(0, 39997) + "..." : text);
196
197 const result = await redditApi(creds, "POST", "/api/submit", params);
198
199 // Track the post ID for comment monitoring
200 const postUrl = result?.json?.data?.url;
201 const postName = result?.json?.data?.name; // fullname like t3_xxxxx
202 if (postName) {
203 log.verbose("GatewayReddit", `Posted to r/${subreddit}: ${postUrl || postName}`);
204 }
205}
206
207// ─────────────────────────────────────────────────────────────────────────
208// INPUT LIFECYCLE
209// ─────────────────────────────────────────────────────────────────────────
210
211async function registerInput(channel, secrets) {
212 const sub = channel.config?.metadata?.subreddit;
213 const filter = channel.config?.metadata?.monitorFilter;
214 log.info("GatewayReddit",
215 `Reddit input registered for channel ${channel._id}` +
216 (sub ? ` monitoring r/${sub}` : "") +
217 (filter ? ` searching "${filter}"` : "") +
218 `. Poll job will check for new posts and comments.`,
219 );
220}
221
222async function unregisterInput(channel, secrets) {
223 log.verbose("GatewayReddit", `Reddit input unregistered for channel ${channel._id}`);
224}
225
226// Exported for pollJob.js
227export { redditApi, getCreds };
228
229export default {
230 allowedDirections: ["input", "output", "input-output"],
231 validateConfig,
232 buildEncryptedConfig,
233 send,
234 registerInput,
235 unregisterInput,
236};
237
1import log from "../../seed/log.js";
2import handler from "./handler.js";
3import { getExtension } from "../loader.js";
4import { startupScan, stopPolling } from "./pollJob.js";
5
6export async function init(core) {
7 const gateway = getExtension("gateway");
8 if (!gateway?.exports?.registerChannelType) {
9 throw new Error("gateway-reddit requires the gateway extension to be loaded first");
10 }
11
12 gateway.exports.registerChannelType("reddit", handler);
13 log.verbose("GatewayReddit", "Registered Reddit channel type");
14
15 return {
16 jobs: [
17 {
18 name: "gateway-reddit-poll",
19 start: () => { startupScan(); },
20 stop: () => { stopPolling(); },
21 },
22 ],
23 };
24}
25
1export default {
2 name: "gateway-reddit",
3 version: "1.0.1",
4 builtFor: "treeos-connect",
5 description:
6 "Registers the Reddit channel type with the gateway core, enabling trees to publish " +
7 "content to subreddits, respond to comments, and monitor discussions across Reddit. " +
8 "Output channels submit self-posts to a configured subreddit via the Reddit API's " +
9 "/api/submit endpoint. The tree can publish research, summaries, dream outputs, or " +
10 "any notification type as a Reddit post." +
11 "\n\n" +
12 "Input channels operate in two modes depending on configuration. Subreddit monitoring " +
13 "polls /r/{subreddit}/new for new posts and routes each post's text through the gateway " +
14 "pipeline as an inbound message. Keyword monitoring polls /search with a custom query " +
15 "string, catching relevant discussions across all of Reddit regardless of subreddit. " +
16 "Both modes use a timestamp watermark to process only new content since the last poll. " +
17 "Input-output channels monitor comments in the configured subreddit. When a comment " +
18 "arrives, it is processed through the tree orchestrator, and the AI's reply is posted " +
19 "back as a threaded comment via /api/comment. The tree becomes an active participant " +
20 "in subreddit discussions." +
21 "\n\n" +
22 "Authentication uses Reddit's OAuth2 password grant for script-type apps. The token " +
23 "cache is keyed by client ID and username, with automatic refresh before expiration. " +
24 "Credentials can be set globally via environment variables (REDDIT_CLIENT_ID, " +
25 "REDDIT_CLIENT_SECRET, REDDIT_USERNAME, REDDIT_PASSWORD) or overridden per channel " +
26 "for multi-account setups. The poll job runs on a 90-second interval, well within " +
27 "Reddit's rate limits of 100 authenticated requests per minute. Posts are truncated " +
28 "to Reddit's 40,000 character limit for self-posts and 300 characters for titles. " +
29 "Comment replies are capped at 10,000 characters. On startup, the extension scans " +
30 "for enabled Reddit input channels and begins polling automatically.",
31
32 needs: {
33 extensions: ["gateway"],
34 },
35
36 optional: {},
37
38 provides: {
39 models: {},
40 routes: false,
41 tools: false,
42 jobs: true,
43 orchestrator: false,
44 energyActions: {},
45 sessionTypes: {},
46 env: [
47 { key: "REDDIT_CLIENT_ID", required: false, description: "Reddit app client ID (from reddit.com/prefs/apps)" },
48 { key: "REDDIT_CLIENT_SECRET", required: false, secret: true, description: "Reddit app client secret" },
49 { key: "REDDIT_USERNAME", required: false, description: "Reddit bot account username" },
50 { key: "REDDIT_PASSWORD", required: false, secret: true, description: "Reddit bot account password" },
51 ],
52 cli: [],
53 },
54};
55
1// Reddit poll job.
2//
3// Polls two endpoints depending on channel config:
4//
5// 1. /r/{subreddit}/comments (input-output): monitors comments on posts
6// in the subreddit. When the bot is the post author, incoming comments
7// are gateway messages. The AI responds as a threaded comment.
8//
9// 2. /search?q={query} (input-only listening): monitors keyword/topic
10// matches across Reddit. Each matching post becomes a gateway message.
11// Rain from the cloud layer.
12//
13// 3. /r/{subreddit}/new (input): monitors new posts in a subreddit.
14// Each new post text becomes a gateway message.
15//
16// Rate limit: 100 req/min for authenticated, 10/min for search.
17// Poll interval: 90 seconds. Well within limits.
18
19import log from "../../seed/log.js";
20import { redditApi, getCreds } from "./handler.js";
21
22// Map<channelId, { creds, channelDoc, sinceTimestamp, trackedPosts }>
23const activeChannels = new Map();
24
25let pollTimer = null;
26const POLL_INTERVAL_MS = 90000; // 90 seconds (Reddit is stricter than X)
27
28export function connectChannel(channelId, channel, secrets) {
29 const creds = getCreds(secrets);
30 const subreddit = channel.config?.metadata?.subreddit;
31 const monitorFilter = channel.config?.metadata?.monitorFilter;
32
33 if (!subreddit && !monitorFilter) {
34 log.warn("GatewayReddit", `Channel ${channelId} has no subreddit or monitorFilter, skipping`);
35 return;
36 }
37
38 activeChannels.set(channelId, {
39 creds,
40 channelDoc: channel,
41 subreddit,
42 monitorFilter,
43 sinceTimestamp: Math.floor(Date.now() / 1000),
44 trackedPosts: new Set(), // fullnames of posts we authored (for comment tracking)
45 });
46}
47
48export function disconnectChannel(channelId) {
49 activeChannels.delete(channelId);
50}
51
52export async function startupScan() {
53 try {
54 const { getExtension } = await import("../loader.js");
55 const GatewayChannel = getExtension("gateway")?.exports?.GatewayChannel;
56 const channels = await GatewayChannel.find({
57 type: "reddit",
58 enabled: true,
59 direction: { $in: ["input", "input-output"] },
60 }).lean();
61
62 if (channels.length === 0) return;
63
64 const gateway = getExtension("gateway");
65 if (!gateway?.exports?.getChannelWithSecrets) return;
66
67 for (const ch of channels) {
68 try {
69 const full = await gateway.exports.getChannelWithSecrets(ch._id);
70 if (!full) continue;
71 connectChannel(ch._id.toString(), ch, full.config?.decryptedSecrets || {});
72 } catch (err) {
73 log.warn("GatewayReddit", `Failed to connect channel ${ch._id}: ${err.message}`);
74 }
75 }
76
77 if (activeChannels.size > 0) {
78 startPolling();
79 log.verbose("GatewayReddit", `Poll job: ${activeChannels.size} channel(s) connected`);
80 }
81 } catch (err) {
82 log.error("GatewayReddit", `Startup scan failed: ${err.message}`);
83 }
84}
85
86export function stopPolling() {
87 if (pollTimer) {
88 clearInterval(pollTimer);
89 pollTimer = null;
90 }
91 activeChannels.clear();
92}
93
94function startPolling() {
95 if (pollTimer) return;
96 pollTimer = setInterval(pollAll, POLL_INTERVAL_MS);
97 if (pollTimer.unref) pollTimer.unref();
98 pollAll();
99}
100
101// ─────────────────────────────────────────────────────────────────────────
102// POLL LOOP
103// ─────────────────────────────────────────────────────────────────────────
104
105async function pollAll() {
106 for (const [channelId, info] of activeChannels) {
107 try {
108 const direction = info.channelDoc.direction;
109
110 if (direction === "input-output" && info.subreddit) {
111 // Monitor comments on our posts in the subreddit
112 await pollComments(channelId, info);
113 } else if (info.monitorFilter) {
114 // Search-based listening
115 await pollSearch(channelId, info);
116 } else if (info.subreddit) {
117 // Monitor new posts in subreddit
118 await pollNewPosts(channelId, info);
119 }
120 } catch (err) {
121 log.debug("GatewayReddit", `Poll error for channel ${channelId}: ${err.message}`);
122 }
123 }
124}
125
126/**
127 * Poll comments on the bot's recent posts in the subreddit.
128 * Checks /r/{sub}/comments for new comments, filters to ones
129 * on posts authored by the bot account.
130 */
131async function pollComments(channelId, info) {
132 const path = `/r/${info.subreddit}/comments?limit=25&raw_json=1`;
133 const data = await redditApi(info.creds, "GET", path);
134
135 const comments = data?.data?.children || [];
136 if (comments.length === 0) return;
137
138 const botUsername = info.creds.username.toLowerCase();
139
140 for (const item of comments) {
141 const comment = item.data;
142 if (!comment || item.kind !== "t1") continue;
143
144 // Skip our own comments
145 if (comment.author?.toLowerCase() === botUsername) continue;
146
147 // Only process comments newer than our watermark
148 if (comment.created_utc <= info.sinceTimestamp) continue;
149
150 // Process all comments in the subreddit (not just on our posts).
151 // The gateway core handles the rest.
152 await processRedditItem(channelId, info, {
153 author: comment.author,
154 text: comment.body,
155 fullname: comment.name, // t1_xxxxx
156 created: comment.created_utc,
157 isComment: true,
158 });
159 }
160
161 // Update watermark to newest
162 if (comments.length > 0 && comments[0].data?.created_utc) {
163 info.sinceTimestamp = comments[0].data.created_utc;
164 }
165}
166
167/**
168 * Poll /search for keyword/topic monitoring.
169 */
170async function pollSearch(channelId, info) {
171 const query = encodeURIComponent(info.monitorFilter);
172 const path = `/search?q=${query}&sort=new&limit=10&type=link&raw_json=1&restrict_sr=false`;
173 const data = await redditApi(info.creds, "GET", path);
174
175 const posts = data?.data?.children || [];
176 for (const item of posts) {
177 const post = item.data;
178 if (!post || item.kind !== "t3") continue;
179 if (post.created_utc <= info.sinceTimestamp) continue;
180
181 await processRedditItem(channelId, info, {
182 author: post.author,
183 text: post.selftext ? `[${post.title}] ${post.selftext}` : post.title,
184 fullname: post.name,
185 created: post.created_utc,
186 isComment: false,
187 subreddit: post.subreddit,
188 });
189 }
190
191 if (posts.length > 0 && posts[0].data?.created_utc) {
192 info.sinceTimestamp = posts[0].data.created_utc;
193 }
194}
195
196/**
197 * Poll /r/{sub}/new for new posts.
198 */
199async function pollNewPosts(channelId, info) {
200 const path = `/r/${info.subreddit}/new?limit=10&raw_json=1`;
201 const data = await redditApi(info.creds, "GET", path);
202
203 const posts = data?.data?.children || [];
204 const botUsername = info.creds.username.toLowerCase();
205
206 for (const item of posts) {
207 const post = item.data;
208 if (!post || item.kind !== "t3") continue;
209 if (post.created_utc <= info.sinceTimestamp) continue;
210 if (post.author?.toLowerCase() === botUsername) continue; // skip our own
211
212 await processRedditItem(channelId, info, {
213 author: post.author,
214 text: post.selftext ? `[${post.title}] ${post.selftext}` : post.title,
215 fullname: post.name,
216 created: post.created_utc,
217 isComment: false,
218 });
219 }
220
221 if (posts.length > 0 && posts[0].data?.created_utc) {
222 info.sinceTimestamp = posts[0].data.created_utc;
223 }
224}
225
226/**
227 * Process a single Reddit post or comment through the gateway.
228 */
229async function processRedditItem(channelId, info, item) {
230 const messageText = item.text?.trim();
231 if (!messageText) return;
232
233 log.verbose("GatewayReddit",
234 `Reddit ${item.isComment ? "comment" : "post"} on channel ${channelId} from u/${item.author}: "${messageText.slice(0, 80)}"`,
235 );
236
237 try {
238 const { getExtension } = await import("../loader.js");
239 const gateway = getExtension("gateway");
240 if (!gateway?.exports?.processGatewayMessage) return;
241
242 const result = await gateway.exports.processGatewayMessage(channelId, {
243 senderName: item.author,
244 senderPlatformId: item.author,
245 messageText,
246 });
247
248 // Reply as a comment if input-output and there's a reply
249 const channel = info.channelDoc;
250 if (result.reply && channel.direction === "input-output" && item.fullname) {
251 let replyText = result.reply;
252 if (replyText.length > 10000) replyText = replyText.slice(0, 9997) + "...";
253
254 const params = new URLSearchParams();
255 params.append("api_type", "json");
256 params.append("thing_id", item.fullname);
257 params.append("text", replyText);
258
259 await redditApi(info.creds, "POST", "/api/comment", params);
260 log.verbose("GatewayReddit", `Replied to ${item.fullname} on channel ${channelId}`);
261 }
262 } catch (err) {
263 log.error("GatewayReddit", `Error processing Reddit item on channel ${channelId}: ${err.message}`);
264 }
265}
266
Loading comments...