mirror of
https://activitypub.software/TransFem-org/Sharkey
synced 2024-11-22 05:55:12 +00:00
merge: Rate limiting for websockets (!598)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/598 Approved-by: Marie <2-Marie@users.noreply.activitypub.software> Approved-by: Amelia Yukii <amelia.yukii@shourai.de>
This commit is contained in:
commit
d92402554b
4 changed files with 119 additions and 5 deletions
|
@ -156,6 +156,7 @@
|
||||||
"pkce-challenge": "4.1.0",
|
"pkce-challenge": "4.1.0",
|
||||||
"probe-image-size": "7.2.3",
|
"probe-image-size": "7.2.3",
|
||||||
"promise-limit": "2.7.0",
|
"promise-limit": "2.7.0",
|
||||||
|
"proxy-addr": "^2.0.7",
|
||||||
"pug": "3.0.2",
|
"pug": "3.0.2",
|
||||||
"punycode": "2.3.1",
|
"punycode": "2.3.1",
|
||||||
"qrcode": "1.5.3",
|
"qrcode": "1.5.3",
|
||||||
|
@ -216,6 +217,7 @@
|
||||||
"@types/oauth2orize": "1.11.5",
|
"@types/oauth2orize": "1.11.5",
|
||||||
"@types/oauth2orize-pkce": "0.1.2",
|
"@types/oauth2orize-pkce": "0.1.2",
|
||||||
"@types/pg": "8.11.5",
|
"@types/pg": "8.11.5",
|
||||||
|
"@types/proxy-addr": "^2.0.3",
|
||||||
"@types/pug": "2.0.10",
|
"@types/pug": "2.0.10",
|
||||||
"@types/punycode": "2.1.4",
|
"@types/punycode": "2.1.4",
|
||||||
"@types/qrcode": "1.5.5",
|
"@types/qrcode": "1.5.5",
|
||||||
|
|
|
@ -19,7 +19,15 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
|
||||||
import { AuthenticateService, AuthenticationError } from './AuthenticateService.js';
|
import { AuthenticateService, AuthenticationError } from './AuthenticateService.js';
|
||||||
import MainStreamConnection from './stream/Connection.js';
|
import MainStreamConnection from './stream/Connection.js';
|
||||||
import { ChannelsService } from './stream/ChannelsService.js';
|
import { ChannelsService } from './stream/ChannelsService.js';
|
||||||
|
import { RateLimiterService } from './RateLimiterService.js';
|
||||||
|
import { RoleService } from '@/core/RoleService.js';
|
||||||
|
import { getIpHash } from '@/misc/get-ip-hash.js';
|
||||||
|
import proxyAddr from 'proxy-addr';
|
||||||
|
import ms from 'ms';
|
||||||
import type * as http from 'node:http';
|
import type * as http from 'node:http';
|
||||||
|
import type { IEndpointMeta } from './endpoints.js';
|
||||||
|
import { LoggerService } from '@/core/LoggerService.js';
|
||||||
|
import type Logger from '@/logger.js';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class StreamingApiServerService {
|
export class StreamingApiServerService {
|
||||||
|
@ -41,9 +49,35 @@ export class StreamingApiServerService {
|
||||||
private notificationService: NotificationService,
|
private notificationService: NotificationService,
|
||||||
private usersService: UserService,
|
private usersService: UserService,
|
||||||
private channelFollowingService: ChannelFollowingService,
|
private channelFollowingService: ChannelFollowingService,
|
||||||
|
private rateLimiterService: RateLimiterService,
|
||||||
|
private roleService: RoleService,
|
||||||
|
private loggerService: LoggerService,
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
private async rateLimitThis(
|
||||||
|
user: MiLocalUser | null | undefined,
|
||||||
|
requestIp: string | undefined,
|
||||||
|
limit: IEndpointMeta['limit'] & { key: NonNullable<string> },
|
||||||
|
) : Promise<boolean> {
|
||||||
|
let limitActor: string;
|
||||||
|
if (user) {
|
||||||
|
limitActor = user.id;
|
||||||
|
} else {
|
||||||
|
limitActor = getIpHash(requestIp || 'wtf');
|
||||||
|
}
|
||||||
|
|
||||||
|
const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1;
|
||||||
|
|
||||||
|
if (factor <= 0) return false;
|
||||||
|
|
||||||
|
// Rate limit
|
||||||
|
return await this.rateLimiterService.limit(limit, limitActor, factor)
|
||||||
|
.then(() => { return false; })
|
||||||
|
.catch(err => { return true; });
|
||||||
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public attach(server: http.Server): void {
|
public attach(server: http.Server): void {
|
||||||
this.#wss = new WebSocket.WebSocketServer({
|
this.#wss = new WebSocket.WebSocketServer({
|
||||||
|
@ -57,6 +91,22 @@ export class StreamingApiServerService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServerServices sets `trustProxy: true`, which inside
|
||||||
|
// fastify/request.js ends up calling `proxyAddr` in this way,
|
||||||
|
// so we do the same
|
||||||
|
const requestIp = proxyAddr(request, () => { return true; } );
|
||||||
|
|
||||||
|
if (await this.rateLimitThis(null, requestIp, {
|
||||||
|
key: 'wsconnect',
|
||||||
|
duration: ms('5min'),
|
||||||
|
max: 32,
|
||||||
|
minInterval: ms('1sec'),
|
||||||
|
})) {
|
||||||
|
socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n');
|
||||||
|
socket.destroy();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const q = new URL(request.url, `http://${request.headers.host}`).searchParams;
|
const q = new URL(request.url, `http://${request.headers.host}`).searchParams;
|
||||||
|
|
||||||
let user: MiLocalUser | null = null;
|
let user: MiLocalUser | null = null;
|
||||||
|
@ -94,13 +144,23 @@ export class StreamingApiServerService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const rateLimiter = () => {
|
||||||
|
return this.rateLimitThis(user, requestIp, {
|
||||||
|
key: 'wsmessage',
|
||||||
|
duration: ms('5sec'),
|
||||||
|
max: 256,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
const stream = new MainStreamConnection(
|
const stream = new MainStreamConnection(
|
||||||
this.channelsService,
|
this.channelsService,
|
||||||
this.noteReadService,
|
this.noteReadService,
|
||||||
this.notificationService,
|
this.notificationService,
|
||||||
this.cacheService,
|
this.cacheService,
|
||||||
this.channelFollowingService,
|
this.channelFollowingService,
|
||||||
user, app,
|
this.loggerService,
|
||||||
|
user, app, requestIp,
|
||||||
|
rateLimiter,
|
||||||
);
|
);
|
||||||
|
|
||||||
await stream.init();
|
await stream.init();
|
||||||
|
|
|
@ -17,6 +17,10 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
|
||||||
import type { ChannelsService } from './ChannelsService.js';
|
import type { ChannelsService } from './ChannelsService.js';
|
||||||
import type { EventEmitter } from 'events';
|
import type { EventEmitter } from 'events';
|
||||||
import type Channel from './channel.js';
|
import type Channel from './channel.js';
|
||||||
|
import { LoggerService } from '@/core/LoggerService.js';
|
||||||
|
import type Logger from '@/logger.js';
|
||||||
|
|
||||||
|
const MAX_CHANNELS_PER_CONNECTION = 32;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main stream connection
|
* Main stream connection
|
||||||
|
@ -25,6 +29,7 @@ import type Channel from './channel.js';
|
||||||
export default class Connection {
|
export default class Connection {
|
||||||
public user?: MiUser;
|
public user?: MiUser;
|
||||||
public token?: MiAccessToken;
|
public token?: MiAccessToken;
|
||||||
|
private rateLimiter?: () => Promise<boolean>;
|
||||||
private wsConnection: WebSocket.WebSocket;
|
private wsConnection: WebSocket.WebSocket;
|
||||||
public subscriber: StreamEventEmitter;
|
public subscriber: StreamEventEmitter;
|
||||||
private channels: Channel[] = [];
|
private channels: Channel[] = [];
|
||||||
|
@ -38,6 +43,9 @@ export default class Connection {
|
||||||
public userIdsWhoMeMutingRenotes: Set<string> = new Set();
|
public userIdsWhoMeMutingRenotes: Set<string> = new Set();
|
||||||
public userMutedInstances: Set<string> = new Set();
|
public userMutedInstances: Set<string> = new Set();
|
||||||
private fetchIntervalId: NodeJS.Timeout | null = null;
|
private fetchIntervalId: NodeJS.Timeout | null = null;
|
||||||
|
private activeRateLimitRequests: number = 0;
|
||||||
|
private closingConnection: boolean = false;
|
||||||
|
private logger: Logger;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private channelsService: ChannelsService,
|
private channelsService: ChannelsService,
|
||||||
|
@ -45,12 +53,18 @@ export default class Connection {
|
||||||
private notificationService: NotificationService,
|
private notificationService: NotificationService,
|
||||||
private cacheService: CacheService,
|
private cacheService: CacheService,
|
||||||
private channelFollowingService: ChannelFollowingService,
|
private channelFollowingService: ChannelFollowingService,
|
||||||
|
loggerService: LoggerService,
|
||||||
|
|
||||||
user: MiUser | null | undefined,
|
user: MiUser | null | undefined,
|
||||||
token: MiAccessToken | null | undefined,
|
token: MiAccessToken | null | undefined,
|
||||||
|
private ip: string,
|
||||||
|
rateLimiter: () => Promise<boolean>,
|
||||||
) {
|
) {
|
||||||
if (user) this.user = user;
|
if (user) this.user = user;
|
||||||
if (token) this.token = token;
|
if (token) this.token = token;
|
||||||
|
if (rateLimiter) this.rateLimiter = rateLimiter;
|
||||||
|
|
||||||
|
this.logger = loggerService.getLogger('streaming', 'coral', false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
|
@ -103,6 +117,27 @@ export default class Connection {
|
||||||
private async onWsConnectionMessage(data: WebSocket.RawData) {
|
private async onWsConnectionMessage(data: WebSocket.RawData) {
|
||||||
let obj: Record<string, any>;
|
let obj: Record<string, any>;
|
||||||
|
|
||||||
|
if (this.closingConnection) return;
|
||||||
|
|
||||||
|
if (this.rateLimiter) {
|
||||||
|
if (this.activeRateLimitRequests <= 128) {
|
||||||
|
this.activeRateLimitRequests++;
|
||||||
|
const shouldRateLimit = await this.rateLimiter();
|
||||||
|
this.activeRateLimitRequests--;
|
||||||
|
|
||||||
|
if (shouldRateLimit) return;
|
||||||
|
if (this.closingConnection) return;
|
||||||
|
} else {
|
||||||
|
let connectionInfo = `IP ${this.ip}`;
|
||||||
|
if (this.user) connectionInfo += `, user ID ${this.user.id}`;
|
||||||
|
|
||||||
|
this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`);
|
||||||
|
this.closingConnection = true;
|
||||||
|
this.wsConnection.close(1008, 'Please stop spamming the streaming API.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
obj = JSON.parse(data.toString());
|
obj = JSON.parse(data.toString());
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
@ -254,6 +289,10 @@ export default class Connection {
|
||||||
*/
|
*/
|
||||||
@bindThis
|
@bindThis
|
||||||
public connectChannel(id: string, params: any, channel: string, pong = false) {
|
public connectChannel(id: string, params: any, channel: string, pong = false) {
|
||||||
|
if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const channelService = this.channelsService.getChannelService(channel);
|
const channelService = this.channelsService.getChannelService(channel);
|
||||||
|
|
||||||
if (channelService.requireCredential && this.user == null) {
|
if (channelService.requireCredential && this.user == null) {
|
||||||
|
|
|
@ -352,6 +352,9 @@ importers:
|
||||||
promise-limit:
|
promise-limit:
|
||||||
specifier: 2.7.0
|
specifier: 2.7.0
|
||||||
version: 2.7.0
|
version: 2.7.0
|
||||||
|
proxy-addr:
|
||||||
|
specifier: ^2.0.7
|
||||||
|
version: 2.0.7
|
||||||
pug:
|
pug:
|
||||||
specifier: 3.0.2
|
specifier: 3.0.2
|
||||||
version: 3.0.2
|
version: 3.0.2
|
||||||
|
@ -609,6 +612,9 @@ importers:
|
||||||
'@types/pg':
|
'@types/pg':
|
||||||
specifier: 8.11.5
|
specifier: 8.11.5
|
||||||
version: 8.11.5
|
version: 8.11.5
|
||||||
|
'@types/proxy-addr':
|
||||||
|
specifier: ^2.0.3
|
||||||
|
version: 2.0.3
|
||||||
'@types/pug':
|
'@types/pug':
|
||||||
specifier: 2.0.10
|
specifier: 2.0.10
|
||||||
version: 2.0.10
|
version: 2.0.10
|
||||||
|
@ -4728,6 +4734,9 @@ packages:
|
||||||
'@types/prop-types@15.7.5':
|
'@types/prop-types@15.7.5':
|
||||||
resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==}
|
resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==}
|
||||||
|
|
||||||
|
'@types/proxy-addr@2.0.3':
|
||||||
|
resolution: {integrity: sha512-TgAHHO4tNG3HgLTUhB+hM4iwW6JUNeQHCLnF1DjaDA9c69PN+IasoFu2MYDhubFc+ZIw5c5t9DMtjvrD6R3Egg==}
|
||||||
|
|
||||||
'@types/pug@2.0.10':
|
'@types/pug@2.0.10':
|
||||||
resolution: {integrity: sha512-Sk/uYFOBAB7mb74XcpizmH0KOR2Pv3D2Hmrh1Dmy5BmK3MpdSa5kqZcg6EKBdklU0bFXX9gCfzvpnyUehrPIuA==}
|
resolution: {integrity: sha512-Sk/uYFOBAB7mb74XcpizmH0KOR2Pv3D2Hmrh1Dmy5BmK3MpdSa5kqZcg6EKBdklU0bFXX9gCfzvpnyUehrPIuA==}
|
||||||
|
|
||||||
|
@ -11112,8 +11121,8 @@ packages:
|
||||||
vue-component-type-helpers@2.0.16:
|
vue-component-type-helpers@2.0.16:
|
||||||
resolution: {integrity: sha512-qisL/iAfdO++7w+SsfYQJVPj6QKvxp4i1MMxvsNO41z/8zu3KuAw9LkhKUfP/kcOWGDxESp+pQObWppXusejCA==}
|
resolution: {integrity: sha512-qisL/iAfdO++7w+SsfYQJVPj6QKvxp4i1MMxvsNO41z/8zu3KuAw9LkhKUfP/kcOWGDxESp+pQObWppXusejCA==}
|
||||||
|
|
||||||
vue-component-type-helpers@2.0.26:
|
vue-component-type-helpers@2.0.29:
|
||||||
resolution: {integrity: sha512-sO9qQ8oC520SW6kqlls0iqDak53gsTVSrYylajgjmkt1c0vcgjsGSy1KzlDrbEx8pm02IEYhlUkU5hCYf8rwtg==}
|
resolution: {integrity: sha512-58i+ZhUAUpwQ+9h5Hck0D+jr1qbYl4voRt5KffBx8qzELViQ4XdT/Tuo+mzq8u63teAG8K0lLaOiL5ofqW38rg==}
|
||||||
|
|
||||||
vue-demi@0.14.7:
|
vue-demi@0.14.7:
|
||||||
resolution: {integrity: sha512-EOG8KXDQNwkJILkx/gPcoL/7vH+hORoBaKgGe+6W7VFMvCYJfmF2dGbvgDroVnI8LU7/kTu8mbjRZGBU1z9NTA==}
|
resolution: {integrity: sha512-EOG8KXDQNwkJILkx/gPcoL/7vH+hORoBaKgGe+6W7VFMvCYJfmF2dGbvgDroVnI8LU7/kTu8mbjRZGBU1z9NTA==}
|
||||||
|
@ -15456,7 +15465,7 @@ snapshots:
|
||||||
ts-dedent: 2.2.0
|
ts-dedent: 2.2.0
|
||||||
type-fest: 2.19.0
|
type-fest: 2.19.0
|
||||||
vue: 3.4.26(typescript@5.4.5)
|
vue: 3.4.26(typescript@5.4.5)
|
||||||
vue-component-type-helpers: 2.0.26
|
vue-component-type-helpers: 2.0.29
|
||||||
transitivePeerDependencies:
|
transitivePeerDependencies:
|
||||||
- encoding
|
- encoding
|
||||||
- supports-color
|
- supports-color
|
||||||
|
@ -15971,6 +15980,10 @@ snapshots:
|
||||||
|
|
||||||
'@types/prop-types@15.7.5': {}
|
'@types/prop-types@15.7.5': {}
|
||||||
|
|
||||||
|
'@types/proxy-addr@2.0.3':
|
||||||
|
dependencies:
|
||||||
|
'@types/node': 20.12.7
|
||||||
|
|
||||||
'@types/pug@2.0.10': {}
|
'@types/pug@2.0.10': {}
|
||||||
|
|
||||||
'@types/punycode@2.1.4': {}
|
'@types/punycode@2.1.4': {}
|
||||||
|
@ -23655,7 +23668,7 @@ snapshots:
|
||||||
|
|
||||||
vue-component-type-helpers@2.0.16: {}
|
vue-component-type-helpers@2.0.16: {}
|
||||||
|
|
||||||
vue-component-type-helpers@2.0.26: {}
|
vue-component-type-helpers@2.0.29: {}
|
||||||
|
|
||||||
vue-demi@0.14.7(vue@3.4.26(typescript@5.4.5)):
|
vue-demi@0.14.7(vue@3.4.26(typescript@5.4.5)):
|
||||||
dependencies:
|
dependencies:
|
||||||
|
|
Loading…
Reference in a new issue