Merge branch 'develop' into feature/misskey-2024.07

This commit is contained in:
dakkar 2024-08-18 13:13:23 +01:00
commit a58df8ac7c
5 changed files with 117 additions and 2 deletions

View file

@ -156,6 +156,7 @@
"pkce-challenge": "4.1.0",
"probe-image-size": "7.2.3",
"promise-limit": "2.7.0",
"proxy-addr": "^2.0.7",
"pug": "3.0.3",
"punycode": "2.3.1",
"qrcode": "1.5.3",
@ -214,6 +215,7 @@
"@types/oauth2orize": "1.11.5",
"@types/oauth2orize-pkce": "0.1.2",
"@types/pg": "8.11.6",
"@types/proxy-addr": "^2.0.3",
"@types/pug": "2.0.10",
"@types/punycode": "2.1.4",
"@types/qrcode": "1.5.5",

View file

@ -108,5 +108,6 @@ async function net() {
// FS STAT
async function fs() {
return await si.disksIO().catch(() => ({ rIO_sec: 0, wIO_sec: 0 }));
const io = await si.disksIO().catch(() => null);
return io ?? { rIO_sec: 0, wIO_sec: 0 };
}

View file

@ -19,7 +19,15 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
import { AuthenticateService, AuthenticationError } from './AuthenticateService.js';
import MainStreamConnection from './stream/Connection.js';
import { ChannelsService } from './stream/ChannelsService.js';
import { RateLimiterService } from './RateLimiterService.js';
import { RoleService } from '@/core/RoleService.js';
import { getIpHash } from '@/misc/get-ip-hash.js';
import proxyAddr from 'proxy-addr';
import ms from 'ms';
import type * as http from 'node:http';
import type { IEndpointMeta } from './endpoints.js';
import { LoggerService } from '@/core/LoggerService.js';
import type Logger from '@/logger.js';
@Injectable()
export class StreamingApiServerService {
@ -41,9 +49,35 @@ export class StreamingApiServerService {
private notificationService: NotificationService,
private usersService: UserService,
private channelFollowingService: ChannelFollowingService,
private rateLimiterService: RateLimiterService,
private roleService: RoleService,
private loggerService: LoggerService,
) {
}
@bindThis
private async rateLimitThis(
user: MiLocalUser | null | undefined,
requestIp: string | undefined,
limit: IEndpointMeta['limit'] & { key: NonNullable<string> },
) : Promise<boolean> {
let limitActor: string;
if (user) {
limitActor = user.id;
} else {
limitActor = getIpHash(requestIp || 'wtf');
}
const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1;
if (factor <= 0) return false;
// Rate limit
return await this.rateLimiterService.limit(limit, limitActor, factor)
.then(() => { return false; })
.catch(err => { return true; });
}
@bindThis
public attach(server: http.Server): void {
this.#wss = new WebSocket.WebSocketServer({
@ -57,6 +91,22 @@ export class StreamingApiServerService {
return;
}
// ServerServices sets `trustProxy: true`, which inside
// fastify/request.js ends up calling `proxyAddr` in this way,
// so we do the same
const requestIp = proxyAddr(request, () => { return true; } );
if (await this.rateLimitThis(null, requestIp, {
key: 'wsconnect',
duration: ms('5min'),
max: 32,
minInterval: ms('1sec'),
})) {
socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n');
socket.destroy();
return;
}
const q = new URL(request.url, `http://${request.headers.host}`).searchParams;
let user: MiLocalUser | null = null;
@ -94,13 +144,23 @@ export class StreamingApiServerService {
return;
}
const rateLimiter = () => {
return this.rateLimitThis(user, requestIp, {
key: 'wsmessage',
duration: ms('5sec'),
max: 256,
});
};
const stream = new MainStreamConnection(
this.channelsService,
this.noteReadService,
this.notificationService,
this.cacheService,
this.channelFollowingService,
user, app,
this.loggerService,
user, app, requestIp,
rateLimiter,
);
await stream.init();

View file

@ -18,6 +18,10 @@ import type { JsonObject } from '@/misc/json-value.js';
import type { ChannelsService } from './ChannelsService.js';
import type { EventEmitter } from 'events';
import type Channel from './channel.js';
import { LoggerService } from '@/core/LoggerService.js';
import type Logger from '@/logger.js';
const MAX_CHANNELS_PER_CONNECTION = 32;
/**
* Main stream connection
@ -26,6 +30,7 @@ import type Channel from './channel.js';
export default class Connection {
public user?: MiUser;
public token?: MiAccessToken;
private rateLimiter?: () => Promise<boolean>;
private wsConnection: WebSocket.WebSocket;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
@ -39,6 +44,9 @@ export default class Connection {
public userIdsWhoMeMutingRenotes: Set<string> = new Set();
public userMutedInstances: Set<string> = new Set();
private fetchIntervalId: NodeJS.Timeout | null = null;
private activeRateLimitRequests: number = 0;
private closingConnection: boolean = false;
private logger: Logger;
constructor(
private channelsService: ChannelsService,
@ -46,12 +54,18 @@ export default class Connection {
private notificationService: NotificationService,
private cacheService: CacheService,
private channelFollowingService: ChannelFollowingService,
loggerService: LoggerService,
user: MiUser | null | undefined,
token: MiAccessToken | null | undefined,
private ip: string,
rateLimiter: () => Promise<boolean>,
) {
if (user) this.user = user;
if (token) this.token = token;
if (rateLimiter) this.rateLimiter = rateLimiter;
this.logger = loggerService.getLogger('streaming', 'coral');
}
@bindThis
@ -104,6 +118,27 @@ export default class Connection {
private async onWsConnectionMessage(data: WebSocket.RawData) {
let obj: JsonObject;
if (this.closingConnection) return;
if (this.rateLimiter) {
if (this.activeRateLimitRequests <= 128) {
this.activeRateLimitRequests++;
const shouldRateLimit = await this.rateLimiter();
this.activeRateLimitRequests--;
if (shouldRateLimit) return;
if (this.closingConnection) return;
} else {
let connectionInfo = `IP ${this.ip}`;
if (this.user) connectionInfo += `, user ID ${this.user.id}`;
this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`);
this.closingConnection = true;
this.wsConnection.close(1008, 'Please stop spamming the streaming API.');
return;
}
}
try {
obj = JSON.parse(data.toString());
} catch (e) {
@ -263,6 +298,10 @@ export default class Connection {
*/
@bindThis
public connectChannel(id: string, params: JsonObject | undefined, channel: string, pong = false) {
if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) {
return;
}
const channelService = this.channelsService.getChannelService(channel);
if (channelService.requireCredential && this.user == null) {

View file

@ -358,6 +358,9 @@ importers:
promise-limit:
specifier: 2.7.0
version: 2.7.0
proxy-addr:
specifier: ^2.0.7
version: 2.0.7
pug:
specifier: 3.0.3
version: 3.0.3
@ -609,6 +612,9 @@ importers:
'@types/pg':
specifier: 8.11.6
version: 8.11.6
'@types/proxy-addr':
specifier: ^2.0.3
version: 2.0.3
'@types/pug':
specifier: 2.0.10
version: 2.0.10
@ -5126,6 +5132,9 @@ packages:
'@types/prop-types@15.7.5':
resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==}
'@types/proxy-addr@2.0.3':
resolution: {integrity: sha512-TgAHHO4tNG3HgLTUhB+hM4iwW6JUNeQHCLnF1DjaDA9c69PN+IasoFu2MYDhubFc+ZIw5c5t9DMtjvrD6R3Egg==}
'@types/pug@2.0.10':
resolution: {integrity: sha512-Sk/uYFOBAB7mb74XcpizmH0KOR2Pv3D2Hmrh1Dmy5BmK3MpdSa5kqZcg6EKBdklU0bFXX9gCfzvpnyUehrPIuA==}
@ -16873,6 +16882,10 @@ snapshots:
'@types/prop-types@15.7.5': {}
'@types/proxy-addr@2.0.3':
dependencies:
'@types/node': 20.14.12
'@types/pug@2.0.10': {}
'@types/punycode@2.1.4': {}