pyrate_limiter package

class pyrate_limiter.AbstractBucket

Bases: ABC

Base bucket interface. Assumption: len(rates) is always > 0. TODO: allow empty rates.

close()

Release any resources held by the bucket.

Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.

Return type:

None

abstractmethod count()

Count the number of items in the bucket.

Return type:

Union[int, Awaitable[int]]

failing_rate = None
abstractmethod flush()

Flush the whole bucket. Implementations must clear failing_rate after flushing.

Return type:

Optional[Awaitable[None]]

abstractmethod leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

Union[int, Awaitable[int]]

limiter_lock()

An additional lock to be used by Limiter in-front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.

Return type:

Optional[object]

now()

Retrieve current timestamp from the clock backend.

abstractmethod peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: We cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.

Return type:

Union[RateItem, None, Awaitable[Optional[RateItem]]]

abstractmethod put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

Return type:

Union[bool, Awaitable[bool]]

rates
waiting(item)

Calculate the time until the bucket becomes available to consume an item again.

Return type:

Union[int, Awaitable[int]]

class pyrate_limiter.AbstractClock

Bases: ABC

Clock that returns the current timestamp.

abstractmethod now()

Get time as of now, in milliseconds

Return type:

Union[int, Awaitable[int]]
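
As an illustration of the contract above, a clock only needs a now() that returns milliseconds. The sketch below deliberately does not import the library: SketchClock, WallClock and FixedClock are hypothetical stand-in names, so the block runs on the standard library alone.

```python
import time
from abc import ABC, abstractmethod


class SketchClock(ABC):
    """Stand-in for the AbstractClock contract (hypothetical name)."""

    @abstractmethod
    def now(self) -> int:
        """Return the current time in milliseconds."""


class WallClock(SketchClock):
    """Wall-clock time in milliseconds since the Unix epoch."""

    def now(self) -> int:
        return int(time.time() * 1000)


class FixedClock(SketchClock):
    """Deterministic clock for tests: only moves when advance() is called."""

    def __init__(self, start_ms: int = 0) -> None:
        self.current_ms = start_ms

    def advance(self, delta_ms: int) -> None:
        self.current_ms += delta_ms

    def now(self) -> int:
        return self.current_ms
```

A manually advanced clock such as FixedClock can make rate-limit tests deterministic, since no real waiting is involved.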

class pyrate_limiter.BucketAsyncWrapper(bucket)

Bases: AbstractBucket

BucketAsyncWrapper wraps any bucket, synchronous or asynchronous, and exposes it as an async one.

async count()

Count the number of items in the bucket.

property failing_rate


async flush()

Flush the whole bucket. Implementations must clear failing_rate after flushing.

Return type:

None

async leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

int

now()

Retrieve current timestamp from the clock backend.

Return type:

int

async peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: We cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.

Return type:

Optional[RateItem]

async put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

property rates
async waiting(item)

Calculate the time until the bucket becomes available to consume an item again.

Return type:

int
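
One common way to give sync and async objects a single async surface is to call the inner method and await the result only if it is awaitable. The following is a minimal sketch of that idea, not the library's implementation; SyncCounter, AsyncCounter and AsyncWrapperSketch are hypothetical names.

```python
import asyncio
import inspect


class SyncCounter:
    """Hypothetical synchronous bucket-like object."""

    def count(self) -> int:
        return 3


class AsyncCounter:
    """Hypothetical asynchronous bucket-like object."""

    async def count(self) -> int:
        return 5


class AsyncWrapperSketch:
    """Expose an async count() regardless of how the wrapped object behaves."""

    def __init__(self, inner) -> None:
        self.inner = inner

    async def count(self) -> int:
        result = self.inner.count()
        # Await only when the wrapped method handed back an awaitable.
        if inspect.isawaitable(result):
            result = await result
        return result


async def demo() -> tuple:
    return (
        await AsyncWrapperSketch(SyncCounter()).count(),
        await AsyncWrapperSketch(AsyncCounter()).count(),
    )
```

Calling asyncio.run(demo()) drives both wrappers through the same awaitable interface.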

class pyrate_limiter.BucketFactory

Bases: ABC

Abstract BucketFactory class. Users are expected to subclass it and implement their own bucket-routing and bucket-creation logic.

close()
Return type:

None

create(bucket_class, *args, **kwargs)

Create a bucket dynamically.

Return type:

AbstractBucket

dispose(bucket)

Delete a bucket from the factory

Return type:

bool

abstractmethod get(item)

Get the corresponding bucket to this item

Return type:

Union[AbstractBucket, Awaitable[AbstractBucket]]

get_buckets()

Get all buckets in the factory.

Return type:

List[AbstractBucket]

property leak_interval

Retrieve leak-interval from inner Leaker task

schedule_leak(new_bucket)

Schedule leaking for all buckets and reset the bucket’s failing rate.

Return type:

None

abstractmethod wrap_item(name, weight=1)

Add the current timestamp to the receiving item, using any clock backend, and turn it into a RateItem. Can return either a coroutine or a RateItem instance.

Return type:

Union[RateItem, Awaitable[RateItem]]
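
The two abstract methods, get and wrap_item, split the work into "stamp the item" and "pick the bucket". The sketch below shows that division with plain lists standing in for buckets. SketchItem and RoutingFactorySketch are hypothetical names, and routing by item name is an assumption chosen for the example.

```python
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SketchItem:
    """Stand-in for RateItem: name, timestamp (ms), weight."""

    name: str
    timestamp: int
    weight: int = 1


class RoutingFactorySketch:
    """Hypothetical factory that keeps one list-backed bucket per item name."""

    def __init__(self) -> None:
        self.buckets: Dict[str, List[SketchItem]] = {}

    def wrap_item(self, name: str, weight: int = 1) -> SketchItem:
        # Stamp the item with the current time in milliseconds.
        return SketchItem(name, int(time.time() * 1000), weight)

    def get(self, item: SketchItem) -> List[SketchItem]:
        # Route by name, creating the bucket on first use.
        return self.buckets.setdefault(item.name, [])


factory = RoutingFactorySketch()
for key in ("alice", "bob", "alice"):
    item = factory.wrap_item(key)
    factory.get(item).append(item)
```

After the loop, the factory holds two buckets, one per distinct name.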

class pyrate_limiter.Duration(*values)

Bases: Enum

Interval helper class

DAY = 86400000
HOUR = 3600000
MINUTE = 60000
SECOND = 1000
WEEK = 604800000
static readable(value)
Return type:

str
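
All of the constants above are millisecond counts. The sketch below mirrors them in a local IntEnum so the arithmetic can be checked without the library. DurationSketch and describe_ms are hypothetical names; describe_ms is not the library's readable(), whose output format is not documented here.

```python
from enum import IntEnum


class DurationSketch(IntEnum):
    """Mirror of the constants listed above, in milliseconds."""

    SECOND = 1000
    MINUTE = 60000
    HOUR = 3600000
    DAY = 86400000
    WEEK = 604800000


def describe_ms(value: int) -> str:
    """Hypothetical helper: express a millisecond count in its largest whole unit."""
    for unit in sorted(DurationSketch, reverse=True):
        if value >= unit and value % unit == 0:
            return f"{value // unit} {unit.name.lower()}(s)"
    return f"{value} ms"


# An interval of two minutes, expressed in milliseconds.
two_minutes = 2 * DurationSketch.MINUTE
```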

class pyrate_limiter.InMemoryBucket(rates)

Bases: AbstractBucket

Simple in-memory bucket using a native list.

Clock can be either time.time or time.monotonic. A clock is required when leaking.

Pros: fast, safe, and precise.

Cons: since it resides in local memory, the data is neither persistent nor scalable.

Use case: small applications, simple logic.

count()

Count the number of items in the bucket.

Return type:

int

flush()

Flush the whole bucket. Implementations must clear failing_rate after flushing.

Return type:

None

items
leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

int

peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: We cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.

Return type:

RateItem | None

put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

Return type:

bool
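
To make the put / leak / peek vocabulary concrete, here is a minimal list-backed sketch for a single rate. It is a simplified stand-in, not the library's InMemoryBucket: the class name ListBucketSketch, the single-rate restriction and the use of bare integer timestamps instead of RateItem objects are all assumptions made for the example.

```python
from typing import List, Optional


class ListBucketSketch:
    """Hypothetical single-rate bucket: at most `limit` items per `interval_ms`."""

    def __init__(self, limit: int, interval_ms: int) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.items: List[int] = []  # timestamps in ms, oldest first

    def put(self, timestamp_ms: int) -> bool:
        # Count only the items that still fall inside the window.
        window_start = timestamp_ms - self.interval_ms
        in_window = sum(1 for ts in self.items if ts > window_start)
        if in_window >= self.limit:
            return False
        self.items.append(timestamp_ms)
        return True

    def leak(self, current_timestamp_ms: int) -> int:
        # Drop items older than the window and report how many were removed.
        window_start = current_timestamp_ms - self.interval_ms
        kept = [ts for ts in self.items if ts > window_start]
        removed = len(self.items) - len(kept)
        self.items = kept
        return removed

    def peek(self, index: int) -> Optional[int]:
        # Index 0 is the newest item, 1 the one before it, and so on.
        if index < 0 or index >= len(self.items):
            return None
        return self.items[-1 - index]

    def count(self) -> int:
        return len(self.items)


bucket = ListBucketSketch(limit=2, interval_ms=1000)
results = [bucket.put(0), bucket.put(100), bucket.put(200)]
```

With a limit of two per second, the third put inside the same window is rejected; leaking at a later timestamp frees capacity again.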

class pyrate_limiter.Limiter(argument, buffer_ms=50)

Bases: object

This class ties together all of the underlying logic and makes it easy to work with both async and sync functions.

__init__(argument, buffer_ms=50)

Init Limiter using either a single bucket / multiple-bucket factory / single rate / rate list.

Parameters:

argument (Union[BucketFactory, AbstractBucket, Rate, List[Rate]]) – The bucket or rate configuration.

as_decorator(*, name='ratelimiter', weight=1)
bucket_factory
buckets()

Get list of active buckets

Return type:

List[AbstractBucket]

buffer_ms
close()
Return type:

None

dispose(bucket)

Dispose/Remove a specific bucket, using bucket-id or bucket object as param

Return type:

bool

handle_bucket_put(bucket, item, blocking, _force_async=False, deadline=None)

Put an item into the bucket.

Return type:

Union[bool, Awaitable[bool]]

lock
try_acquire(name='pyrate', weight=1, blocking=True, timeout=-1)

Attempt to acquire a permit from the limiter.

Parameters:
  • name (str, default "pyrate") – The bucket key to acquire from.

  • weight (int, default 1) – Number of permits to consume.

  • blocking (bool, default True) – If True, block until a permit is available (subject to timeout); if False, return immediately.

  • timeout (int | float, default -1) – Maximum time (in seconds) to wait; -1 means wait indefinitely.

Returns:

True if the permit was acquired, False otherwise. Async limiters return an awaitable resolving to the same.

Return type:

bool or Awaitable[bool]

async try_acquire_async(name='pyrate', weight=1, blocking=True, timeout=-1)

Attempt to asynchronously acquire a permit from the limiter.

Parameters:
  • name (str, default "pyrate") – The bucket key to acquire from.

  • weight (int, default 1) – Number of permits to consume.

  • blocking (bool, default True) – If True, wait until a permit is available (subject to timeout); if False, return immediately.

  • timeout (int | float, default -1) – Maximum time (in seconds) to wait; -1 means wait indefinitely.

Returns:

True if the permit was acquired, False otherwise.

Return type:

bool

Notes

This is the async variant of try_acquire. A top-level, thread-local async lock is used to prevent blocking the event loop.
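
The interaction between blocking and timeout can be sketched with a small stand-in limiter. This is not the library's Limiter: AcquireSketch is a hypothetical class, it ignores name and weight, and returning False as soon as the required wait would overrun the timeout is a choice made for this sketch rather than documented behaviour. The clock and sleep functions are injectable so the logic can be exercised without real waiting.

```python
import time
from typing import Callable, List, Optional


def _monotonic_ms() -> int:
    return int(time.monotonic() * 1000)


class AcquireSketch:
    """Hypothetical limiter: `limit` permits per `interval_ms`, sliding window."""

    def __init__(
        self,
        limit: int,
        interval_ms: int,
        clock_ms: Callable[[], int] = _monotonic_ms,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.clock_ms = clock_ms
        self.sleep = sleep
        self.stamps: List[int] = []  # acquisition times in ms, oldest first

    def try_acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        # A negative timeout means "no deadline", mirroring the -1 default.
        deadline: Optional[float] = None
        if timeout >= 0:
            deadline = self.clock_ms() + timeout * 1000
        while True:
            now = self.clock_ms()
            # Forget permits that have aged out of the window.
            self.stamps = [s for s in self.stamps if s > now - self.interval_ms]
            if len(self.stamps) < self.limit:
                self.stamps.append(now)
                return True
            if not blocking:
                return False
            # Time until the oldest permit leaves the window.
            wait_ms = self.stamps[0] + self.interval_ms - now
            if deadline is not None and now + wait_ms > deadline:
                return False
            self.sleep(wait_ms / 1000)
```

With the defaults, a call such as AcquireSketch(5, 1000).try_acquire(blocking=False) returns immediately either way, while blocking=True with timeout=-1 waits for as long as it takes.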

class pyrate_limiter.MonotonicAsyncClock

Bases: AbstractClock

Monotonic Async Clock, meant for testing only

async now()

Get monotonic time in milliseconds

Return type:

int

class pyrate_limiter.MonotonicClock

Bases: AbstractClock

now()

Get monotonic time in milliseconds

Return type:

int

class pyrate_limiter.MultiprocessBucket(rates, items, mp_lock)

Bases: InMemoryBucket

classmethod init(rates)

Creates a single ListProxy so that this bucket can be shared across multiple processes.

items
leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

int

limiter_lock()

An additional lock to be used by Limiter in-front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.

mp_lock
put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

Return type:

bool

pyrate_limiter.PgQueries

alias of Queries

class pyrate_limiter.PostgresBucket(pool, table, rates)

Bases: AbstractBucket

close()

Release any resources held by the bucket.

Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.

count()

Count the number of items in the bucket.

Return type:

Union[int, Awaitable[int]]

flush()

Flush the whole bucket. Implementations must clear failing_rate after flushing.

Return type:

Optional[Awaitable[None]]

leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

Union[int, Awaitable[int]]

peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: We cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.

Return type:

Union[RateItem, None, Awaitable[Optional[RateItem]]]

pool
put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

Return type:

Union[bool, Awaitable[bool]]

table
class pyrate_limiter.PostgresClock(pool)

Bases: AbstractClock

Get timestamp using Postgres as remote clock backend

now()

Get current time in milliseconds using Postgres.

Falls back to local time if the DB query fails for any reason.

Return type:

int
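
The fallback described above (use the remote clock, but fall back to local time on any failure) can be sketched without a database. The function name now_ms_with_fallback and the query_remote_ms callable are hypothetical; the callable stands in for whatever query the real clock issues against Postgres.

```python
import time
from typing import Callable


def now_ms_with_fallback(query_remote_ms: Callable[[], int]) -> int:
    """Return the remote clock's milliseconds, or local time if the query fails."""
    try:
        return int(query_remote_ms())
    except Exception:
        # Any failure (connection loss, timeout, bad result) falls back to local time.
        return int(time.time() * 1000)


def broken_query() -> int:
    # Simulated failure of the remote clock.
    raise ConnectionError("database unreachable")
```

One consequence of this design is that timestamps may jump between the remote and local clocks if the two are not synchronized.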

class pyrate_limiter.Rate(limit, interval)

Bases: object

Rate definition.

Parameters:
  • limit (int) – Number of requests allowed within interval

  • interval (Union[int, Duration]) – Time interval, in milliseconds

interval
limit
class pyrate_limiter.RateItem(name, timestamp, weight=1)

Bases: object

RateItem is the wrapper object that buckets work with.

name
timestamp
weight
class pyrate_limiter.RedisBucket(rates, redis, bucket_key, script_hash)

Bases: AbstractBucket

A bucket that stores its data in Redis.

  • Redis’ built-in TIME is not used, since it is non-deterministic.

  • In a distributed context, use local server time or a remote time server.

  • Each bucket instance uses a dedicated connection to avoid race conditions.

  • Can be either sync or async.

bucket_key
count()

Count the number of items in the bucket.

flush()

Flush the whole bucket. Implementations must clear failing_rate after flushing.

classmethod init(rates, redis, bucket_key)
leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

Union[int, Awaitable[int]]

now()

Retrieve current timestamp from the clock backend.

peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: We cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.

Return type:

Union[RateItem, None, Awaitable[Optional[RateItem]]]

put(item)

Add item to key

Return type:

Union[bool, Awaitable[bool]]

rates
redis
script_hash
class pyrate_limiter.SQLiteBucket(rates, conn, table, lock=None)

Bases: AbstractBucket

For the SQLite bucket, the SQL time function is used as the clock, so the item’s timestamp does not matter here.

close()

Release any resources held by the bucket.

Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.

conn
count()

Count the number of items in the bucket.

Return type:

int

flush()

Flush the whole bucket. Implementations must clear failing_rate after flushing.

Return type:

None

full_count_query
classmethod init_from_file(rates, table='rate_bucket', db_path=None, create_new_table=True, use_file_lock=False)
Return type:

SQLiteBucket

leak(current_timestamp=None)

Leak (clean up) the bucket.

Return type:

int

limiter_lock()

An additional lock to be used by Limiter in-front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.

lock
now()

Retrieve current timestamp from the clock backend.

peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: We cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.

Return type:

Optional[RateItem]

put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

Return type:

bool

rates
table
use_limiter_lock
class pyrate_limiter.SQLiteClock(conn)

Bases: AbstractClock

Get timestamp using SQLite as remote clock backend

__init__(conn)

In multiprocessing cases, use the bucket, so that a shared lock is used.

classmethod default()
now()

Get time as of now, in milliseconds

Return type:

int

time_query = "SELECT CAST(ROUND((julianday('now') - 2440587.5)*86400000) As INTEGER)"
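
The time_query above can be tried directly with the standard sqlite3 module. julianday('now') returns the current Julian day number, 2440587.5 is the Julian day of the Unix epoch (1970-01-01 00:00 UTC), and 86400000 is the number of milliseconds in a day, so the result is milliseconds since the epoch. The variable names in this sketch are illustrative only.

```python
import sqlite3
import time

TIME_QUERY = (
    "SELECT CAST(ROUND((julianday('now') - 2440587.5)*86400000) As INTEGER)"
)

# An in-memory database is enough: the query only calls SQLite's date functions.
conn = sqlite3.connect(":memory:")
sqlite_ms = conn.execute(TIME_QUERY).fetchone()[0]
local_ms = int(time.time() * 1000)
conn.close()

# Both values are milliseconds since the Unix epoch, so they should be close.
drift_ms = abs(sqlite_ms - local_ms)
```
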
pyrate_limiter.SQLiteQueries

alias of Queries

class pyrate_limiter.SingleBucketFactory(bucket, schedule_leak=True)

Bases: BucketFactory

Single-bucket factory for quick use with Limiter

__init__(bucket, schedule_leak=True)

Initialize the SingleBucketFactory with a bucket and an optional leak scheduling flag.

schedule_leak (bool): If True, the factory will schedule periodic leaks for the bucket. Default is True. Disable only if you plan to handle leaking manually.

bucket
get(_)

Get the corresponding bucket to this item

Return type:

AbstractBucket

wrap_item(name, weight=1)

Add the current timestamp to the receiving item, using any clock backend, and turn it into a RateItem. Can return either a coroutine or a RateItem instance.

pyrate_limiter.binary_search(items, value)

Find the index of the item in the list where left.timestamp < value <= right.timestamp. This determines the current size of a window that stretches from now back to lower-boundary = value.

Return type:

int
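
The condition left.timestamp < value <= right.timestamp is the same position that the standard library's bisect_left finds in an ascending list. The sketch below uses plain integer timestamps instead of RateItem objects and a hypothetical helper name. What the library returns for edge cases (an empty list, or a value past the last item) is not stated here and is not modeled.

```python
from bisect import bisect_left
from typing import List


def first_index_at_or_after(timestamps: List[int], value: int) -> int:
    """Index of the first timestamp >= value in an ascending list."""
    return bisect_left(timestamps, value)


timestamps = [100, 200, 300, 400]  # ascending, in milliseconds
lower_bound = 250                  # the window stretches from now back to here
index = first_index_at_or_after(timestamps, lower_bound)

# Everything from `index` onward lies inside the window.
window_size = len(timestamps) - index
```

Here 200 < 250 <= 300, so the index points at the item stamped 300 and two items fall inside the window.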

pyrate_limiter.dedicated_sqlite_clock_connection()
pyrate_limiter.id_generator(size=10)
Return type:

str

pyrate_limiter.validate_rate_list(rates)

Return False if rates are incorrectly ordered.

Return type:

bool
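
The entry above does not spell out what "correctly ordered" means. The sketch below assumes one plausible rule: each successive rate must have a strictly larger interval and a strictly larger limit than the one before it, and an empty list is rejected. RateSketch and rates_are_ordered are hypothetical names, and the library may apply additional checks.

```python
from typing import List, NamedTuple


class RateSketch(NamedTuple):
    """Stand-in for Rate: `limit` requests per `interval` milliseconds."""

    limit: int
    interval: int


def rates_are_ordered(rates: List[RateSketch]) -> bool:
    """Assumed rule: intervals and limits both strictly increase along the list."""
    if not rates:
        return False
    for prev, curr in zip(rates, rates[1:]):
        if curr.interval <= prev.interval or curr.limit <= prev.limit:
            return False
    return True


per_second = RateSketch(limit=5, interval=1000)
per_minute = RateSketch(limit=100, interval=60000)
```

Under this assumption, [per_second, per_minute] passes while the reversed list does not.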
