pyrate_limiter package
- class pyrate_limiter.AbstractBucket
Bases:
ABC
Base bucket interface. Assumption: len(rates) is always > 0. TODO: allow empty rates.
- close()
Release any resources held by the bucket.
Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.
- Return type:
None
- abstractmethod count()
Count number of items in the bucket
- Return type:
Union[int,Awaitable[int]]
- failing_rate = None
- abstractmethod flush()
Flush the whole bucket. Implementations must also clear the failing rate after flushing.
- Return type:
Optional[Awaitable[None]]
- abstractmethod leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- Return type:
Union[int,Awaitable[int]]
- limiter_lock()
An additional lock to be used by Limiter in front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.
- Return type:
Optional[object]
- now()
Retrieve current timestamp from the clock backend.
- abstractmethod peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.
- abstractmethod put(item)
Put an item (typically the current time) in the bucket. Returns True if successful, otherwise False.
- Return type:
Union[bool,Awaitable[bool]]
- rates
- waiting(item)
Calculate the time until the bucket becomes available to consume an item again.
- Return type:
Union[int,Awaitable[int]]
- class pyrate_limiter.AbstractClock
Bases:
ABC
Clock that returns the timestamp for now.
- abstractmethod now()
Get time as of now, in milliseconds
- Return type:
Union[int,Awaitable[int]]
- class pyrate_limiter.BucketAsyncWrapper(bucket)
Bases:
AbstractBucket
BucketAsyncWrapper is a wrapper over any bucket that turns an async or synchronous bucket into an async one.
- async count()
Count number of items in the bucket
- property failing_rate
- async flush()
Flush the whole bucket. Implementations must also clear the failing rate after flushing.
- Return type:
None
- async leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- Return type:
int
- now()
Retrieve current timestamp from the clock backend.
- Return type:
int
- async peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.
- Return type:
Optional[RateItem]
- async put(item)
Put an item (typically the current time) in the bucket. Returns True if successful, otherwise False.
- property rates
- async waiting(item)
Calculate the time until the bucket becomes available to consume an item again.
- Return type:
int
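The idea of treating sync and async buckets uniformly can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `ensure_awaited`, `SyncCounter`, and `AsyncCounter` are hypothetical names introduced only for this example.

```python
import asyncio
import inspect
from typing import Any


async def ensure_awaited(result: Any) -> Any:
    """Await the value if it is awaitable; otherwise return it unchanged."""
    if inspect.isawaitable(result):
        return await result
    return result


class SyncCounter:
    """Hypothetical stand-in for a synchronous bucket."""

    def count(self) -> int:
        return 3


class AsyncCounter:
    """Hypothetical stand-in for an asynchronous bucket."""

    async def count(self) -> int:
        return 5


async def main() -> tuple:
    # Both calls can be awaited the same way, whatever the bucket flavour.
    sync_total = await ensure_awaited(SyncCounter().count())
    async_total = await ensure_awaited(AsyncCounter().count())
    return sync_total, async_total


result = asyncio.run(main())
```

A caller written against the awaited form does not need to know which kind of bucket it was handed.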
- class pyrate_limiter.BucketFactory
Bases:
ABC
Abstract BucketFactory class. Users are expected to implement/override this class with their own bucket-routing/creating logic.
- close()
- Return type:
None
- create(bucket_class, *args, **kwargs)
Creating a bucket dynamically
- Return type:
- dispose(bucket)
Delete a bucket from the factory
- Return type:
bool
- abstractmethod get(item)
Get the corresponding bucket to this item
- Return type:
Union[AbstractBucket,Awaitable[AbstractBucket]]
- get_buckets()
Iterator over all buckets in the factory
- Return type:
List[AbstractBucket]
- property leak_interval
Retrieve leak-interval from inner Leaker task
- schedule_leak(new_bucket)
Schedule all the buckets’ leak, reset bucket’s failing rate
- Return type:
None
- class pyrate_limiter.Duration(*values)
Bases:
Enum
Interval helper class
- DAY = 86400000
- HOUR = 3600000
- MINUTE = 60000
- SECOND = 1000
- WEEK = 604800000
- static readable(value)
- Return type:
str
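The constants above are millisecond counts. The sketch below mirrors them in a stand-in enum to show how the units relate. `DurationMs` and `to_seconds` are hypothetical names for this example; whether the real Duration class supports arithmetic directly is not stated here, so the stand-in uses `IntEnum`.

```python
from enum import IntEnum


class DurationMs(IntEnum):
    """Hypothetical stand-in mirroring the documented millisecond values."""

    SECOND = 1000
    MINUTE = 60000
    HOUR = 3600000
    DAY = 86400000
    WEEK = 604800000


def to_seconds(interval_ms: int) -> float:
    """Convert a millisecond interval to seconds."""
    return interval_ms / 1000


# Each unit is an exact multiple of the smaller ones.
minute_in_seconds = DurationMs.MINUTE // DurationMs.SECOND
hours_per_day = DurationMs.DAY // DurationMs.HOUR
days_per_week = DurationMs.WEEK // DurationMs.DAY
```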
- class pyrate_limiter.InMemoryBucket(rates)
Bases:
AbstractBucket
Simple in-memory bucket using a native list. The clock can be either time.time or time.monotonic; a clock is required when leaking.
Pros: fast, safe, and precise.
Cons: since it resides in local memory, the data is neither persistent nor scalable.
Use case: small applications, simple logic.
- count()
Count number of items in the bucket
- Return type:
int
- flush()
Flush the whole bucket. Implementations must also clear the failing rate after flushing.
- Return type:
None
- items
- leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- Return type:
int
- peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.
- Return type:
RateItem|None
- put(item)
Put an item (typically the current time) in the bucket. Returns True if successful, otherwise False.
- Return type:
bool
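To make the put/leak/count contract concrete, here is a minimal list-backed sketch. It is an assumption about how such a bucket might behave, not the library's code: `SketchRate` and `SketchBucket` are hypothetical, items are plain millisecond timestamps, and weights are ignored.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SketchRate:
    """Hypothetical stand-in for a rate: `limit` items per `interval` ms."""
    limit: int
    interval: int


class SketchBucket:
    """Hypothetical list-backed bucket; timestamps are in milliseconds."""

    def __init__(self, rates: List[SketchRate]) -> None:
        self.rates = rates
        self.items: List[int] = []  # timestamps, oldest first

    def put(self, timestamp: int) -> bool:
        """Store the timestamp unless some rate is already at its limit."""
        for rate in self.rates:
            window_start = timestamp - rate.interval
            in_window = sum(1 for t in self.items if t > window_start)
            if in_window >= rate.limit:
                return False
        self.items.append(timestamp)
        return True

    def leak(self, current_timestamp: int) -> int:
        """Drop items older than the longest interval; return how many."""
        longest = max(rate.interval for rate in self.rates)
        cutoff = current_timestamp - longest
        kept = [t for t in self.items if t > cutoff]
        removed = len(self.items) - len(kept)
        self.items = kept
        return removed

    def count(self) -> int:
        return len(self.items)


bucket = SketchBucket([SketchRate(limit=2, interval=1000)])
results = [bucket.put(0), bucket.put(100), bucket.put(200)]
removed = bucket.leak(current_timestamp=1150)
accepted_later = bucket.put(1150)
```

With a limit of two items per second, the third put is rejected; once the old items have leaked out, a new item is accepted again.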
- class pyrate_limiter.Limiter(argument, buffer_ms=50)
Bases:
object
This class's responsibility is to bring together all the underlying logic and make working with async/sync functions easy.
- __init__(argument, buffer_ms=50)
Init Limiter using either a single bucket / multiple-bucket factory / single rate / rate list.
- Parameters:
argument (Union[BucketFactory, AbstractBucket, Rate, List[Rate]]) – The bucket or rate configuration.
- as_decorator(*, name='ratelimiter', weight=1)
- bucket_factory
- buckets()
Get list of active buckets
- Return type:
List[AbstractBucket]
- buffer_ms
- close()
- Return type:
None
- dispose(bucket)
Dispose/Remove a specific bucket, using bucket-id or bucket object as param
- Return type:
bool
- handle_bucket_put(bucket, item, blocking, _force_async=False, deadline=None)
Putting item into bucket
- Return type:
Union[bool,Awaitable[bool]]
- lock
- try_acquire(name='pyrate', weight=1, blocking=True, timeout=-1)
Attempt to acquire a permit from the limiter.
- Parameters:
name (str, default "pyrate") – The bucket key to acquire from.
weight (int, default 1) – Number of permits to consume.
blocking (bool, default True) – If True, block until a permit is available (subject to timeout); if False, return immediately.
timeout (int | float, default -1) – Maximum time (in seconds) to wait; -1 means wait indefinitely.
- Returns:
True if the permit was acquired, False otherwise. Async limiters return an awaitable resolving to the same.
- Return type:
bool or Awaitable[bool]
- async try_acquire_async(name='pyrate', weight=1, blocking=True, timeout=-1)
Attempt to asynchronously acquire a permit from the limiter.
- Parameters:
name (str, default "pyrate") – The bucket key to acquire from.
weight (int, default 1) – Number of permits to consume.
blocking (bool, default True) – If True, wait until a permit is available (subject to timeout); if False, return immediately.
timeout (int | float, default -1) – Maximum time (in seconds) to wait; -1 means wait indefinitely.
- Returns:
True if the permit was acquired, False otherwise.
- Return type:
bool
Notes
This is the async variant of try_acquire. A top-level, thread-local async lock is used to prevent blocking the event loop.
- class pyrate_limiter.MonotonicAsyncClock
Bases:
AbstractClock
Monotonic async clock, meant for testing only
- async now()
Get monotonic time in milliseconds
- Return type:
int
- class pyrate_limiter.MonotonicClock
Bases:
AbstractClock
- now()
Get monotonic time in milliseconds
- Return type:
int
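A clock that reports monotonic time in milliseconds can be sketched in a few lines. `SketchMonotonicClock` is a hypothetical stand-in written for this example, not the library class.

```python
import time


class SketchMonotonicClock:
    """Hypothetical stand-in: reports monotonic time in milliseconds."""

    def now(self) -> int:
        # time.monotonic() returns seconds as a float; scale to integer ms.
        return int(1000 * time.monotonic())


clock = SketchMonotonicClock()
first = clock.now()
second = clock.now()
```

Monotonic readings never go backwards, which makes them suitable for measuring intervals, but they are not wall-clock timestamps.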
- class pyrate_limiter.MultiprocessBucket(rates, items, mp_lock)
Bases:
InMemoryBucket
- classmethod init(rates)
Creates a single ListProxy so that this bucket can be shared across multiple processes.
- items
- leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- Return type:
int
- limiter_lock()
An additional lock to be used by Limiter in front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.
- mp_lock
- put(item)
Put an item (typically the current time) in the bucket. Returns True if successful, otherwise False.
- Return type:
bool
- pyrate_limiter.PgQueries
alias of Queries
- class pyrate_limiter.PostgresBucket(pool, table, rates)
Bases:
AbstractBucket
- close()
Release any resources held by the bucket.
Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.
- count()
Count number of items in the bucket
- Return type:
Union[int,Awaitable[int]]
- flush()
Flush the whole bucket. Implementations must also clear the failing rate after flushing.
- Return type:
Optional[Awaitable[None]]
- leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- Return type:
Union[int,Awaitable[int]]
- peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.
- pool
- put(item)
Put an item (typically the current time) in the bucket. Returns True if successful, otherwise False.
- Return type:
Union[bool,Awaitable[bool]]
- table
- class pyrate_limiter.PostgresClock(pool)
Bases:
AbstractClock
Get timestamp using Postgres as remote clock backend
- now()
Get current time in milliseconds using Postgres.
Falls back to local time if the DB query fails for any reason.
- Return type:
int
- class pyrate_limiter.Rate(limit, interval)
Bases:
object
Rate definition.
- Parameters:
limit (int) – Number of requests allowed within interval.
interval (Union[int, Duration]) – Time interval, in milliseconds.
- interval
- limit
- class pyrate_limiter.RateItem(name, timestamp, weight=1)
Bases:
object
RateItem is the item wrapper that buckets work with.
- name
- timestamp
- weight
- class pyrate_limiter.RedisBucket(rates, redis, bucket_key, script_hash)
Bases:
AbstractBucket
A bucket using Redis for storing data. Redis’ built-in TIME is not used, since it is non-deterministic; in a distributed context, use local server time or a remote time server. Each bucket instance uses a dedicated connection to avoid race conditions. The bucket can be either sync or async.
- bucket_key
- count()
Count number of items in the bucket
- flush()
Flush the whole bucket. Implementations must also clear the failing rate after flushing.
- classmethod init(rates, redis, bucket_key)
- leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- Return type:
Union[int,Awaitable[int]]
- now()
Retrieve current timestamp from the clock backend.
- peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.
- put(item)
Add item to key
- Return type:
Union[bool,Awaitable[bool]]
- rates
- redis
- script_hash
- class pyrate_limiter.SQLiteBucket(rates, conn, table, lock=None)
Bases:
AbstractBucket
For the SQLite bucket, the SQL time function is used as the clock, so the item’s timestamp does not matter here.
- close()
Release any resources held by the bucket.
Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.
- conn
- count()
Count number of items in the bucket
- Return type:
int
- flush()
Flush the whole bucket. Implementations must also clear the failing rate after flushing.
- Return type:
None
- full_count_query
- classmethod init_from_file(rates, table='rate_bucket', db_path=None, create_new_table=True, use_file_lock=False)
- Return type:
- leak(current_timestamp=None)
Leaking/clean up bucket
- Return type:
int
- limiter_lock()
An additional lock to be used by Limiter in front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.
- lock
- now()
Retrieve current timestamp from the clock backend.
- peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.
- Return type:
Optional[RateItem]
- put(item)
Put an item (typically the current time) in the bucket. Returns True if successful, otherwise False.
- Return type:
bool
- rates
- table
- use_limiter_lock
- class pyrate_limiter.SQLiteClock(conn)
Bases:
AbstractClock
Get timestamp using SQLite as remote clock backend
- __init__(conn)
In multiprocessing cases, use the bucket, so that a shared lock is used.
- classmethod default()
- now()
Get time as of now, in milliseconds
- Return type:
int
- time_query = "SELECT CAST(ROUND((julianday('now') - 2440587.5)*86400000) As INTEGER)"
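The time_query above can be tried directly with Python's standard sqlite3 module. The sketch below runs it against an in-memory database and compares the result with the local clock. The variable names are illustrative, and the in-memory connection is an assumption made for the example.

```python
import sqlite3
import time

# The query documented above: the Julian day of "now", shifted to the Unix
# epoch (Julian day 2440587.5) and scaled from days to milliseconds.
TIME_QUERY = "SELECT CAST(ROUND((julianday('now') - 2440587.5)*86400000) As INTEGER)"

conn = sqlite3.connect(":memory:")
try:
    sqlite_ms = conn.execute(TIME_QUERY).fetchone()[0]
finally:
    conn.close()

# Compare with the local wall clock, also in milliseconds.
local_ms = int(time.time() * 1000)
drift_ms = abs(sqlite_ms - local_ms)
```

The two values should agree closely, since SQLite's `julianday('now')` is expressed in UTC just like the Unix epoch.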
- pyrate_limiter.SQLiteQueries
alias of Queries
- class pyrate_limiter.SingleBucketFactory(bucket, schedule_leak=True)
Bases:
BucketFactory
Single-bucket factory for quick use with Limiter
- __init__(bucket, schedule_leak=True)
Initialize the SingleBucketFactory with a bucket and an optional leak scheduling flag.
schedule_leak (bool): If True, the factory will schedule periodic leaks for the bucket. Default is True. Disable only if you plan to handle leaking manually.
- bucket
- get(_)
Get the corresponding bucket to this item
- Return type:
- wrap_item(name, weight=1)
Add the current timestamp to the receiving item using any clock backend, turning it into a RateItem. Can return either a coroutine or a RateItem instance.
- pyrate_limiter.binary_search(items, value)
Find the index of the item in the list where left.timestamp < value <= right.timestamp. This is used to determine the current size of a window that stretches from now back to lower-boundary = value.
- Return type:
int
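The condition left.timestamp < value <= right.timestamp describes a lower-bound search over items sorted by timestamp. A minimal sketch follows, assuming sorted input. `SketchItem` and `lower_bound` are hypothetical names, and the handling of edge cases (empty list, value beyond the last item) is a choice made for this example that may differ from the library's.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SketchItem:
    """Hypothetical stand-in for a rate item; only the timestamp matters."""
    timestamp: int


def lower_bound(items: List[SketchItem], value: int) -> int:
    """Return the smallest index i with items[i].timestamp >= value.

    Items must be sorted by timestamp. Returns len(items) when every
    timestamp is below `value` (an edge-case choice made for this sketch).
    """
    low, high = 0, len(items)
    while low < high:
        mid = (low + high) // 2
        if items[mid].timestamp < value:
            low = mid + 1
        else:
            high = mid
    return low


items = [SketchItem(t) for t in (100, 200, 300, 400)]
index = lower_bound(items, 250)
# Everything from `index` onward lies inside the window starting at 250.
window_size = len(items) - index
```

Here the search lands between the items stamped 200 and 300, so two items fall inside the window.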
- pyrate_limiter.dedicated_sqlite_clock_connection()
- pyrate_limiter.id_generator(size=10)
- Return type:
str
- pyrate_limiter.validate_rate_list(rates)
Return False if rates are incorrectly ordered.
- Return type:
bool
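The text above does not spell out what "correctly ordered" means. One plausible reading, assumed here rather than taken from this page, is that intervals and limits both strictly increase from one rate to the next while the density (limit per interval) does not increase. The sketch below checks that rule; `SketchRate` and `rates_are_ordered` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SketchRate:
    """Hypothetical stand-in for a rate: `limit` items per `interval` ms."""
    limit: int
    interval: int


def rates_are_ordered(rates: List[SketchRate]) -> bool:
    """Check the assumed ordering rule; an empty list is treated as invalid."""
    if not rates:
        return False
    for previous, current in zip(rates, rates[1:]):
        if current.interval <= previous.interval:
            return False
        if current.limit <= previous.limit:
            return False
        # Compare limit/interval ratios by cross-multiplying to avoid floats.
        if current.limit * previous.interval > previous.limit * current.interval:
            return False
    return True


ordered = rates_are_ordered([SketchRate(5, 1000), SketchRate(100, 60000)])
reversed_order = rates_are_ordered([SketchRate(100, 60000), SketchRate(5, 1000)])
denser_long_rate = rates_are_ordered([SketchRate(5, 1000), SketchRate(1000, 60000)])
```

Under this reading, 5 per second followed by 100 per minute is valid, while the reversed list, or a longer rate that allows more traffic per unit of time than the shorter one, is not.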
Subpackages
- pyrate_limiter.abstracts package
- pyrate_limiter.buckets package
InMemoryBucket, MultiprocessBucket, PgQueries, PostgresBucket, RedisBucket, SQLiteBucket, SQLiteClock, SQLiteQueries
- Submodules
- pyrate_limiter.extras package