mirror of
https://activitypub.software/TransFem-org/Sharkey
synced 2024-12-18 22:40:07 +00:00
use atomic variant of Leaky Bucket for safe concurrent rate limits
This commit is contained in:
parent
407b2423af
commit
0ea9d6ec5d
6 changed files with 378 additions and 344 deletions
|
@ -155,8 +155,6 @@ import { QueueModule } from './QueueModule.js';
|
||||||
import { QueueService } from './QueueService.js';
|
import { QueueService } from './QueueService.js';
|
||||||
import { LoggerService } from './LoggerService.js';
|
import { LoggerService } from './LoggerService.js';
|
||||||
import { SponsorsService } from './SponsorsService.js';
|
import { SponsorsService } from './SponsorsService.js';
|
||||||
import { RedisConnectionPool } from './RedisConnectionPool.js';
|
|
||||||
import { TimeoutService } from './TimeoutService.js';
|
|
||||||
import type { Provider } from '@nestjs/common';
|
import type { Provider } from '@nestjs/common';
|
||||||
|
|
||||||
//#region 文字列ベースでのinjection用(循環参照対応のため)
|
//#region 文字列ベースでのinjection用(循環参照対応のため)
|
||||||
|
@ -385,8 +383,6 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
|
||||||
ChannelFollowingService,
|
ChannelFollowingService,
|
||||||
RegistryApiService,
|
RegistryApiService,
|
||||||
ReversiService,
|
ReversiService,
|
||||||
RedisConnectionPool,
|
|
||||||
TimeoutService,
|
|
||||||
TimeService,
|
TimeService,
|
||||||
EnvService,
|
EnvService,
|
||||||
|
|
||||||
|
@ -688,8 +684,6 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
|
||||||
ChannelFollowingService,
|
ChannelFollowingService,
|
||||||
RegistryApiService,
|
RegistryApiService,
|
||||||
ReversiService,
|
ReversiService,
|
||||||
RedisConnectionPool,
|
|
||||||
TimeoutService,
|
|
||||||
TimeService,
|
TimeService,
|
||||||
EnvService,
|
EnvService,
|
||||||
|
|
||||||
|
|
|
@ -1,103 +0,0 @@
|
||||||
/*
|
|
||||||
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
|
||||||
import Redis, { RedisOptions } from 'ioredis';
|
|
||||||
import { DI } from '@/di-symbols.js';
|
|
||||||
import type { Config } from '@/config.js';
|
|
||||||
import Logger from '@/logger.js';
|
|
||||||
import { Timeout, TimeoutService } from '@/core/TimeoutService.js';
|
|
||||||
import { LoggerService } from './LoggerService.js';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Target number of connections to keep open and ready for use.
|
|
||||||
* The pool may grow beyond this during bursty traffic, but it will always shrink back to this number.
|
|
||||||
* The pool may remain below this number is the server never experiences enough traffic to consume this many clients.
|
|
||||||
*/
|
|
||||||
export const poolSize = 16;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How often to drop an idle connection from the pool.
|
|
||||||
* This will never shrink the pool below poolSize.
|
|
||||||
*/
|
|
||||||
export const poolShrinkInterval = 5 * 1000; // 5 seconds
|
|
||||||
|
|
||||||
@Injectable()
|
|
||||||
export class RedisConnectionPool implements OnApplicationShutdown {
|
|
||||||
private readonly poolShrinkTimer: Timeout;
|
|
||||||
private readonly pool: Redis.Redis[] = [];
|
|
||||||
private readonly logger: Logger;
|
|
||||||
private readonly redisOptions: RedisOptions;
|
|
||||||
|
|
||||||
constructor(@Inject(DI.config) config: Config, loggerService: LoggerService, timeoutService: TimeoutService) {
|
|
||||||
this.logger = loggerService.getLogger('redis-pool');
|
|
||||||
this.poolShrinkTimer = timeoutService.setInterval(() => this.shrinkPool(), poolShrinkInterval);
|
|
||||||
this.redisOptions = {
|
|
||||||
...config.redis,
|
|
||||||
|
|
||||||
// Set lazyConnect so that we can await() the connection manually.
|
|
||||||
// This helps to avoid a "stampede" of new connections (which are processed in the background!) under bursty conditions.
|
|
||||||
lazyConnect: true,
|
|
||||||
enableOfflineQueue: false,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets a Redis connection from the pool, or creates a new connection if the pool is empty.
|
|
||||||
* The returned object MUST be returned with a call to free(), even in the case of exceptions!
|
|
||||||
* Use a try...finally block for safe handling.
|
|
||||||
*/
|
|
||||||
public async alloc(): Promise<Redis.Redis> {
|
|
||||||
let redis = this.pool.pop();
|
|
||||||
|
|
||||||
// The pool may be empty if we're under heavy load and/or we haven't opened all connections.
|
|
||||||
// Just construct a new instance, which will eventually be added to the pool.
|
|
||||||
// Excess clients will be disposed eventually.
|
|
||||||
if (!redis) {
|
|
||||||
redis = new Redis.Redis(this.redisOptions);
|
|
||||||
await redis.connect();
|
|
||||||
}
|
|
||||||
|
|
||||||
return redis;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a Redis connection to the pool.
|
|
||||||
* The instance MUST not be used after returning!
|
|
||||||
* Use a try...finally block for safe handling.
|
|
||||||
*/
|
|
||||||
public async free(redis: Redis.Redis): Promise<void> {
|
|
||||||
// https://redis.io/docs/latest/commands/reset/
|
|
||||||
await redis.reset();
|
|
||||||
|
|
||||||
this.pool.push(redis);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async onApplicationShutdown(): Promise<void> {
|
|
||||||
// Cancel timer, otherwise it will cause a memory leak
|
|
||||||
clearInterval(this.poolShrinkTimer);
|
|
||||||
|
|
||||||
// Disconnect all remaining instances
|
|
||||||
while (this.pool.length > 0) {
|
|
||||||
await this.dropClient();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async shrinkPool(): Promise<void> {
|
|
||||||
this.logger.debug(`Pool size is ${this.pool.length}`);
|
|
||||||
if (this.pool.length > poolSize) {
|
|
||||||
await this.dropClient();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async dropClient(): Promise<void> {
|
|
||||||
try {
|
|
||||||
const redis = this.pool.pop();
|
|
||||||
await redis?.quit();
|
|
||||||
} catch (err) {
|
|
||||||
this.logger.warn(`Error disconnecting from redis: ${err}`, { err });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,76 +0,0 @@
|
||||||
/*
|
|
||||||
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Provides access to setTimeout, setInterval, and related functions.
|
|
||||||
* Used to support deterministic unit testing.
|
|
||||||
*/
|
|
||||||
export class TimeoutService {
|
|
||||||
/**
|
|
||||||
* Returns a promise that resolves after the specified timeout in milliseconds.
|
|
||||||
*/
|
|
||||||
public delay(timeout: number): Promise<void> {
|
|
||||||
return new Promise(resolve => {
|
|
||||||
this.setTimeout(resolve, timeout);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Passthrough to node's setTimeout
|
|
||||||
*/
|
|
||||||
public setTimeout(handler: TimeoutHandler, timeout?: number): Timeout {
|
|
||||||
return setTimeout(() => handler(), timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Passthrough to node's setInterval
|
|
||||||
*/
|
|
||||||
public setInterval(handler: TimeoutHandler, timeout?: number): Timeout {
|
|
||||||
return setInterval(() => handler(), timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Passthrough to node's clearTimeout
|
|
||||||
*/
|
|
||||||
public clearTimeout(timeout: Timeout) {
|
|
||||||
clearTimeout(timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Passthrough to node's clearInterval
|
|
||||||
*/
|
|
||||||
public clearInterval(timeout: Timeout) {
|
|
||||||
clearInterval(timeout);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Function to be called when a timer or interval elapses.
|
|
||||||
*/
|
|
||||||
export type TimeoutHandler = () => void;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A fucked TS issue causes the DOM setTimeout to get merged with Node setTimeout, creating a "quantum method" that returns either "number" or "NodeJS.Timeout" depending on how it's called.
|
|
||||||
* This would be fine, except it always matches the *wrong type*!
|
|
||||||
* The result is this "impossible" scenario:
|
|
||||||
*
|
|
||||||
* ```typescript
|
|
||||||
* // Test evaluates to "false", because the method's return type is not equal to itself.
|
|
||||||
* type Test = ReturnType<typeof setTimeout> extends ReturnType<typeof setTimeout> ? true : false;
|
|
||||||
*
|
|
||||||
* // This is a compiler error, because the type is broken and triggers some internal TS bug.
|
|
||||||
* const timeout = setTimeout(handler);
|
|
||||||
* clearTimeout(timeout); // compiler error here, because even type inference doesn't work.
|
|
||||||
*
|
|
||||||
* // This fails to compile.
|
|
||||||
* function test(handler, timeout): ReturnType<typeof setTimeout> {
|
|
||||||
* return setTimeout(handler, timeout);
|
|
||||||
* }
|
|
||||||
* ```
|
|
||||||
*
|
|
||||||
* The bug is marked as "wontfix" by TS devs, so we have to work around it ourselves. -_-
|
|
||||||
* By forcing the return type to *explicitly* include both types, we at least make it possible to work with the resulting token.
|
|
||||||
*/
|
|
||||||
export type Timeout = NodeJS.Timeout | number;
|
|
143
packages/backend/src/server/SkRateLimiterService.md
Normal file
143
packages/backend/src/server/SkRateLimiterService.md
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
# SkRateLimiterService - Leaky Bucket Rate Limit Implementation
|
||||||
|
|
||||||
|
SkRateLimiterService replaces Misskey's RateLimiterService for all use cases.
|
||||||
|
It offers a simplified API, detailed metrics, and support for Rate Limit headers.
|
||||||
|
The prime feature is an implementation of Leaky Bucket - a flexible rate limiting scheme that better supports bursty request patterns common with human interaction.
|
||||||
|
|
||||||
|
## Compatibility
|
||||||
|
|
||||||
|
The API is backwards-compatible with existing limit definitions, but it's preferred to use the new BucketRateLimit interface.
|
||||||
|
Legacy limits will be "translated" into a bucket limit in a way that attempts to respect max, duration, and minInterval (if present).
|
||||||
|
SkRateLimiterService is quite not plug-and-play compatible with existing call sites, because it no longer throws when a limit is exceeded.
|
||||||
|
Instead, the returned LimitInfo object will have "blocked" set to true.
|
||||||
|
Callers are responsible for checking this property and taking any desired action, such as rejecting a request or returning limit details.
|
||||||
|
|
||||||
|
## Headers
|
||||||
|
|
||||||
|
LimitInfo objects (returned by SkRateLimitService.limit()) can be passed to rate-limit-utils.attachHeaders() to send standard rate limit headers with an HTTP response.
|
||||||
|
The defined headers are:
|
||||||
|
|
||||||
|
| Header | Definition | Example |
|
||||||
|
|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------|
|
||||||
|
| `X-RateLimit-Remaining` | Number of calls that can be made without triggering the rate limit. Will be zero if the limit is already exceeded, or will be exceeded by the next request. | `X-RateLimit-Remaining: 1` |
|
||||||
|
| `X-RateLimit-Clear` | Time in seconds required to completely clear the rate limit "bucket". | `X-RateLimit-Clear: 1.5` |
|
||||||
|
| `X-RateLimit-Reset` | Contains the number of seconds to wait before retrying the current request. Clients should delay for at least this long before making another call. Only included if the rate limit has already been exceeded. | `X-RateLimit-Reset: 0.755` |
|
||||||
|
| `Retry-After` | Like `X-RateLimit-Reset`, but measured in seconds (rounded up). Preserved for backwards compatibility, and only included if the rate limit has already been exceeded. | `Retry-After: 2` |
|
||||||
|
|
||||||
|
Note: rate limit headers are not standardized, except for `Retry-After`.
|
||||||
|
Header meanings and usage have been devised by adapting common patterns to work with a leaky bucket model instead.
|
||||||
|
|
||||||
|
## Performance
|
||||||
|
|
||||||
|
SkRateLimiterService makes between 1 and 4 redis transactions per rate limit check.
|
||||||
|
One call is read-only, while the others perform at least one write operation.
|
||||||
|
Two integer keys are stored per client/subject, and both expire together after the maximum duration of the limit.
|
||||||
|
While performance has not been formally tested, it's expected that SkRateLimiterService will perform roughly on par with the legacy RateLimiterService.
|
||||||
|
Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions.
|
||||||
|
|
||||||
|
## Concurrency and Multi-Node Correctness
|
||||||
|
|
||||||
|
To provide consistency across multi-node environments, leaky bucket is implemented with only atomic operations (Increment, Decrement, Add, and Subtract).
|
||||||
|
This allows the use of Optimistic Locking via modify-check-rollback logic.
|
||||||
|
If a data conflict is detected during the "drip" operation, then it's safely reverted by executing its inverse (Increment <-> Decrement, Add <-> Subtract).
|
||||||
|
We don't need to check for conflicts when adding the current request, as all checks account for the case where the bucket has been "overfilled".
|
||||||
|
Should that happen, the limit delay will be extended until the bucket size is back within limits.
|
||||||
|
|
||||||
|
There is one non-atomic `SET` operation used to populate the initial Timestamp value, but we can safely ignore data races there.
|
||||||
|
Any possible conflict would have to occur within a few-milliseconds window, which means that the final value can be no more than a few milliseconds off from the expected value.
|
||||||
|
This error does not compound, as all further operations are relative (Increment and Add).
|
||||||
|
Thus, it's considered an acceptable tradeoff given the limitations imposed by Redis and IORedis library.
|
||||||
|
|
||||||
|
## Algorithm Pseudocode
|
||||||
|
|
||||||
|
The Atomic Leaky Bucket algorithm is described here, in pseudocode:
|
||||||
|
|
||||||
|
```
|
||||||
|
# Terms
|
||||||
|
# * Now - UNIX timestamp of the current moment
|
||||||
|
# * Bucket Size - Maximum number of requests allowed in the bucket
|
||||||
|
# * Counter - Number of requests in the bucket
|
||||||
|
# * Drip Rate - How often to decrement counter
|
||||||
|
# * Drip Size - How much to decrement the counter
|
||||||
|
# * Timestamp - UNIX timestamp of last bucket drip
|
||||||
|
# * Delta Counter - Difference between current and expected counter value
|
||||||
|
# * Delta Timestamp - Difference between current and expected timestamp value
|
||||||
|
|
||||||
|
# 0 - Calculations
|
||||||
|
dripRate = ceil(limit.dripRate ?? 1000);
|
||||||
|
dripSize = ceil(limit.dripSize ?? 1);
|
||||||
|
bucketSize = max(ceil(limit.size / factor), 1);
|
||||||
|
maxExpiration = max(ceil((dripRate * ceil(bucketSize / dripSize)) / 1000), 1);;
|
||||||
|
|
||||||
|
# 1 - Read
|
||||||
|
MULTI
|
||||||
|
GET 'counter' INTO counter
|
||||||
|
GET 'timestamp' INTO timestamp
|
||||||
|
EXEC
|
||||||
|
|
||||||
|
# 2 - Drip
|
||||||
|
if (counter > 0) {
|
||||||
|
# Deltas
|
||||||
|
deltaCounter = floor((now - timestamp) / dripRate) * dripSize;
|
||||||
|
deltaCounter = min(deltaCounter, counter);
|
||||||
|
deltaTimestamp = deltaCounter * dripRate;
|
||||||
|
if (deltaCounter > 0) {
|
||||||
|
# Update
|
||||||
|
expectedTimestamp = timestamp
|
||||||
|
MULTI
|
||||||
|
GET 'timestamp' INTO canaryTimestamp
|
||||||
|
INCRBY 'timestamp' deltaTimestamp
|
||||||
|
EXPIRE 'timestamp' maxExpiration
|
||||||
|
GET 'timestamp' INTO timestamp
|
||||||
|
DECRBY 'counter' deltaCounter
|
||||||
|
EXPIRE 'counter' maxExpiration
|
||||||
|
GET 'counter' INTO counter
|
||||||
|
EXEC
|
||||||
|
# Rollback
|
||||||
|
if (canaryTimestamp != expectedTimestamp) {
|
||||||
|
MULTI
|
||||||
|
DECRBY 'timestamp' deltaTimestamp
|
||||||
|
GET 'timestamp' INTO timestmamp
|
||||||
|
INCRBY 'counter' deltaCounter
|
||||||
|
GET 'counter' INTO counter
|
||||||
|
EXEC
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# 3 - Check
|
||||||
|
blocked = counter >= bucketSize
|
||||||
|
if (!blocked) {
|
||||||
|
if (timestamp == 0) {
|
||||||
|
# Edge case - set the initial value for timestamp.
|
||||||
|
# Otherwise the first request will immediately drip away.
|
||||||
|
MULTI
|
||||||
|
SET 'timestamp', now
|
||||||
|
EXPIRE 'timestamp' maxExpiration
|
||||||
|
INCR 'counter'
|
||||||
|
EXPIRE 'counter' maxExpiration
|
||||||
|
GET 'counter' INTO counter
|
||||||
|
EXEC
|
||||||
|
} else {
|
||||||
|
MULTI
|
||||||
|
INCR 'counter'
|
||||||
|
EXPIRE 'counter' maxExpiration
|
||||||
|
GET 'counter' INTO counter
|
||||||
|
EXEC
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# 4 - Handle
|
||||||
|
if (blocked) {
|
||||||
|
# Application-specific code goes here.
|
||||||
|
# At this point blocked, counter, and timestamp are all accurate and synced to redis.
|
||||||
|
# Caller can apply limits, calculate headers, log audit failure, or anything else.
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notes, Resources, and Further Reading
|
||||||
|
|
||||||
|
* https://en.wikipedia.org/wiki/Leaky_bucket#As_a_meter
|
||||||
|
* https://ietf-wg-httpapi.github.io/ratelimit-headers/darrelmiller-policyname/draft-ietf-httpapi-ratelimit-headers.txt
|
||||||
|
* https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
|
||||||
|
* https://stackoverflow.com/a/16022625
|
|
@ -8,8 +8,7 @@ import Redis from 'ioredis';
|
||||||
import { TimeService } from '@/core/TimeService.js';
|
import { TimeService } from '@/core/TimeService.js';
|
||||||
import { EnvService } from '@/core/EnvService.js';
|
import { EnvService } from '@/core/EnvService.js';
|
||||||
import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed, hasMaxLimit, disabledLimitInfo, MaxLegacyLimit, MinLegacyLimit } from '@/misc/rate-limit-utils.js';
|
import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed, hasMaxLimit, disabledLimitInfo, MaxLegacyLimit, MinLegacyLimit } from '@/misc/rate-limit-utils.js';
|
||||||
import { RedisConnectionPool } from '@/core/RedisConnectionPool.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
import { TimeoutService } from '@/core/TimeoutService.js';
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class SkRateLimiterService {
|
export class SkRateLimiterService {
|
||||||
|
@ -19,11 +18,8 @@ export class SkRateLimiterService {
|
||||||
@Inject(TimeService)
|
@Inject(TimeService)
|
||||||
private readonly timeService: TimeService,
|
private readonly timeService: TimeService,
|
||||||
|
|
||||||
@Inject(TimeoutService)
|
@Inject(DI.redis)
|
||||||
private readonly timeoutService: TimeoutService,
|
private readonly redisClient: Redis.Redis,
|
||||||
|
|
||||||
@Inject(RedisConnectionPool)
|
|
||||||
private readonly redisPool: RedisConnectionPool,
|
|
||||||
|
|
||||||
@Inject(EnvService)
|
@Inject(EnvService)
|
||||||
envService: EnvService,
|
envService: EnvService,
|
||||||
|
@ -31,6 +27,12 @@ export class SkRateLimiterService {
|
||||||
this.disabled = envService.env.NODE_ENV === 'test';
|
this.disabled = envService.env.NODE_ENV === 'test';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check & increment a rate limit
|
||||||
|
* @param limit The limit definition
|
||||||
|
* @param actor Client who is calling this limit
|
||||||
|
* @param factor Scaling factor - smaller = larger limit (less restrictive)
|
||||||
|
*/
|
||||||
public async limit(limit: Keyed<RateLimit>, actor: string, factor = 1): Promise<LimitInfo> {
|
public async limit(limit: Keyed<RateLimit>, actor: string, factor = 1): Promise<LimitInfo> {
|
||||||
if (this.disabled || factor === 0) {
|
if (this.disabled || factor === 0) {
|
||||||
return disabledLimitInfo;
|
return disabledLimitInfo;
|
||||||
|
@ -40,52 +42,28 @@ export class SkRateLimiterService {
|
||||||
throw new Error(`Rate limit factor is zero or negative: ${factor}`);
|
throw new Error(`Rate limit factor is zero or negative: ${factor}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const redis = await this.redisPool.alloc();
|
return await this.tryLimit(limit, actor, factor);
|
||||||
try {
|
}
|
||||||
return await this.tryLimit(redis, limit, actor, factor);
|
|
||||||
} finally {
|
private async tryLimit(limit: Keyed<RateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||||
await this.redisPool.free(redis);
|
if (isLegacyRateLimit(limit)) {
|
||||||
|
return await this.limitLegacy(limit, actor, factor);
|
||||||
|
} else {
|
||||||
|
return await this.limitBucket(limit, actor, factor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async tryLimit(redis: Redis.Redis, limit: Keyed<RateLimit>, actor: string, factor: number, retry = 0): Promise<LimitInfo> {
|
private async limitLegacy(limit: Keyed<LegacyRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||||
try {
|
|
||||||
if (retry > 0) {
|
|
||||||
// Real-world testing showed the need for backoff to "spread out" bursty traffic.
|
|
||||||
const backoff = Math.round(Math.pow(2, retry + Math.random()));
|
|
||||||
await this.timeoutService.delay(backoff);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isLegacyRateLimit(limit)) {
|
|
||||||
return await this.limitLegacy(redis, limit, actor, factor);
|
|
||||||
} else {
|
|
||||||
return await this.limitBucket(redis, limit, actor, factor);
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
// We may experience collision errors from optimistic locking.
|
|
||||||
// This is expected, so we should retry a few times before giving up.
|
|
||||||
// https://redis.io/docs/latest/develop/interact/transactions/#optimistic-locking-using-check-and-set
|
|
||||||
if (err instanceof ConflictError && retry < 4) {
|
|
||||||
// We can reuse the same connection to reduce pool contention, but we have to reset it first.
|
|
||||||
await redis.reset();
|
|
||||||
return await this.tryLimit(redis, limit, actor, factor, retry + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async limitLegacy(redis: Redis.Redis, limit: Keyed<LegacyRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
|
||||||
if (hasMaxLimit(limit)) {
|
if (hasMaxLimit(limit)) {
|
||||||
return await this.limitMaxLegacy(redis, limit, actor, factor);
|
return await this.limitMaxLegacy(limit, actor, factor);
|
||||||
} else if (hasMinLimit(limit)) {
|
} else if (hasMinLimit(limit)) {
|
||||||
return await this.limitMinLegacy(redis, limit, actor, factor);
|
return await this.limitMinLegacy(limit, actor, factor);
|
||||||
} else {
|
} else {
|
||||||
return disabledLimitInfo;
|
return disabledLimitInfo;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async limitMaxLegacy(redis: Redis.Redis, limit: Keyed<MaxLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
private async limitMaxLegacy(limit: Keyed<MaxLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||||
if (limit.duration === 0) return disabledLimitInfo;
|
if (limit.duration === 0) return disabledLimitInfo;
|
||||||
if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`);
|
if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`);
|
||||||
if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`);
|
if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`);
|
||||||
|
@ -106,10 +84,10 @@ export class SkRateLimiterService {
|
||||||
dripRate,
|
dripRate,
|
||||||
dripSize,
|
dripSize,
|
||||||
};
|
};
|
||||||
return await this.limitBucket(redis, bucketLimit, actor, factor);
|
return await this.limitBucket(bucketLimit, actor, factor);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async limitMinLegacy(redis: Redis.Redis, limit: Keyed<MinLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
private async limitMinLegacy(limit: Keyed<MinLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||||
if (limit.minInterval === 0) return disabledLimitInfo;
|
if (limit.minInterval === 0) return disabledLimitInfo;
|
||||||
if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);
|
if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);
|
||||||
|
|
||||||
|
@ -121,33 +99,83 @@ export class SkRateLimiterService {
|
||||||
dripRate,
|
dripRate,
|
||||||
dripSize: 1,
|
dripSize: 1,
|
||||||
};
|
};
|
||||||
return await this.limitBucket(redis, bucketLimit, actor, factor);
|
return await this.limitBucket(bucketLimit, actor, factor);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async limitBucket(redis: Redis.Redis, limit: Keyed<BucketRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
/**
|
||||||
|
* Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details.
|
||||||
|
*/
|
||||||
|
private async limitBucket(limit: Keyed<BucketRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||||
if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`);
|
if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`);
|
||||||
if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`);
|
if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`);
|
||||||
if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`);
|
if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`);
|
||||||
|
|
||||||
const redisKey = createLimitKey(limit, actor);
|
// 0 - Calculate
|
||||||
|
const now = this.timeService.now;
|
||||||
const bucketSize = Math.max(Math.ceil(limit.size / factor), 1);
|
const bucketSize = Math.max(Math.ceil(limit.size / factor), 1);
|
||||||
const dripRate = Math.ceil(limit.dripRate ?? 1000);
|
const dripRate = Math.ceil(limit.dripRate ?? 1000);
|
||||||
const dripSize = Math.ceil(limit.dripSize ?? 1);
|
const dripSize = Math.ceil(limit.dripSize ?? 1);
|
||||||
const expirationSec = Math.max(Math.ceil(bucketSize / dripRate), 1);
|
const expirationSec = Math.max(Math.ceil((dripRate * Math.ceil(bucketSize / dripSize)) / 1000), 1);
|
||||||
|
|
||||||
// Simulate bucket drips
|
// 1 - Read
|
||||||
const counter = await this.getLimitCounter(redis, redisKey);
|
const counterKey = createLimitKey(limit, actor, 'c');
|
||||||
if (counter.counter > 0) {
|
const timestampKey = createLimitKey(limit, actor, 't');
|
||||||
const dripsSinceLastTick = Math.floor((this.timeService.now - counter.timestamp) / dripRate) * dripSize;
|
const counter = await this.getLimitCounter(counterKey, timestampKey);
|
||||||
counter.counter = Math.max(counter.counter - dripsSinceLastTick, 0);
|
|
||||||
|
// 2 - Drip
|
||||||
|
const dripsSinceLastTick = Math.floor((now - counter.timestamp) / dripRate) * dripSize;
|
||||||
|
const deltaCounter = Math.min(dripsSinceLastTick, counter.counter);
|
||||||
|
const deltaTimestamp = dripsSinceLastTick * dripRate;
|
||||||
|
if (deltaCounter > 0) {
|
||||||
|
// Execute the next drip(s)
|
||||||
|
const results = await this.executeRedisMulti(
|
||||||
|
['get', timestampKey],
|
||||||
|
['incrby', timestampKey, deltaTimestamp],
|
||||||
|
['expire', timestampKey, expirationSec],
|
||||||
|
['get', timestampKey],
|
||||||
|
['decrby', counterKey, deltaCounter],
|
||||||
|
['expire', counterKey, expirationSec],
|
||||||
|
['get', counterKey],
|
||||||
|
);
|
||||||
|
const expectedTimestamp = counter.timestamp;
|
||||||
|
const canaryTimestamp = results[0] ? parseInt(results[0]) : 0;
|
||||||
|
counter.timestamp = results[3] ? parseInt(results[3]) : 0;
|
||||||
|
counter.counter = results[6] ? parseInt(results[6]) : 0;
|
||||||
|
|
||||||
|
// Check for a data collision and rollback
|
||||||
|
if (canaryTimestamp !== expectedTimestamp) {
|
||||||
|
const rollbackResults = await this.executeRedisMulti(
|
||||||
|
['decrby', timestampKey, deltaTimestamp],
|
||||||
|
['get', timestampKey],
|
||||||
|
['incrby', counterKey, deltaCounter],
|
||||||
|
['get', counterKey],
|
||||||
|
);
|
||||||
|
counter.timestamp = rollbackResults[1] ? parseInt(rollbackResults[1]) : 0;
|
||||||
|
counter.counter = rollbackResults[3] ? parseInt(rollbackResults[3]) : 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Increment the limit, then synchronize with redis
|
// 3 - Check
|
||||||
const blocked = counter.counter >= bucketSize;
|
const blocked = counter.counter >= bucketSize;
|
||||||
if (!blocked) {
|
if (!blocked) {
|
||||||
counter.counter++;
|
if (counter.timestamp === 0) {
|
||||||
counter.timestamp = this.timeService.now;
|
const results = await this.executeRedisMulti(
|
||||||
await this.updateLimitCounter(redis, redisKey, expirationSec, counter);
|
['set', timestampKey, now],
|
||||||
|
['expire', timestampKey, expirationSec],
|
||||||
|
['incr', counterKey],
|
||||||
|
['expire', counterKey, expirationSec],
|
||||||
|
['get', counterKey],
|
||||||
|
);
|
||||||
|
counter.timestamp = now;
|
||||||
|
counter.counter = results[4] ? parseInt(results[4]) : 0;
|
||||||
|
} else {
|
||||||
|
const results = await this.executeRedisMulti(
|
||||||
|
['incr', counterKey],
|
||||||
|
['expire', counterKey, expirationSec],
|
||||||
|
['get', counterKey],
|
||||||
|
);
|
||||||
|
counter.counter = results[2] ? parseInt(results[2]) : 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate how much time is needed to free up a bucket slot
|
// Calculate how much time is needed to free up a bucket slot
|
||||||
|
@ -164,37 +192,20 @@ export class SkRateLimiterService {
|
||||||
return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs };
|
return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs };
|
||||||
}
|
}
|
||||||
|
|
||||||
private async getLimitCounter(redis: Redis.Redis, key: string): Promise<LimitCounter> {
|
private async getLimitCounter(counterKey: string, timestampKey: string): Promise<LimitCounter> {
|
||||||
const counter: LimitCounter = { counter: 0, timestamp: 0 };
|
const [counter, timestamp] = await this.executeRedisMulti(
|
||||||
|
['get', counterKey],
|
||||||
// Watch the key BEFORE reading it!
|
['get', timestampKey],
|
||||||
await redis.watch(key);
|
|
||||||
const data = await redis.get(key);
|
|
||||||
|
|
||||||
// Data may be missing or corrupt if the key doesn't exist.
|
|
||||||
// This is an expected edge case.
|
|
||||||
if (data) {
|
|
||||||
const parts = data.split(':');
|
|
||||||
if (parts.length === 2) {
|
|
||||||
counter.counter = parseInt(parts[0]);
|
|
||||||
counter.timestamp = parseInt(parts[1]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return counter;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async updateLimitCounter(redis: Redis.Redis, key: string, expirationSec: number, counter: LimitCounter): Promise<void> {
|
|
||||||
const data = `${counter.counter}:${counter.timestamp}`;
|
|
||||||
|
|
||||||
await this.executeRedisMulti(
|
|
||||||
redis,
|
|
||||||
[['set', key, data, 'EX', expirationSec]],
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
return {
|
||||||
|
counter: counter ? parseInt(counter) : 0,
|
||||||
|
timestamp: timestamp ? parseInt(timestamp) : 0,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private async executeRedisMulti<Num extends number>(redis: Redis.Redis, batch: RedisBatch<Num>): Promise<RedisResults<Num>> {
|
private async executeRedisMulti(...batch: RedisCommand[]): Promise<RedisResult[]> {
|
||||||
const results = await redis.multi(batch).exec();
|
const results = await this.redisClient.multi(batch).exec();
|
||||||
|
|
||||||
// Transaction conflict (retryable)
|
// Transaction conflict (retryable)
|
||||||
if (!results) {
|
if (!results) {
|
||||||
|
@ -206,21 +217,32 @@ export class SkRateLimiterService {
|
||||||
throw new Error('Redis error: failed to execute batch');
|
throw new Error('Redis error: failed to execute batch');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Command failed (fatal)
|
// Map responses
|
||||||
const errors = results.map(r => r[0]).filter(e => e != null);
|
const errors: Error[] = [];
|
||||||
if (errors.length > 0) {
|
const responses: RedisResult[] = [];
|
||||||
throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errors.join('\', \'')}'`);
|
for (const [error, response] of results) {
|
||||||
|
if (error) errors.push(error);
|
||||||
|
responses.push(response as RedisResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
return results.map(r => r[1]) as RedisResults<Num>;
|
// Command failed (fatal)
|
||||||
|
if (errors.length > 0) {
|
||||||
|
const errorMessages = errors
|
||||||
|
.map((e, i) => `Error in command ${i}: ${e}`)
|
||||||
|
.join('\', \'');
|
||||||
|
throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errorMessages}'`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return responses;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type RedisBatch<Num extends number> = [string, ...unknown[]][] & { length: Num };
|
// Not correct, but good enough for the basic commands we use.
|
||||||
type RedisResults<Num extends number> = (string | null)[] & { length: Num };
|
type RedisResult = string | null;
|
||||||
|
type RedisCommand = [command: string, ...args: unknown[]];
|
||||||
|
|
||||||
function createLimitKey(limit: Keyed<RateLimit>, actor: string): string {
|
function createLimitKey(limit: Keyed<RateLimit>, actor: string, value: string): string {
|
||||||
return `rl_${actor}_${limit.key}`;
|
return `rl_${actor}_${limit.key}_${value}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
class ConflictError extends Error {}
|
class ConflictError extends Error {}
|
||||||
|
|
|
@ -6,8 +6,6 @@
|
||||||
import type Redis from 'ioredis';
|
import type Redis from 'ioredis';
|
||||||
import { SkRateLimiterService } from '@/server/api/SkRateLimiterService.js';
|
import { SkRateLimiterService } from '@/server/api/SkRateLimiterService.js';
|
||||||
import { BucketRateLimit, Keyed, LegacyRateLimit } from '@/misc/rate-limit-utils.js';
|
import { BucketRateLimit, Keyed, LegacyRateLimit } from '@/misc/rate-limit-utils.js';
|
||||||
import { RedisConnectionPool } from '@/core/RedisConnectionPool.js';
|
|
||||||
import { Timeout, TimeoutHandler, TimeoutService } from '@/core/TimeoutService.js';
|
|
||||||
|
|
||||||
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
||||||
|
|
||||||
|
@ -64,12 +62,6 @@ describe(SkRateLimiterService, () => {
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
},
|
},
|
||||||
} as unknown as Redis.Redis;
|
} as unknown as Redis.Redis;
|
||||||
const mockRedisPool = {
|
|
||||||
alloc() {
|
|
||||||
return Promise.resolve(mockRedisClient);
|
|
||||||
},
|
|
||||||
free() {},
|
|
||||||
} as unknown as RedisConnectionPool;
|
|
||||||
|
|
||||||
mockEnvironment = Object.create(process.env);
|
mockEnvironment = Object.create(process.env);
|
||||||
mockEnvironment.NODE_ENV = 'production';
|
mockEnvironment.NODE_ENV = 'production';
|
||||||
|
@ -77,22 +69,9 @@ describe(SkRateLimiterService, () => {
|
||||||
env: mockEnvironment,
|
env: mockEnvironment,
|
||||||
};
|
};
|
||||||
|
|
||||||
const mockTimeoutService = new class extends TimeoutService {
|
|
||||||
setTimeout(handler: TimeoutHandler): Timeout {
|
|
||||||
handler();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
setInterval(handler: TimeoutHandler): Timeout {
|
|
||||||
handler();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
clearTimeout() {}
|
|
||||||
clearInterval() {}
|
|
||||||
};
|
|
||||||
|
|
||||||
let service: SkRateLimiterService | undefined = undefined;
|
let service: SkRateLimiterService | undefined = undefined;
|
||||||
serviceUnderTest = () => {
|
serviceUnderTest = () => {
|
||||||
return service ??= new SkRateLimiterService(mockTimeService, mockTimeoutService, mockRedisPool, mockEnvService);
|
return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockEnvService);
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -108,15 +87,70 @@ describe(SkRateLimiterService, () => {
|
||||||
limitTimestamp = undefined;
|
limitTimestamp = undefined;
|
||||||
|
|
||||||
mockRedis.push(([command, ...args]) => {
|
mockRedis.push(([command, ...args]) => {
|
||||||
if (command === 'set' && args[0] === 'rl_actor_test') {
|
if (command === 'get') {
|
||||||
const parts = (args[1] as string).split(':');
|
if (args[0] === 'rl_actor_test_c') {
|
||||||
limitCounter = parseInt(parts[0] as string);
|
const data = limitCounter?.toString() ?? null;
|
||||||
limitTimestamp = parseInt(parts[1] as string);
|
return [null, data];
|
||||||
return [null, args[1]];
|
}
|
||||||
|
if (args[0] === 'rl_actor_test_t') {
|
||||||
|
const data = limitTimestamp?.toString() ?? null;
|
||||||
|
return [null, data];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (command === 'get' && args[0] === 'rl_actor_test') {
|
|
||||||
const data = `${limitCounter ?? 0}:${limitTimestamp ?? 0}`;
|
if (command === 'set') {
|
||||||
return [null, data];
|
if (args[0] === 'rl_actor_test_c') {
|
||||||
|
limitCounter = parseInt(args[1] as string);
|
||||||
|
return [null, args[1]];
|
||||||
|
}
|
||||||
|
if (args[0] === 'rl_actor_test_t') {
|
||||||
|
limitTimestamp = parseInt(args[1] as string);
|
||||||
|
return [null, args[1]];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (command === 'incr') {
|
||||||
|
if (args[0] === 'rl_actor_test_c') {
|
||||||
|
limitCounter = (limitCounter ?? 0) + 1;
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
if (args[0] === 'rl_actor_test_t') {
|
||||||
|
limitTimestamp = (limitTimestamp ?? 0) + 1;
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (command === 'incrby') {
|
||||||
|
if (args[0] === 'rl_actor_test_c') {
|
||||||
|
limitCounter = (limitCounter ?? 0) + parseInt(args[1] as string);
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
if (args[0] === 'rl_actor_test_t') {
|
||||||
|
limitTimestamp = (limitTimestamp ?? 0) + parseInt(args[1] as string);
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (command === 'decr') {
|
||||||
|
if (args[0] === 'rl_actor_test_c') {
|
||||||
|
limitCounter = (limitCounter ?? 0) - 1;
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
if (args[0] === 'rl_actor_test_t') {
|
||||||
|
limitTimestamp = (limitTimestamp ?? 0) - 1;
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (command === 'decrby') {
|
||||||
|
if (args[0] === 'rl_actor_test_c') {
|
||||||
|
limitCounter = (limitCounter ?? 0) - parseInt(args[1] as string);
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
|
if (args[0] === 'rl_actor_test_t') {
|
||||||
|
limitTimestamp = (limitTimestamp ?? 0) - parseInt(args[1] as string);
|
||||||
|
return [null, null];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
@ -269,7 +303,19 @@ describe(SkRateLimiterService, () => {
|
||||||
|
|
||||||
await serviceUnderTest().limit(limit, actor);
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]);
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should set timestamp expiration', async () => {
|
||||||
|
const commands: unknown[][] = [];
|
||||||
|
mockRedis.push(command => {
|
||||||
|
commands.push(command);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not increment when already blocked', async () => {
|
it('should not increment when already blocked', async () => {
|
||||||
|
@ -368,35 +414,6 @@ describe(SkRateLimiterService, () => {
|
||||||
await expect(promise).rejects.toThrow(/dripSize is less than 1/);
|
await expect(promise).rejects.toThrow(/dripSize is less than 1/);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should retry when redis conflicts', async () => {
|
|
||||||
let numCalls = 0;
|
|
||||||
const originalExec = mockRedisExec;
|
|
||||||
mockRedisExec = () => {
|
|
||||||
numCalls++;
|
|
||||||
if (numCalls > 1) {
|
|
||||||
mockRedisExec = originalExec;
|
|
||||||
}
|
|
||||||
return Promise.resolve(null);
|
|
||||||
};
|
|
||||||
|
|
||||||
await serviceUnderTest().limit(limit, actor);
|
|
||||||
|
|
||||||
expect(numCalls).toBe(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should bail out after 5 tries', async () => {
|
|
||||||
let numCalls = 0;
|
|
||||||
mockRedisExec = () => {
|
|
||||||
numCalls++;
|
|
||||||
return Promise.resolve(null);
|
|
||||||
};
|
|
||||||
|
|
||||||
const promise = serviceUnderTest().limit(limit, actor);
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrow(/transaction conflict/);
|
|
||||||
expect(numCalls).toBe(5);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should apply correction if extra calls slip through', async () => {
|
it('should apply correction if extra calls slip through', async () => {
|
||||||
limitCounter = 2;
|
limitCounter = 2;
|
||||||
|
|
||||||
|
@ -473,8 +490,9 @@ describe(SkRateLimiterService, () => {
|
||||||
await serviceUnderTest().limit(limit, actor); // blocked
|
await serviceUnderTest().limit(limit, actor); // blocked
|
||||||
mockTimeService.now += 1000; // 1 - 1 = 0
|
mockTimeService.now += 1000; // 1 - 1 = 0
|
||||||
mockTimeService.now += 1000; // 0 - 1 = 0
|
mockTimeService.now += 1000; // 0 - 1 = 0
|
||||||
await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1
|
const info = await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1
|
||||||
|
|
||||||
|
expect(info.blocked).toBeFalsy();
|
||||||
expect(limitCounter).toBe(1);
|
expect(limitCounter).toBe(1);
|
||||||
expect(limitTimestamp).toBe(3000);
|
expect(limitTimestamp).toBe(3000);
|
||||||
});
|
});
|
||||||
|
@ -529,7 +547,19 @@ describe(SkRateLimiterService, () => {
|
||||||
|
|
||||||
await serviceUnderTest().limit(limit, actor);
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]);
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should set timer expiration', async () => {
|
||||||
|
const commands: unknown[][] = [];
|
||||||
|
mockRedis.push(command => {
|
||||||
|
commands.push(command);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not increment when already blocked', async () => {
|
it('should not increment when already blocked', async () => {
|
||||||
|
@ -688,7 +718,19 @@ describe(SkRateLimiterService, () => {
|
||||||
|
|
||||||
await serviceUnderTest().limit(limit, actor);
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]);
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should set timestamp expiration', async () => {
|
||||||
|
const commands: unknown[][] = [];
|
||||||
|
mockRedis.push(command => {
|
||||||
|
commands.push(command);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not increment when already blocked', async () => {
|
it('should not increment when already blocked', async () => {
|
||||||
|
@ -866,7 +908,19 @@ describe(SkRateLimiterService, () => {
|
||||||
|
|
||||||
await serviceUnderTest().limit(limit, actor);
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]);
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should set timestamp expiration', async () => {
|
||||||
|
const commands: unknown[][] = [];
|
||||||
|
mockRedis.push(command => {
|
||||||
|
commands.push(command);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
await serviceUnderTest().limit(limit, actor);
|
||||||
|
|
||||||
|
expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not increment when already blocked', async () => {
|
it('should not increment when already blocked', async () => {
|
||||||
|
|
Loading…
Reference in a new issue