diff --git a/packages/backend/package.json b/packages/backend/package.json index df4fb2b96e..0a5e03539f 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -156,6 +156,7 @@ "pkce-challenge": "4.1.0", "probe-image-size": "7.2.3", "promise-limit": "2.7.0", + "proxy-addr": "^2.0.7", "pug": "3.0.3", "punycode": "2.3.1", "qrcode": "1.5.3", @@ -214,6 +215,7 @@ "@types/oauth2orize": "1.11.5", "@types/oauth2orize-pkce": "0.1.2", "@types/pg": "8.11.6", + "@types/proxy-addr": "^2.0.3", "@types/pug": "2.0.10", "@types/punycode": "2.1.4", "@types/qrcode": "1.5.5", diff --git a/packages/backend/src/daemons/ServerStatsService.ts b/packages/backend/src/daemons/ServerStatsService.ts index 2c70344c94..0be2149a0a 100644 --- a/packages/backend/src/daemons/ServerStatsService.ts +++ b/packages/backend/src/daemons/ServerStatsService.ts @@ -108,5 +108,6 @@ async function net() { // FS STAT async function fs() { - return await si.disksIO().catch(() => ({ rIO_sec: 0, wIO_sec: 0 })); + const io = await si.disksIO().catch(() => null); + return io ?? { rIO_sec: 0, wIO_sec: 0 }; } diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index b8f448477b..2070ab6106 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -19,7 +19,15 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js'; import { AuthenticateService, AuthenticationError } from './AuthenticateService.js'; import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; +import { RateLimiterService } from './RateLimiterService.js'; +import { RoleService } from '@/core/RoleService.js'; +import { getIpHash } from '@/misc/get-ip-hash.js'; +import proxyAddr from 'proxy-addr'; +import ms from 'ms'; import type * as http from 'node:http'; +import type { IEndpointMeta } from './endpoints.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; @Injectable() export class StreamingApiServerService { @@ -41,9 +49,35 @@ export class StreamingApiServerService { private notificationService: NotificationService, private usersService: UserService, private channelFollowingService: ChannelFollowingService, + private rateLimiterService: RateLimiterService, + private roleService: RoleService, + private loggerService: LoggerService, ) { } + @bindThis + private async rateLimitThis( + user: MiLocalUser | null | undefined, + requestIp: string | undefined, + limit: IEndpointMeta['limit'] & { key: NonNullable }, + ) : Promise { + let limitActor: string; + if (user) { + limitActor = user.id; + } else { + limitActor = getIpHash(requestIp || 'wtf'); + } + + const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1; + + if (factor <= 0) return false; + + // Rate limit + return await this.rateLimiterService.limit(limit, limitActor, factor) + .then(() => { return false; }) + .catch(err => { return true; }); + } + @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ @@ -57,6 +91,22 @@ export class StreamingApiServerService { return; } + // ServerServices sets `trustProxy: true`, which inside + // fastify/request.js ends up calling `proxyAddr` in this way, + // so we do the same + const requestIp = proxyAddr(request, () => { return true; } ); + + if (await this.rateLimitThis(null, requestIp, { + key: 'wsconnect', + duration: ms('5min'), + max: 32, + minInterval: ms('1sec'), + })) { + socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); + socket.destroy(); + return; + } + const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; @@ -94,13 +144,23 @@ export class StreamingApiServerService { return; } + const rateLimiter = () => { + return this.rateLimitThis(user, requestIp, { + key: 'wsmessage', + duration: ms('5sec'), + max: 256, + }); + }; + const stream = new MainStreamConnection( this.channelsService, this.noteReadService, this.notificationService, this.cacheService, this.channelFollowingService, - user, app, + this.loggerService, + user, app, requestIp, + rateLimiter, ); await stream.init(); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index c24459c1e1..8254dcb84f 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -18,6 +18,10 @@ import type { JsonObject } from '@/misc/json-value.js'; import type { ChannelsService } from './ChannelsService.js'; import type { EventEmitter } from 'events'; import type Channel from './channel.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; + +const MAX_CHANNELS_PER_CONNECTION = 32; /** * Main stream connection @@ -26,6 +30,7 @@ import type Channel from './channel.js'; export default class Connection { public user?: MiUser; public token?: MiAccessToken; + private rateLimiter?: () => Promise; private wsConnection: WebSocket.WebSocket; public subscriber: StreamEventEmitter; private channels: Channel[] = []; @@ -39,6 +44,9 @@ export default class Connection { public userIdsWhoMeMutingRenotes: Set = new Set(); public userMutedInstances: Set = new Set(); private fetchIntervalId: NodeJS.Timeout | null = null; + private activeRateLimitRequests: number = 0; + private closingConnection: boolean = false; + private logger: Logger; constructor( private channelsService: ChannelsService, @@ -46,12 +54,18 @@ export default class Connection { private notificationService: NotificationService, private cacheService: CacheService, private channelFollowingService: ChannelFollowingService, + loggerService: LoggerService, user: MiUser | null | undefined, token: MiAccessToken | null | undefined, + private ip: string, + rateLimiter: () => Promise, ) { if (user) this.user = user; if (token) this.token = token; + if (rateLimiter) this.rateLimiter = rateLimiter; + + this.logger = loggerService.getLogger('streaming', 'coral'); } @bindThis @@ -104,6 +118,27 @@ export default class Connection { private async onWsConnectionMessage(data: WebSocket.RawData) { let obj: JsonObject; + if (this.closingConnection) return; + + if (this.rateLimiter) { + if (this.activeRateLimitRequests <= 128) { + this.activeRateLimitRequests++; + const shouldRateLimit = await this.rateLimiter(); + this.activeRateLimitRequests--; + + if (shouldRateLimit) return; + if (this.closingConnection) return; + } else { + let connectionInfo = `IP ${this.ip}`; + if (this.user) connectionInfo += `, user ID ${this.user.id}`; + + this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`); + this.closingConnection = true; + this.wsConnection.close(1008, 'Please stop spamming the streaming API.'); + return; + } + } + try { obj = JSON.parse(data.toString()); } catch (e) { @@ -263,6 +298,10 @@ export default class Connection { */ @bindThis public connectChannel(id: string, params: JsonObject | undefined, channel: string, pong = false) { + if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) { + return; + } + const channelService = this.channelsService.getChannelService(channel); if (channelService.requireCredential && this.user == null) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b2c4d9ba47..94434719c6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -358,6 +358,9 @@ importers: promise-limit: specifier: 2.7.0 version: 2.7.0 + proxy-addr: + specifier: ^2.0.7 + version: 2.0.7 pug: specifier: 3.0.3 version: 3.0.3 @@ -609,6 +612,9 @@ importers: '@types/pg': specifier: 8.11.6 version: 8.11.6 + '@types/proxy-addr': + specifier: ^2.0.3 + version: 2.0.3 '@types/pug': specifier: 2.0.10 version: 2.0.10 @@ -5126,6 +5132,9 @@ packages: '@types/prop-types@15.7.5': resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==} + '@types/proxy-addr@2.0.3': + resolution: {integrity: sha512-TgAHHO4tNG3HgLTUhB+hM4iwW6JUNeQHCLnF1DjaDA9c69PN+IasoFu2MYDhubFc+ZIw5c5t9DMtjvrD6R3Egg==} + '@types/pug@2.0.10': resolution: {integrity: sha512-Sk/uYFOBAB7mb74XcpizmH0KOR2Pv3D2Hmrh1Dmy5BmK3MpdSa5kqZcg6EKBdklU0bFXX9gCfzvpnyUehrPIuA==} @@ -16873,6 +16882,10 @@ snapshots: '@types/prop-types@15.7.5': {} + '@types/proxy-addr@2.0.3': + dependencies: + '@types/node': 20.14.12 + '@types/pug@2.0.10': {} '@types/punycode@2.1.4': {}