API reference

Synchronization primitives

Events

class aiologic.Event

Bases: object

static __new__(cls, /) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = Event()
>>> copy = Event(*orig.__getnewargs__())
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the event is set.

Used by the standard truth testing procedure.

Example

>>> finished = Event()  # event is unset
>>> bool(finished)
False
>>> finished.set()  # event is set
>>> bool(finished)
True
__await__(timeout: float | None = None) Generator[Any, Any, bool]

async with_(timeout: float | None = None) bool

wait(timeout: float | None = None) bool

set() None

is_set() bool

Return True if the event is set.

Example

>>> event = Event()
>>> event.is_set()
False
>>> event.set()
>>> event.is_set()
True
property waiting: int

The current number of tasks waiting for the event.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.REvent

Bases: Event

static __new__(cls, /) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = REvent()
>>> copy = REvent(*orig.__getnewargs__())
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the event is set.

Used by the standard truth testing procedure.

Example

>>> running = REvent()  # event is unset
>>> bool(running)
False
>>> running.set()  # event is set
>>> bool(running)
True
>>> running.clear()  # event is unset
>>> bool(running)
False
__await__(timeout: float | None = None) Generator[Any, Any, bool]

async with_(timeout: float | None = None) bool

wait(timeout: float | None = None) bool

clear() None

set() None

is_set() bool

Return True if the event is set.

Example

>>> event = REvent()
>>> event.is_set()
False
>>> event.set()
>>> event.is_set()
True
>>> event.clear()
>>> event.is_set()
False
property waiting: int

The current number of tasks waiting for the event.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.CountdownEvent

Bases: object

static __new__(cls, /, initial_value: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = CountdownEvent(1)
>>> orig.initial_value
1
>>> copy = CountdownEvent(*orig.__getnewargs__())
>>> copy.initial_value
1
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the event is set.

Used by the standard truth testing procedure.

Example

>>> done = CountdownEvent()  # event is set
>>> bool(done)
True
>>> done.up()  # event is unset
>>> bool(done)
False
>>> done.down()  # event is set
>>> bool(done)
True
__await__(timeout: float | None = None) Generator[Any, Any, bool]

async with_(timeout: float | None = None) bool

wait(timeout: float | None = None) bool

up(count: int = 1) None

down() None

clear() None

property initial_value: int

The initial number of down() calls required to set the event.

property value: int

The current number of down() calls remaining to set the event.

property waiting: int

The current number of tasks waiting for the event.

It represents the length of the waiting queue and thus changes immediately.

Barriers

class aiologic.Latch

Bases: object

static __new__(cls, /, parties: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = Latch(4)
>>> orig.parties
4
>>> copy = Latch(*orig.__getnewargs__())
>>> copy.parties
4
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the barrier has been passed or broken.

Used by the standard truth testing procedure.

Example

>>> started = Latch(1)  # barrier is filling
>>> bool(started)
False
>>> started.wait()  # barrier is draining
>>> bool(started)
True
>>> started.abort()  # barrier is broken
>>> bool(started)
True
__await__(timeout: float | None = None) Generator[Any, Any, None]

async with_(timeout: float | None = None) None

wait(timeout: float | None = None) None

abort() None

property parties: int

The initial number of tasks required to pass the barrier.

property broken: bool

A boolean that is True if the barrier is in the broken state.

property waiting: int

The current number of tasks waiting to pass.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.Barrier

Bases: object

static __new__(cls, /, parties: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = Barrier(4)
>>> orig.parties
4
>>> copy = Barrier(*orig.__getnewargs__())
>>> copy.parties
4
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

async __aenter__() int

__enter__() int

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__await__(timeout: float | None = None) Generator[Any, Any, int]

async with_(timeout: float | None = None) int

wait(timeout: float | None = None) int

abort() None

property parties: int

The initial number of tasks required to pass the barrier.

property broken: bool

A boolean that is True if the barrier is in the broken state.

property waiting: int

The current number of tasks waiting to pass.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.RBarrier

Bases: Barrier

static __new__(cls, /, parties: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = RBarrier(4)
>>> orig.parties
4
>>> copy = RBarrier(*orig.__getnewargs__())
>>> copy.parties
4
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

async __aenter__() int

__enter__() int

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__await__(timeout: float | None = None) Generator[Any, Any, int]

async with_(timeout: float | None = None) int

wait(timeout: float | None = None) int

reset() None

abort() None

property parties: int

The initial number of tasks required to pass the barrier.

property broken: bool

A boolean that is True if the barrier is in the broken state.

property waiting: int

The current number of tasks waiting to pass.

It represents the length of the waiting queue and thus changes immediately.

exception aiologic.BrokenBarrierError

Bases: RuntimeError

Semaphores

class aiologic.Semaphore

Bases: object

static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: None = None) Self
static __new__(cls, /, initial_value: int | DefaultType, max_value: int | DefaultType) BoundedSemaphore
static __new__(cls, /, *, max_value: int | DefaultType) BoundedSemaphore

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = Semaphore(2)
>>> orig.initial_value
2
>>> copy = Semaphore(*orig.__getnewargs__())
>>> copy.initial_value
2
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(*, blocking: bool = True, timeout: float | None = None) bool

green_acquire(*, blocking: bool = True, timeout: float | None = None) bool

release(count: int = 1) None

async_release(count: int = 1) None

green_release(count: int = 1) None

property initial_value: int

The initial number of permits available for acquiring.

property value: int

The current number of permits available to be acquired.

It may not change after release if all the released permits have been reassigned to waiting tasks.

property waiting: int

The current number of tasks waiting to acquire.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.BoundedSemaphore

Bases: Semaphore

static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = BoundedSemaphore(2)
>>> orig.max_value
2
>>> copy = BoundedSemaphore(*orig.__getnewargs__())
>>> copy.max_value
2
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(*, blocking: bool = True, timeout: float | None = None) bool

green_acquire(*, blocking: bool = True, timeout: float | None = None) bool

release(count: int = 1) None

async_release(count: int = 1) None

green_release(count: int = 1) None

property initial_value: int

The initial number of permits available for acquiring.

property max_value: int

The maximum number of permits which the semaphore can hold.

property value: int

The current number of permits available to be acquired.

It may not change after release if all the released permits have been reassigned to waiting tasks.

property waiting: int

The current number of tasks waiting to acquire.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.BinarySemaphore

Bases: Semaphore

static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: None = None) Self
static __new__(cls, /, initial_value: int | DefaultType, max_value: int | DefaultType) BoundedBinarySemaphore
static __new__(cls, /, *, max_value: int | DefaultType) BoundedBinarySemaphore

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = BinarySemaphore(0)
>>> orig.initial_value
0
>>> copy = BinarySemaphore(*orig.__getnewargs__())
>>> copy.initial_value
0
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(*, blocking: bool = True, timeout: float | None = None) bool

green_acquire(*, blocking: bool = True, timeout: float | None = None) bool

release(count: int = 1) None

async_release(count: int = 1) None

green_release(count: int = 1) None

property initial_value: int

The initial number of permits available for acquiring.

property value: int

The current number of permits available to be acquired.

It may not change after release if all the released permits have been reassigned to waiting tasks.

property waiting: int

The current number of tasks waiting to acquire.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.BoundedBinarySemaphore

Bases: BinarySemaphore, BoundedSemaphore

static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = BoundedBinarySemaphore(0)
>>> orig.max_value
0
>>> copy = BoundedBinarySemaphore(*orig.__getnewargs__())
>>> copy.max_value
0
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(*, blocking: bool = True, timeout: float | None = None) bool

green_acquire(*, blocking: bool = True, timeout: float | None = None) bool

release(count: int = 1) None

async_release(count: int = 1) None

green_release(count: int = 1) None

property initial_value: int

The initial number of permits available for acquiring.

property max_value: int

The maximum number of permits which the semaphore can hold.

property value: int

The current number of permits available to be acquired.

It may not change after release if all the released permits have been reassigned to waiting tasks.

property waiting: int

The current number of tasks waiting to acquire.

It represents the length of the waiting queue and thus changes immediately.

Capacity limiters

class aiologic.CapacityLimiter

Bases: object

static __new__(cls, /, total_tokens: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = CapacityLimiter(3)
>>> orig.total_tokens
3
>>> copy = CapacityLimiter(*orig.__getnewargs__())
>>> copy.total_tokens
3
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the capacity limiter is used by any task.

Used by the standard truth testing procedure.

Example

>>> reading = CapacityLimiter()
>>> bool(reading)
False
>>> with reading:  # capacity limiter is in use
...     bool(reading)
True
>>> bool(reading)
False
async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(*, blocking: bool = True, timeout: float | None = None) bool

green_acquire(*, blocking: bool = True, timeout: float | None = None) bool

async_release() None

green_release() None

async_borrowed() bool

Return True if the current async task holds any token.

Example

>>> limiter = CapacityLimiter()
>>> limiter.async_borrowed()
False
>>> async with limiter:
...     limiter.async_borrowed()
True
>>> limiter.async_borrowed()
False
green_borrowed() bool

Return True if the current green task holds any token.

Example

>>> limiter = CapacityLimiter()
>>> limiter.green_borrowed()
False
>>> with limiter:
...     limiter.green_borrowed()
True
>>> limiter.green_borrowed()
False
property total_tokens: int

The initial number of tokens available for borrowing.

property available_tokens: int

The current number of tokens available to be borrowed.

It may not change after release if all the released tokens have been reassigned to waiting tasks.

property borrowed_tokens: int

The current number of tokens that have been borrowed.

It may not change after release if all the released tokens have been reassigned to waiting tasks.

property borrowers: MappingProxyType

The read-only proxy of the dictionary that maps tasks’ identifiers to their respective recursion levels. Contains identifiers of only those tasks that hold any token. Updated automatically when the current state changes.

It may not contain identifiers of those tasks to which tokens were reassigned during release if they have not yet resumed execution.

property waiting: int

The current number of tasks waiting to borrow.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.RCapacityLimiter

Bases: CapacityLimiter

static __new__(cls, /, total_tokens: int | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = RCapacityLimiter(3)
>>> orig.total_tokens
3
>>> copy = RCapacityLimiter(*orig.__getnewargs__())
>>> copy.total_tokens
3
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the capacity limiter is used by any task.

Used by the standard truth testing procedure.

Example

>>> reading = RCapacityLimiter()
>>> bool(reading)
False
>>> with reading:  # capacity limiter is in use
...     bool(reading)
True
>>> bool(reading)
False
async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(count: int = 1, *, blocking: bool = True, timeout: float | None = None) bool

green_acquire(count: int = 1, *, blocking: bool = True, timeout: float | None = None) bool

async_release(count: int = 1) None

green_release(count: int = 1) None

async_borrowed() bool

Return True if the current async task holds any token.

Example

>>> limiter = RCapacityLimiter()
>>> limiter.async_borrowed()
False
>>> async with limiter:
...     limiter.async_borrowed()
True
>>> limiter.async_borrowed()
False
green_borrowed() bool

Return True if the current green task holds any token.

Example

>>> limiter = RCapacityLimiter()
>>> limiter.green_borrowed()
False
>>> with limiter:
...     limiter.green_borrowed()
True
>>> limiter.green_borrowed()
False
async_count() int

Return the recursion level of the current async task.

Example

>>> limiter = RCapacityLimiter()
>>> limiter.async_count()
0
>>> async with limiter:
...     limiter.async_count()
1
>>> limiter.async_count()
0
green_count() int

Return the recursion level of the current green task.

Example

>>> limiter = RCapacityLimiter()
>>> limiter.green_count()
0
>>> with limiter:
...     limiter.green_count()
1
>>> limiter.green_count()
0
property total_tokens: int

The initial number of tokens available for borrowing.

property available_tokens: int

The current number of tokens available to be borrowed.

It may not change after release if all the released tokens have been reassigned to waiting tasks.

property borrowed_tokens: int

The current number of tokens that have been borrowed.

It may not change after release if all the released tokens have been reassigned to waiting tasks.

property borrowers: MappingProxyType

The read-only proxy of the dictionary that maps tasks’ identifiers to their respective recursion levels. Contains identifiers of only those tasks that hold any token. Updated automatically when the current state changes.

It may not contain identifiers of those tasks to which tokens were reassigned during release if they have not yet resumed execution.

property waiting: int

The current number of tasks waiting to borrow.

It represents the length of the waiting queue and thus changes immediately.

Locks

class aiologic.Lock

Bases: object

static __new__(cls, /) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = Lock()
>>> copy = Lock(*orig.__getnewargs__())
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the lock is used by any task.

Used by the standard truth testing procedure.

Example

>>> writing = Lock()
>>> bool(writing)
False
>>> with writing:  # lock is in use
...     bool(writing)
True
>>> bool(writing)
False
async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(*, blocking: bool = True, timeout: float | None = None) bool

green_acquire(*, blocking: bool = True, timeout: float | None = None) bool

async_release() None

green_release() None

async_owned() bool

Return True if the current async task owns the lock.

Unlike the owner property, always reliable.

Example

>>> lock = Lock()
>>> lock.async_owned()
False
>>> async with lock:
...     lock.async_owned()
True
>>> lock.async_owned()
False
green_owned() bool

Return True if the current green task owns the lock.

Unlike the owner property, always reliable.

Example

>>> lock = Lock()
>>> lock.green_owned()
False
>>> with lock:
...     lock.green_owned()
True
>>> lock.green_owned()
False
locked() bool

Return True if anyone owns the lock.

Example

>>> import asyncio
>>> async def own_the_lock():
...     async with lock:
...         await asyncio.sleep(3600)
>>> lock = Lock()
>>> lock.locked()
False
>>> task = asyncio.create_task(own_the_lock())
>>> lock.locked()
True
>>> task.cancel()
>>> lock.locked()
False
property owner: tuple[str, int] | None

The current identifier of the task that owns the lock, or None if no one owns the lock.

It is not reliable during release, as it may temporarily be the identifier of a task that has cancelled the async_acquire() or green_acquire() call (e.g., due to a timeout).

property waiting: int

The current number of tasks waiting to own.

It represents the length of the waiting queue and thus changes immediately.

class aiologic.RLock

Bases: Lock

static __new__(cls, /) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = RLock()
>>> copy = RLock(*orig.__getnewargs__())
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the lock is used by any task.

Used by the standard truth testing procedure.

Example

>>> writing = RLock()
>>> bool(writing)
False
>>> with writing:  # lock is in use
...     bool(writing)
True
>>> bool(writing)
False
async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

async async_acquire(count: int = 1, *, blocking: bool = True, timeout: float | None = None) bool

green_acquire(count: int = 1, *, blocking: bool = True, timeout: float | None = None) bool

async_release(count: int = 1) None

green_release(count: int = 1) None

async_owned() bool

Return True if the current async task owns the lock.

Unlike the owner property, always reliable.

Example

>>> lock = RLock()
>>> lock.async_owned()
False
>>> async with lock:
...     lock.async_owned()
True
>>> lock.async_owned()
False
green_owned() bool

Return True if the current green task owns the lock.

Unlike the owner property, always reliable.

Example

>>> lock = RLock()
>>> lock.green_owned()
False
>>> with lock:
...     lock.green_owned()
True
>>> lock.green_owned()
False
async_count() int

Return the recursion level of the current async task.

Unlike the count property, always reliable.

Example

>>> lock = RLock()
>>> lock.async_count()
0
>>> async with lock:
...     lock.async_count()
1
>>> lock.async_count()
0
green_count() int

Return the recursion level of the current green task.

Unlike the count property, always reliable.

Example

>>> lock = RLock()
>>> lock.green_count()
0
>>> with lock:
...     lock.green_count()
1
>>> lock.green_count()
0
locked() bool

Return True if anyone owns the lock.

Example

>>> import asyncio
>>> async def own_the_lock():
...     async with lock:
...         await asyncio.sleep(3600)
>>> lock = RLock()
>>> lock.locked()
False
>>> task = asyncio.create_task(own_the_lock())
>>> lock.locked()
True
>>> task.cancel()
>>> lock.locked()
False
property owner: tuple[str, int] | None

The current identifier of the task that owns the lock, or None if no one owns the lock.

It is not reliable during release, as it may temporarily be the identifier of a task that has cancelled the async_acquire() or green_acquire() call (e.g., due to a timeout).

property count: int

The current recursion level of the task that owns the lock, or 0 if no one owns the lock.

It is not reliable during release, as it may temporarily be the recursion level of a task that has cancelled the async_acquire() or green_acquire() call (e.g., due to a timeout).

property waiting: int

The current number of tasks waiting to own.

It represents the length of the waiting queue and thus changes immediately.

aiologic.synchronized(wrapped: _AALock, /) _AASynchronizer
aiologic.synchronized(wrapped: _ASLock, /) _ASSynchronizer
aiologic.synchronized(wrapped: _SSLock, /) _SSSynchronizer
aiologic.synchronized(wrapped: _MMLock, /) _MMSynchronizer
aiologic.synchronized(wrapped: _SynchronizedType[_LockT], /) _SynchronizedDecorator
aiologic.synchronized(wrapped: _CallableT, /) _CallableT
aiologic.synchronized(wrapped: object, /) _SynchronizedDecorator

Condition variables

class aiologic.Condition

Bases: Generic[_T_co, _S_co]

static __new__(cls, /, lock: DefaultType = DEFAULT, timer: DefaultType = DEFAULT) Condition[RLock, Callable[[], int]]
static __new__(cls, /, lock: DefaultType = DEFAULT, *, timer: _S_co) Condition[RLock, _S_co]
static __new__(cls, /, lock: _T_co, timer: DefaultType = DEFAULT) Condition[_T_co, Callable[[], int]]
static __new__(cls, /, lock: _T_co, timer: _S_co) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = Condition()
>>> copy = Condition(*orig.__getnewargs__())
>>> copy.lock is orig.lock
True
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the underlying lock is used by any task.

If there is no lock, returns False.

Used by the standard truth testing procedure.

Example

>>> accessing = Condition()
>>> bool(accessing)
False
>>> with accessing:  # condition variable is in use
...     bool(accessing)
True
>>> bool(accessing)
False
async __aenter__() Self

__enter__() Self

async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

__await__(timeout: float | None = None) Generator[Any, Any, bool]

async with_(timeout: float | None = None) bool

wait(timeout: float | None = None) bool

async for_(predicate: Callable[[], _T], timeout: float | None = None, *, delegate: bool = True) _T

wait_for(predicate: Callable[[], _T], timeout: float | None = None, *, delegate: bool = True) _T

notify(count: int = 1, *, deadline: float | None = None) int

notify_all(*, deadline: float | None = None) int

property lock: _T_co

The underlying lock used by the condition variable.

property timer: _S_co

The callable object used by the condition variable.

property waiting: int

The current number of tasks waiting to be notified.

It represents the length of the waiting queue and thus changes immediately.

Communication primitives

Queues

class aiologic.SimpleQueue

Bases: Generic[_T]

static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same state.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state affects the arguments.

Example

>>> orig = SimpleQueue('items')
>>> orig.green_get()
'i'
>>> copy = SimpleQueue(*orig.__getnewargs__())
>>> copy.green_get()
't'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the queue is not empty.

Used by the standard truth testing procedure.

Example

>>> items = SimpleQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
__len__() int

Returns the number of items in the queue.

Used by the built-in function len().

Example

>>> items = SimpleQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
copy() Self

put(item: _T) None

async async_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

green_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

async async_get(*, blocking: bool = True, timeout: float | None = None) _T

green_get(*, blocking: bool = True, timeout: float | None = None) _T

property putting: int

The current number of tasks waiting to put.

It is always 0 for simple queues.

property getting: int

The current number of tasks waiting to get.

It represents the length of the waiting queue and thus changes immediately.

property waiting: int

The current number of tasks waiting to access.

It is the same as the getting property.

class aiologic.SimpleLifoQueue

Bases: SimpleQueue[_T]

static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same state.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state affects the arguments.

Example

>>> orig = SimpleLifoQueue('items')
>>> orig.green_get()
's'
>>> copy = SimpleLifoQueue(*orig.__getnewargs__())
>>> copy.green_get()
'm'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the queue is not empty.

Used by the standard truth testing procedure.

Example

>>> items = SimpleLifoQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
__len__() int

Returns the number of items in the queue.

Used by the built-in function len().

Example

>>> items = SimpleLifoQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
copy() Self

put(item: _T) None

async async_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

green_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

async async_get(*, blocking: bool = True, timeout: float | None = None) _T

green_get(*, blocking: bool = True, timeout: float | None = None) _T

property putting: int

The current number of tasks waiting to put.

It is always 0 for simple queues.

property getting: int

The current number of tasks waiting to get.

It represents the length of the waiting queue and thus changes immediately.

property waiting: int

The current number of tasks waiting to access.

It is the same as the getting property.

class aiologic.Queue

Bases: Generic[_T]

static __new__(cls, /, maxsize: int | None = None) Self
static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /, maxsize: int | None = None) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same state.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state affects the arguments.

Example

>>> orig = Queue('items')
>>> orig.green_get()
'i'
>>> copy = Queue(*orig.__getnewargs__())
>>> copy.green_get()
't'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the queue is not empty.

Used by the standard truth testing procedure.

Example

>>> items = Queue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
__len__() int

Returns the number of items in the queue.

Used by the built-in function len().

Example

>>> items = Queue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
copy() Self

async async_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

green_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

async async_get(*, blocking: bool = True, timeout: float | None = None) _T

green_get(*, blocking: bool = True, timeout: float | None = None) _T

property maxsize: int

The maximum number of items which the queue can hold.

property putting: int

The current number of tasks waiting to put.

It represents the length of the waiting queue and thus changes immediately.

property getting: int

The current number of tasks waiting to get.

It represents the length of the waiting queue and thus changes immediately.

property waiting: int

The current number of tasks waiting to access.

It is roughly equivalent to the sum of the putting and getting properties, but is more reliable than the sum in a multithreaded environment.

class aiologic.LifoQueue

Bases: Queue[_T]

static __new__(cls, /, maxsize: int | None = None) Self
static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /, maxsize: int | None = None) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same state.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state affects the arguments.

Example

>>> orig = LifoQueue('items')
>>> orig.green_get()
's'
>>> copy = LifoQueue(*orig.__getnewargs__())
>>> copy.green_get()
'm'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the queue is not empty.

Used by the standard truth testing procedure.

Example

>>> items = LifoQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
__len__() bool

Returns the number of items in the queue.

Used by the built-in function len().

Example

>>> items = LifoQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
copy() Self

async async_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

green_put(item: _T, *, blocking: bool = True, timeout: float | None = None) None

async async_get(*, blocking: bool = True, timeout: float | None = None) _T

green_get(*, blocking: bool = True, timeout: float | None = None) _T

property maxsize: int

The maximum number of items which the queue can hold.

property putting: int

The current number of tasks waiting to put.

It represents the length of the waiting queue and thus changes immediately.

property getting: int

The current number of tasks waiting to get.

It represents the length of the waiting queue and thus changes immediately.

property waiting: int

The current number of tasks waiting to access.

It is roughly equivalent to the sum of the putting and getting properties, but is more reliable than the sum in a multithreaded environment.

class aiologic.PriorityQueue

Bases: Queue[_RichComparableT]

static __new__(cls, /, maxsize: int | None = None) Self
static __new__(cls, items: Iterable[_RichComparableT] | MissingType = MISSING, /, maxsize: int | None = None) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same state.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state affects the arguments.

Example

>>> orig = PriorityQueue('items')
>>> orig.green_get()
'e'
>>> copy = PriorityQueue(*orig.__getnewargs__())
>>> copy.green_get()
'i'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the queue is not empty.

Used by the standard truth testing procedure.

Example

>>> items = PriorityQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
__len__() bool

Returns the number of items in the queue.

Used by the built-in function len().

Example

>>> items = PriorityQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
copy() Self

async async_put(item: _RichComparableT, *, blocking: bool = True, timeout: float | None = None) None

green_put(item: _RichComparableT, *, blocking: bool = True, timeout: float | None = None) None

async async_get(*, blocking: bool = True, timeout: float | None = None) _RichComparableT

green_get(*, blocking: bool = True, timeout: float | None = None) _RichComparableT

property maxsize: int

The maximum number of items which the queue can hold.

property putting: int

The current number of tasks waiting to put.

It represents the length of the waiting queue and thus changes immediately.

property getting: int

The current number of tasks waiting to get.

It represents the length of the waiting queue and thus changes immediately.

property waiting: int

The current number of tasks waiting to access.

It is roughly equivalent to the sum of the putting and getting properties, but is more reliable than the sum in a multithreaded environment.

exception aiologic.QueueEmpty

Bases: Exception

exception aiologic.QueueFull

Bases: Exception

Non-blocking primitives

Flags

class aiologic.Flag

Bases: Generic[_T]

static __new__(cls, /, marker: _T | MissingType = MISSING) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same state.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state affects the arguments.

Example

>>> orig = Flag()
>>> orig.set('value')  # change the state
>>> orig.get()
'value'
>>> copy = Flag(*orig.__getnewargs__())
>>> copy.get()
'value'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the flag is set.

Used by the standard truth testing procedure.

Example

>>> cancelled = Flag()  # flag is unset
>>> bool(cancelled)
False
>>> cancelled.set()  # flag is set
>>> bool(cancelled)
True
copy() Self

get(default: _T | MissingType = MISSING, *, default_factory: MissingType = MISSING) _T
get(default: _D, *, default_factory: MissingType = MISSING) _T | _D
get(default: MissingType = MISSING, *, default_factory: Callable[[], _T]) _T
get(default: MissingType = MISSING, *, default_factory: Callable[[], _D]) _T | _D

set(marker: MissingType = MISSING) bool
set(marker: _T) bool

clear() None

Resource guards

class aiologic.ResourceGuard

Bases: object

static __new__(cls, _: DefaultType = DEFAULT, /, action: str | DefaultType = DEFAULT) Self

__getnewargs__() tuple[Any, ...]

Returns arguments that can be used to create new instances with the same initial values.

Used by:

  • The pickle module for pickling.

  • The copy module for copying.

The current state does not affect the arguments.

Example

>>> orig = ResourceGuard(action='waiting')
>>> orig.action
'waiting'
>>> copy = ResourceGuard(*orig.__getnewargs__())
>>> copy.action
'waiting'
__getstate__() None

Disables the use of internal state for pickling and copying.

__copy__() Self

__repr__() str

__bool__() bool

Returns True if the resource guard is used by any task.

Used by the standard truth testing procedure.

Example

>>> using = ResourceGuard()
>>> bool(using)
False
>>> with using:  # resource guard is in use
...     bool(using)
True
>>> bool(using)
False
__enter__() Self

__exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None

property action: str

The action to guard against.

exception aiologic.BusyResourceError

Bases: RuntimeError

Low-level primitives

Waiters (low-level)

class aiologic.lowlevel.Waiter

Bases: Protocol

The simplest synchronization primitive.

It represents a blocking call that can either be completed from outside by a task/thread or be cancelled (due to a timeout or an exception). One wait is one instance.

Unlike all other primitives, it is bound to the current event loop upon creation.

wake() None

Reschedule (resume/notify) the task that is blocked.

It can be called multiple times and in parallel: all calls are expected to be serialized by the event loop (if any). If no task is blocked on the primitive, it has no effect.

Note that while redundant calls are allowed, they can lead to excessive load on the event loop in the form of callback flood. If your scenario involves either multiple notifiers or premature task rescheduling (e.g., due to a timeout), consider using events instead of waiters.

class aiologic.lowlevel.AsyncWaiter

Bases: Waiter, Protocol

The return type of create_async_waiter().

__await__(timeout: float | None = None) Generator[Any, Any, bool]

Block (put to sleep) the task until wake() is called from any thread, and then return True.

It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, the behavior is undefined.

Used by the await expressions.

Parameters:

timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if there were no calls to wake() within that time. For zero: if no such calls were serialized.

async with_(timeout: float | None = None) bool

Block (put to sleep) the task until wake() is called from any thread, and then return True.

It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, the behavior is undefined.

Parameters:

timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if there were no calls to wake() within that time. For zero: if no such calls were serialized.

wake() None

Reschedule (resume/notify) the task that is blocked.

It can be called multiple times and in parallel: all calls are expected to be serialized by the event loop (if any). If no task is blocked on the primitive, it has no effect.

Note that while redundant calls are allowed, they can lead to excessive load on the event loop in the form of callback flood. If your scenario involves either multiple notifiers or premature task rescheduling (e.g., due to a timeout), consider using events instead of waiters.

property shield: bool

A boolean that is True if the __await__()/with_() call will be shielded from external cancellation, False otherwise.

The effect is mostly equivalent to applying the shield() universal decorator to the primitive/method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.

It can be rewritten, but any changes will only take effect until the call.

class aiologic.lowlevel.GreenWaiter

Bases: Waiter, Protocol

The return type of create_green_waiter().

wait(timeout: float | None = None) bool

Block (put to sleep) the task until wake() is called from any thread, and then return True.

It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, the behavior is undefined.

Parameters:

timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if there were no calls to wake() within that time. For zero: if no such calls were serialized.

wake() None

Reschedule (resume/notify) the task that is blocked.

It can be called multiple times and in parallel: all calls are expected to be serialized by the event loop (if any). If no task is blocked on the primitive, it has no effect.

Note that while redundant calls are allowed, they can lead to excessive load on the event loop in the form of callback flood. If your scenario involves either multiple notifiers or premature task rescheduling (e.g., due to a timeout), consider using events instead of waiters.

property shield: bool

A boolean that is True if the wait() call will be shielded from external cancellation, False otherwise.

The effect is mostly equivalent to applying the shield() universal decorator to the method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.

It can be rewritten, but any changes will only take effect until the call.

aiologic.lowlevel.create_async_waiter(*, shield: bool = False) AsyncWaiter

Create a new instance for a blocking async call.

Parameters:

shield – See the shield property.

aiologic.lowlevel.create_green_waiter(*, shield: bool = False) GreenWaiter

Create a new instance for a blocking green call.

Parameters:

shield – See the shield property.

Events (low-level)

class aiologic.lowlevel.Event

Bases: Protocol

__bool__() bool

set() bool

is_set() bool

cancelled() bool

property shield: bool

property force: bool

class aiologic.lowlevel.AsyncEvent

Bases: ABC, Event

The return type of create_async_waiter().

abstractmethod __await__(timeout: float | None = None) Generator[Any, Any, bool]

Block (put to sleep) the task until set() is called from any thread, and then return True.

It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, a RuntimeError is raised.

If the event is already in the set state (and the method has not yet been called), this is equivalent to async_checkpoint().

Parameters:

timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if there were no calls to set() within that time. For zero: if no such calls were serialized.

abstractmethod async with_(timeout: float | None = None) bool

Block (put to sleep) the task until set() is called from any thread, and then return True.

It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, a RuntimeError is raised.

If the event is already in the set state (and the method has not yet been called), this is equivalent to async_checkpoint().

Parameters:

timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if there were no calls to set() within that time. For zero: if no such calls were serialized.

abstractmethod __bool__() bool

Return True if the set() method was successfully called, False otherwise. Mutually exclusive with cancelled().

It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the set() method has already been called by someone else, but both is_set() and cancelled() return False; the same applies to races between the task and one notifier when the former is cancelled).

Used by the standard truth testing procedure.

abstractmethod set() bool

Put the event into the set state, and return True if this was the first successful attempt (no one preempted it, and the call to the __await__()/with_() method was not cancelled), False otherwise.

If the task is already blocked, it is rescheduled. Otherwise, the subsequent call will behave only as a checkpoint (no actual waiting).

abstractmethod is_set() bool

Return True if the set() method was successfully called, False otherwise. Mutually exclusive with cancelled().

It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the set() method has already been called by someone else, but both is_set() and cancelled() return False; the same applies to races between the task and one notifier when the former is cancelled).

abstractmethod cancelled() bool

Return True if the call was cancelled (interrupted by a timeout or any exception), False otherwise. Mutually exclusive with is_set().

It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the set() method has already been called by someone else, but both is_set() and cancelled() return False; the same applies to races between the task and one notifier when the former is cancelled).

abstract property shield: bool

A boolean that is True if the __await__()/with_() call will be shielded from external cancellation, False otherwise.

The effect is mostly equivalent to applying the shield() universal decorator to the primitive/method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.

It can be rewritten, but any changes will only take effect until the call.

abstract property force: bool

A boolean that is True if the __await__()/with_() call will have a forced checkpoint (that is, the call is guaranteed to switch to the event loop and check for cancellation), False otherwise.

The effect is mostly equivalent to applying the enable_checkpoints() universal decorator to the primitive/method, but it is more efficient.

It can be rewritten, but any changes will only take effect until the call.

class aiologic.lowlevel.GreenEvent

Bases: ABC, Event

The return type of create_green_waiter().

abstractmethod wait(timeout: float | None = None) bool

Block (put to sleep) the task until set() is called from any thread, and then return True.

It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, a RuntimeError is raised.

If the event is already in the set state (and the method has not yet been called), this is equivalent to green_checkpoint().

Parameters:

timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if there were no calls to set() within that time. For zero: if no such calls were serialized.

abstractmethod __bool__() bool

Return True if the set() method was successfully called, False otherwise. Mutually exclusive with cancelled().

It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the set() method has already been called by someone else, but both is_set() and cancelled() return False; the same applies to races between the task and one notifier when the former is cancelled).

Used by the standard truth testing procedure.

abstractmethod set() bool

Put the event into the set state, and return True if this was the first successful attempt (no one preempted it, and the call to the wait() method was not cancelled), False otherwise.

If the task is already blocked, it is rescheduled. Otherwise, the subsequent call will behave only as a checkpoint (no actual waiting).

abstractmethod is_set() bool

Return True if the set() method was successfully called, False otherwise. Mutually exclusive with cancelled().

It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the set() method has already been called by someone else, but both is_set() and cancelled() return False; the same applies to races between the task and one notifier when the former is cancelled).

abstractmethod cancelled() bool

Return True if the call was cancelled (interrupted by a timeout or any exception), False otherwise. Mutually exclusive with is_set().

It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the set() method has already been called by someone else, but both is_set() and cancelled() return False; the same applies to races between the task and one notifier when the former is cancelled).

abstract property shield: bool

A boolean that is True if the wait() call will be shielded from external cancellation, False otherwise.

The effect is mostly equivalent to applying the shield() universal decorator to the method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.

It can be rewritten, but any changes will only take effect until the call.

abstract property force: bool

A boolean that is True if the wait() call will have a forced checkpoint (that is, the call is guaranteed to switch to the event loop and check for cancellation), False otherwise.

The effect is mostly equivalent to applying the enable_checkpoints() universal decorator to the method, but it is more efficient.

It can be rewritten, but any changes will only take effect until the call.

final class aiologic.lowlevel.SetEvent

Bases: GreenEvent, AsyncEvent

The singleton type for SET_EVENT.

final class aiologic.lowlevel.DummyEvent

Bases: GreenEvent, AsyncEvent

The singleton type for DUMMY_EVENT.

final class aiologic.lowlevel.CancelledEvent

Bases: GreenEvent, AsyncEvent

The singleton type for CANCELLED_EVENT.

aiologic.lowlevel.SET_EVENT: SetEvent

The singleton instance representing an event that is always set.

aiologic.lowlevel.DUMMY_EVENT: DummyEvent

The same as SET_EVENT.

aiologic.lowlevel.CANCELLED_EVENT: CancelledEvent

The singleton instance representing an event that is always cancelled.

aiologic.lowlevel.create_async_event(*, locking: bool = False, shield: bool = False, force: bool = False) AsyncEvent

Create a new instance for a blocking async call (with state).

Parameters:
  • locking – If set to True, the instance will also include the slots of ThreadOnceLock, allowing the latter’s methods to be used with the returned object. This can be used when one-time synchronization on the event is required, but having a separate ThreadOnceLock instance is expensive in terms of memory.

  • shield – See the shield property.

  • force – See the force property.

aiologic.lowlevel.create_green_event(*, locking: bool = False, shield: bool = False, force: bool = False) GreenEvent

Create a new instance for a blocking green call (with state).

Parameters:
  • locking – If set to True, the instance will also include the slots of ThreadOnceLock, allowing the latter’s methods to be used with the returned object. This can be used when one-time synchronization on the event is required, but having a separate ThreadOnceLock instance is expensive in terms of memory.

  • shield – See the shield property.

  • force – See the force property.

Locks (low-level)

final class aiologic.lowlevel.ThreadLock

Bases: object

The return type of create_thread_lock().

final class aiologic.lowlevel.ThreadRLock

Bases: object

The return type of create_thread_rlock().

final class aiologic.lowlevel.ThreadOnceLock

Bases: object

The return type of create_thread_oncelock().

final class aiologic.lowlevel.ThreadDummyLock

Bases: object

A singleton class for THREAD_DUMMY_LOCK.

aiologic.lowlevel.THREAD_DUMMY_LOCK: ThreadDummyLock

A singleton object that mimics a reentrant lock but does nothing.

Can be used as a replacement for a once lock after release (to reduce memory usage).

aiologic.lowlevel.create_thread_lock() ThreadLock

Create a new instance of a primitive lock that blocks threads.

The same as threading.Lock, but not affected by monkey patching.

aiologic.lowlevel.create_thread_rlock() ThreadRLock

Create a new instance of a reentrant lock that blocks threads.

The same as threading.RLock, but not affected by monkey patching.

aiologic.lowlevel.create_thread_oncelock() ThreadOnceLock

Create a new instance of a once lock that mimics a reentrant lock but does nothing after release (when the counter reaches zero).

It wakes up all threads at once, thereby solving the square problem, which makes it suitable for creating thread-safe initialization (or any other one-time actions).

Unlike threading.RLock, it is signal-safe.

aiologic.lowlevel.once(wrapped: MissingType = MISSING, /, *, reentrant: bool = False) Callable[[Callable[[], _T]], Callable[[], _T]]
aiologic.lowlevel.once(wrapped: Callable[[], _T], /) Callable[[], _T]

Transform wrapped into a one-time function.

Blocks threads attempting to execute the function in parallel and wakes them up at once upon completion. The result is stored in the closure of the new function and is returned on each subsequent call.

Parameters:

reentrant – Unless set to True, recursive attempts to call the function will raise the RuntimeError exception. Also affects signal handlers and destructors.

Raises:

RuntimeError – if called recursively and reentrant=False.

Queues (low-level)

class aiologic.lowlevel.lazydeque

Bases: MutableSequence[_T]

class aiologic.lowlevel.lazyqueue

Bases: Generic[_T]

Advanced topics

Libraries

aiologic.lowlevel.current_async_library(*, failsafe: Literal[False] = False) str
aiologic.lowlevel.current_async_library(*, failsafe: Literal[True]) str | None

Detect which async library is currently running.

Parameters:

failsafe – Unless set to True, the function will raise an exception when there is no current async library. Otherwise the function returns None in that case.

Returns:

A string like "trio" or None.

Raises:

AsyncLibraryNotFoundError – if the current async library was not recognized.

Example

async def async_sleep(seconds: float) -> None:
    match library := aiologic.lowlevel.current_async_library():
        case "asyncio":
            await asyncio.sleep(seconds)
        case "curio":
            await curio.sleep(seconds)
        case "trio":
            await trio.sleep(seconds)
        case _:
            msg = f"unsupported async library {library!r}"
            raise RuntimeError(msg)
aiologic.lowlevel.current_green_library(*, failsafe: Literal[False] = False) str
aiologic.lowlevel.current_green_library(*, failsafe: Literal[True]) str | None

Detect which green library is currently running.

Parameters:

failsafe – Unless set to True, the function will raise an exception when there is no current green library. Otherwise the function returns None in that case.

Returns:

A string like "gevent" or None.

Raises:

GreenLibraryNotFoundError – if the current green library was not recognized.

Example

def green_sleep(seconds: float) -> None:
    match library := aiologic.lowlevel.current_green_library():
        case "threading":
            time.sleep(seconds)
        case "eventlet":
            eventlet.sleep(seconds)
        case "gevent":
            gevent.sleep(seconds)
        case _:
            msg = f"unsupported green library {library!r}"
            raise RuntimeError(msg)
aiologic.lowlevel.current_async_library_tlocal: threading.local

Thread-local data to control the return value of aiologic.lowlevel.current_async_library().

current_async_library_tlocal.name: str | None = None

Unless set to a non-None object, the function detects the current async library with its own algorithms. Otherwise the function returns exactly the set object.

Example

library = aiologic.lowlevel.current_async_library_tlocal.name

aiologic.lowlevel.current_async_library_tlocal.name = "someio"

try:
    ...  # aiologic.lowlevel.current_async_library() == "someio"
finally:
    aiologic.lowlevel.current_async_library_tlocal.name = library
aiologic.lowlevel.current_green_library_tlocal: threading.local

Thread-local data to control the return value of aiologic.lowlevel.current_green_library().

current_green_library_tlocal.name: str | None = None

Unless set to a non-None object, the function detects the current green library with its own algorithms. Otherwise the function returns exactly the set object.

Example

library = aiologic.lowlevel.current_green_library_tlocal.name

aiologic.lowlevel.current_green_library_tlocal.name = "somelet"

try:
    ...  # aiologic.lowlevel.current_green_library() == "somelet"
finally:
    aiologic.lowlevel.current_green_library_tlocal.name = library
exception aiologic.lowlevel.AsyncLibraryNotFoundError

Bases: RuntimeError

Exception raised by the aiologic.lowlevel.current_async_library() function if the current async library was not recognized.

exception aiologic.lowlevel.GreenLibraryNotFoundError

Bases: RuntimeError

Exception raised by the aiologic.lowlevel.current_green_library() function if the current green library was not recognized.

Execution units

aiologic.lowlevel.current_thread() threading.Thread

aiologic.lowlevel.current_thread_ident() int

aiologic.lowlevel.current_async_token() object

aiologic.lowlevel.current_green_token() object

aiologic.lowlevel.current_async_token_ident() tuple[str, int]

aiologic.lowlevel.current_green_token_ident() tuple[str, int]

aiologic.lowlevel.current_async_task() object

aiologic.lowlevel.current_green_task() object

aiologic.lowlevel.current_async_task_ident() tuple[str, int]

aiologic.lowlevel.current_green_task_ident() tuple[str, int]

Cancellation and timeouts

aiologic.lowlevel.async_clock() float

aiologic.lowlevel.green_clock() float

async aiologic.lowlevel.async_sleep(seconds: float, /) None

aiologic.lowlevel.green_sleep(seconds: float, /) None

async aiologic.lowlevel.async_sleep_until(deadline: float, /) None

aiologic.lowlevel.green_sleep_until(deadline: float, /) None

async aiologic.lowlevel.async_sleep_forever() NoReturn

aiologic.lowlevel.green_sleep_forever() NoReturn

aiologic.lowlevel.async_seconds_per_sleep() float

aiologic.lowlevel.green_seconds_per_sleep() float

aiologic.lowlevel.async_seconds_per_timeout() float

aiologic.lowlevel.green_seconds_per_timeout() float

aiologic.lowlevel.shield(wrapped: _AwaitableT, /) _AwaitableT
aiologic.lowlevel.shield(wrapped: _CallableT, /) _CallableT

Checkpoints and fairness

async aiologic.lowlevel.async_checkpoint(*, force: bool = False) None

A pure async checkpoint.

It checks for cancellation and allows the scheduler to switch to another task. In many ways, it is similar to async_sleep(0), but has the following differences:

  • It can have a more efficient implementation.

  • It can be enabled/disabled in the current context.

The latter distinguishes aiologic checkpoints from Trio/AnyIO checkpoints. You can control whether checkpoints are enabled or not in the following ways (in order of priority):

  • Set AIOLOGIC_ASYNC_CHECKPOINTS environment variable to any non-empty value. This will enable async checkpoints for all async libraries (in all threads). The empty value has the opposite effect.

  • Set AIOLOGIC_<ASYNC_LIBRARY>_CHECKPOINTS environment variable to any non-empty value. This will enable async checkpoints for the specified async library (in all threads). The empty value has the opposite effect.

  • Use enable_checkpoints()/disable_checkpoints() to control the state of checkpoints in the current context.

  • Pass force=True to force a checkpoint.

Note

High-level primitives (and some low-level ones) implement checkpoints for all blocking calls (regardless of whether waiting is required or not), and enabling/disabling checkpoints also affects them, so you can use checkpoints even without checkpoint functions if there are blocking calls (it is enough to explicitly enable checkpoints).

async with lock:  # an async checkpoint (if enabled)
    # ...exclusive access...

Furthermore, the library implements a “one blocking call, one checkpoint” concept, which gives a predictable number of context switches per call (zero or one if disabled, exactly one if enabled).

By default, async checkpoints are enabled for Trio only.

aiologic.lowlevel.green_checkpoint(*, force: bool = False) None

A pure green checkpoint.

It checks for cancellation and allows the scheduler to switch to another task. In many ways, it is similar to green_sleep(0), but has the following differences:

  • It can have a more efficient implementation.

  • It can be enabled/disabled in the current context.

The latter distinguishes aiologic checkpoints from Trio/AnyIO checkpoints. You can control whether checkpoints are enabled or not in the following ways (in order of priority):

  • Set AIOLOGIC_GREEN_CHECKPOINTS environment variable to any non-empty value. This will enable green checkpoints for all green libraries (in all threads). The empty value has the opposite effect.

  • Set AIOLOGIC_<GREEN_LIBRARY>_CHECKPOINTS environment variable to any non-empty value. This will enable green checkpoints for the specified green library (in all threads). The empty value has the opposite effect.

  • Use enable_checkpoints()/disable_checkpoints() to control the state of checkpoints in the current context.

  • Pass force=True to force a checkpoint.

Note

High-level primitives (and some low-level ones) implement checkpoints for all blocking calls (regardless of whether waiting is required or not), and enabling/disabling checkpoints also affects them, so you can use checkpoints even without checkpoint functions if there are blocking calls (it is enough to explicitly enable checkpoints).

with lock:  # a green checkpoint (if enabled)
    # ...exclusive access...

Furthermore, the library implements a “one blocking call, one checkpoint” concept, which gives a predictable number of context switches per call (zero or one if disabled, exactly one if enabled).

By default, green checkpoints are not enabled for any library.

async aiologic.lowlevel.async_checkpoint_if_cancelled(*, force: bool = False) None

Issue an async checkpoint if the calling context has been cancelled.

Used in conjunction with shield() to check for cancellation before shielding, and is not suitable for any other use case.

aiologic.lowlevel.green_checkpoint_if_cancelled(*, force: bool = False) None

Issue a green checkpoint if the calling context has been cancelled.

Used in conjunction with shield() to check for cancellation before shielding, and is not suitable for any other use case.

aiologic.lowlevel.async_checkpoint_enabled() bool

Return True if async checkpoints are enabled in the current context, False otherwise.

aiologic.lowlevel.green_checkpoint_enabled() bool

Return True if green checkpoints are enabled in the current context, False otherwise.

aiologic.lowlevel.enable_checkpoints(wrapped: MissingType = MISSING, /) _CheckpointsManager
aiologic.lowlevel.enable_checkpoints(wrapped: _AwaitableT, /) _AwaitableT
aiologic.lowlevel.enable_checkpoints(wrapped: _CallableT, /) _CallableT

Enable checkpoints in the current context.

If a callable object is passed, it is wrapped with a universal decorator and a callable proxy is returned. If an awaitable object is passed, it is also wrapped and an awaitable proxy is returned. If nothing is passed, a sync/async context manager is returned.

To distinguish between green and async functions, aiologic.meta.iscoroutinefactory() is used. Therefore, if you implement your own callable object that returns a coroutine object, consider using aiologic.meta.markcoroutinefactory().

Example

>>> async_checkpoint_enabled()
False
>>> async with enable_checkpoints():
...     async_checkpoint_enabled()
True
>>> async_checkpoint_enabled()
False
>>> async def test():
...     return async_checkpoint_enabled()
>>> await test()
False
>>> await enable_checkpoints(test)()  # for a callable object
True
>>> await enable_checkpoints(test())  # for an awaitable object
True
>>> await test()
False
aiologic.lowlevel.disable_checkpoints(wrapped: MissingType = MISSING, /) _NoCheckpointsManager
aiologic.lowlevel.disable_checkpoints(wrapped: _AwaitableT, /) _AwaitableT
aiologic.lowlevel.disable_checkpoints(wrapped: _CallableT, /) _CallableT

Disable checkpoints in the current context.

Like enable_checkpoints(), but with the opposite effect.

Example

>>> with enable_checkpoints():
...     green_checkpoint_enabled()
...     with disable_checkpoints():
...         green_checkpoint_enabled()
...     green_checkpoint_enabled()
True
False
True

Safety and reentrancy

aiologic.lowlevel.signal_safety_enabled() bool

Return True if signal-safety is enabled in the current context, False otherwise.

aiologic.lowlevel.enable_signal_safety(wrapped: MissingType = MISSING, /) _SignalSafetyManager
aiologic.lowlevel.enable_signal_safety(wrapped: _AwaitableT, /) _AwaitableT
aiologic.lowlevel.enable_signal_safety(wrapped: _CallableT, /) _CallableT

Enable signal-safety in the current context.

If a callable object is passed, it is wrapped with a universal decorator and a callable proxy is returned. If an awaitable object is passed, it is also wrapped and an awaitable proxy is returned. If nothing is passed, a sync/async context manager is returned.

To distinguish between green and async functions, aiologic.meta.iscoroutinefactory() is used. Therefore, if you implement your own callable object that returns a coroutine object, consider using aiologic.meta.markcoroutinefactory().

Example

>>> signal_safety_enabled()
False
>>> async with enable_signal_safety():
...     signal_safety_enabled()
True
>>> signal_safety_enabled()
False
>>> async def test():
...     return signal_safety_enabled()
>>> await test()
False
>>> await enable_signal_safety(test)()  # for a callable object
True
>>> await enable_signal_safety(test())  # for an awaitable object
True
>>> await test()
False
aiologic.lowlevel.disable_signal_safety(wrapped: MissingType = MISSING, /) _NoSignalSafetyManager
aiologic.lowlevel.disable_signal_safety(wrapped: _AwaitableT, /) _AwaitableT
aiologic.lowlevel.disable_signal_safety(wrapped: _CallableT, /) _CallableT

Disable signal-safety in the current context.

Like enable_signal_safety(), but with the opposite effect.

Example

>>> with enable_signal_safety():
...     signal_safety_enabled()
...     with disable_signal_safety():
...         signal_safety_enabled()
...     signal_safety_enabled()
True
False
True

Metaprogramming

Singletons and markers

class aiologic.meta.SingletonEnum

Bases: Enum

A base class for type-checker-friendly singleton classes whose instances will be defined at the module level.

Unlike enum.Enum, it prohibits setting attributes that are not explicitly declared via __slots__.

Example

>>> class SingletonType(SingletonEnum):
...     __slots__ = ('_x',)
...     SINGLETON = 'SINGLETON'
>>> SINGLETON = SingletonType.SINGLETON
>>> repr(SINGLETON) == f"{__name__}.SINGLETON"
True
>>> SINGLETON._x = 1  # ok
>>> SINGLETON._y = 2
Traceback (most recent call last):
AttributeError: 'SingletonType' object has no attribute '_y'
final class aiologic.meta.DefaultType

Bases: SingletonEnum

A singleton class for DEFAULT; mimics NoneType.

final class aiologic.meta.MissingType

Bases: SingletonEnum

A singleton class for MISSING; mimics NoneType.

aiologic.meta.DEFAULT: DefaultType

A singleton object for default values; mimics None.

Used as a marker to indicate that some object will be used by default (without any special behavior).

aiologic.meta.MISSING: MissingType

A singleton object for default values; mimics None.

Used as a marker to indicate that some special behavior will be used by default (cannot be achieved by passing any value).

Can also be used outside of parameters to indicate that there is no object.

Introspecting

aiologic.meta.lookup_static(owner: type, name: str, /) Any
aiologic.meta.lookup_static(owner: type, name: str, /, default: _T) Any | _T

aiologic.meta.resolve_special(owner: type, name: str, instance: None = None, /) Any
aiologic.meta.resolve_special(owner: type[_T1], name: str, instance: _T1, /) Any
aiologic.meta.resolve_special(owner: type, name: str, instance: None = None, /, *, default: _T2) Any | _T2
aiologic.meta.resolve_special(owner: type[_T1], name: str, instance: _T1, /, *, default: _T2) Any | _T2

aiologic.meta.isdatadescriptor_static(obj: object, /) bool

aiologic.meta.ismethoddescriptor_static(obj: object, /) bool

aiologic.meta.ismetaclass_static(obj: object, /) bool

aiologic.meta.isclass_static(obj: object, /) bool

aiologic.meta.issubclass_static(obj: object, class_or_tuple: type | tuple[_ClassInfo, ...], /) bool

aiologic.meta.isinstance_static(obj: object, class_or_tuple: type | tuple[_ClassInfo, ...], /) bool

aiologic.meta.isgeneratorlike(obj: object, /) TypeIs[Generator[Any, Any, Any]]

Return True if the object looks like a generator iterator, that is, implements collections.abc.Generator, and False otherwise.

Example

>>> from collections.abc import Generator
>>> class SimpleGenerator(Generator):
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> def generator_function():
...     return
...     yield
>>> isgeneratorlike(object())
False
>>> isgeneratorlike(gen := generator_function())
True
>>> isgeneratorlike(SimpleGenerator())
True
aiologic.meta.iscoroutinelike(obj: object, /) TypeIs[Coroutine[Any, Any, Any]]

Return True if the object looks like a coroutine, that is, implements collections.abc.Coroutine, and False otherwise.

Example

>>> from collections.abc import Coroutine, Generator
>>> class SimpleCoroutine(Coroutine, Generator):
...     def __await__(self):
...         return self
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> async def coroutine_function():
...     pass
>>> iscoroutinelike(object())
False
>>> iscoroutinelike(coro := coroutine_function())
True
>>> iscoroutinelike(SimpleCoroutine())
True
>>> await coro  # to avoid `RuntimeWarning`

Caution

Some objects, such as generator-based coroutines (see types.coroutine()), may not have the __await__() method but still behave like coroutine objects. They are also treated as coroutine-like objects. So if you want to get an iterator for such an object, consider using await_for(obj).__await__().

aiologic.meta.isasyncgenlike(obj: object, /) TypeIs[AsyncGenerator[Any, Any]]

Return True if the object looks like an asynchronous generator iterator, that is, implements collections.abc.AsyncGenerator, and False otherwise.

Example

>>> from collections.abc import AsyncGenerator
>>> class SimpleAsyncGenerator(AsyncGenerator):
...     async def asend(self, value):
...         return await super().send(value)
...     async def athrow(self, typ, val=None, tb=None):
...         return await super().throw(typ, val, tb)
>>> async def asyncgen_function():
...     return
...     yield
>>> isasyncgenlike(object())
False
>>> isasyncgenlike(asyncgen := asyncgen_function())
True
>>> isasyncgenlike(SimpleAsyncGenerator())
True
aiologic.meta.isgeneratorfactory(obj: Callable[[...], Generator[Any, Any, Any]], /) bool
aiologic.meta.isgeneratorfactory(obj: Callable[[_P], Awaitable[_T]], /) TypeGuard[Callable[[_P], Generator[Any, Any, _T]]]
aiologic.meta.isgeneratorfactory(obj: Callable[[_P], object], /) TypeGuard[Callable[[_P], Generator[Any, Any, Any]]]
aiologic.meta.isgeneratorfactory(obj: object, /) TypeGuard[Callable[[...], Generator[Any, Any, Any]]]

Return True if the object returns a generator iterator when called, False otherwise.

The following objects are treated as generator factories by default:

  1. A generator function. This is true for both user-defined functions and some compiled functions that look like such functions (for example, functions compiled via Cython).

  2. A generator type (a class whose instances look like generator iterators; see isgeneratorlike()).

  3. An object manually marked with markgeneratorfactory().

Example

>>> from collections.abc import Generator
>>> class SimpleGenerator(Generator):
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> def generator_function():
...     return
...     yield
>>> isgeneratorfactory(lambda: None)
False
>>> isgeneratorfactory(generator_function)
True
>>> isgeneratorfactory(SimpleGenerator)
True

For all others, to determine whether an object is a generator factory, a recursive algorithm is used that handles at least the following cases:

  1. If it is a function defined by functools.partialmethod for some object, the latter is checked.

  2. If it is a partial object (an instance of functools.partial()), the object it wraps (functools.partial.func) is checked.

  3. If it is a bound method, the corresponding object (method.__func__) is checked.

  4. If it is a callable object, its __call__() method is checked.

Example

>>> from functools import partial, partialmethod
>>> class CustomGeneratorCallable:
...     def __call__(self):
...         return
...         yield
...     get = partialmethod(__call__)
>>> class ComplexGeneratorCallable:
...     __call__ = CustomGeneratorCallable()
>>> isgeneratorfactory(CustomGeneratorCallable())
True
>>> isgeneratorfactory(CustomGeneratorCallable().get)
True
>>> isgeneratorfactory(CustomGeneratorCallable().__call__)
True
>>> isgeneratorfactory(partial(CustomGeneratorCallable()))
True
>>> isgeneratorfactory(ComplexGeneratorCallable())
True
aiologic.meta.iscoroutinefactory(obj: Callable[[...], Coroutine[Any, Any, Any]], /) bool
aiologic.meta.iscoroutinefactory(obj: Callable[[_P], Awaitable[_T]], /) TypeGuard[Callable[[_P], Coroutine[Any, Any, _T]]]
aiologic.meta.iscoroutinefactory(obj: Callable[[_P], object], /) TypeGuard[Callable[[_P], Coroutine[Any, Any, Any]]]
aiologic.meta.iscoroutinefactory(obj: object, /) TypeGuard[Callable[[...], Coroutine[Any, Any, Any]]]

Return True if the object returns a coroutine when called, False otherwise.

The following objects are treated as coroutine factories by default:

  1. A coroutine function (a function defined with an async def syntax). This is true for both user-defined functions and some compiled functions that look like such functions (for example, functions compiled via Cython).

  2. A coroutine type (a class whose instances look like coroutines; see iscoroutinelike()).

  3. A generator-based coroutine function marked with asyncio.coroutine() or the corresponding standard marker (on Python <3.12).

  4. An object manually marked with inspect.markcoroutinefunction() or the corresponding standard marker (on Python ≥3.12).

  5. An object manually marked with markcoroutinefactory().

Example

>>> from collections.abc import Coroutine, Generator
>>> class SimpleCoroutine(Coroutine, Generator):
...     def __await__(self):
...         return self
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> async def coroutine_function():
...     pass
>>> iscoroutinefactory(lambda: None)
False
>>> iscoroutinefactory(coroutine_function)
True
>>> iscoroutinefactory(SimpleCoroutine)
True

For all others, to determine whether an object is a coroutine factory, a recursive algorithm is used that handles at least the following cases:

  1. If it is a function defined by functools.partialmethod for some object, the latter is checked.

  2. If it is a partial object (an instance of functools.partial()), the object it wraps (functools.partial.func) is checked.

  3. If it is a bound method, the corresponding object (method.__func__) is checked.

  4. If it is a callable object, its __call__() method is checked.

Example

>>> from functools import partial, partialmethod
>>> class CustomCoroutineCallable:
...     async def __call__(self):
...         pass
...     get = partialmethod(__call__)
>>> class ComplexCoroutineCallable:
...     __call__ = CustomCoroutineCallable()
>>> iscoroutinefactory(CustomCoroutineCallable())
True
>>> iscoroutinefactory(CustomCoroutineCallable().get)
True
>>> iscoroutinefactory(CustomCoroutineCallable().__call__)
True
>>> iscoroutinefactory(partial(CustomCoroutineCallable()))
True
>>> iscoroutinefactory(ComplexCoroutineCallable())
True
aiologic.meta.isasyncgenfactory(obj: Callable[[...], AsyncGenerator[Any, Any]], /) bool
aiologic.meta.isasyncgenfactory(obj: Callable[[_P], object], /) TypeGuard[Callable[[_P], AsyncGenerator[Any, Any]]]
aiologic.meta.isasyncgenfactory(obj: object, /) TypeGuard[Callable[[...], AsyncGenerator[Any, Any]]]

Return True if the object returns an asynchronous generator iterator when called, False otherwise.

The following objects are treated as asynchronous generator factories by default:

  1. An asynchronous generator function. This is true for both user-defined functions and some compiled functions that look like such functions (for example, functions compiled via Cython).

  2. An asynchronous generator type (a class whose instances look like asynchronous generator iterators; see isasyncgenlike()).

  3. An object manually marked with markasyncgenfactory().

Example

>>> from collections.abc import AsyncGenerator
>>> class SimpleAsyncGenerator(AsyncGenerator):
...     async def asend(self, value):
...         return await super().send(value)
...     async def athrow(self, typ, val=None, tb=None):
...         return await super().throw(typ, val, tb)
>>> async def asyncgen_function():
...     return
...     yield
>>> isasyncgenfactory(lambda: None)
False
>>> isasyncgenfactory(asyncgen_function)
True
>>> isasyncgenfactory(SimpleAsyncGenerator)
True

For all others, to determine whether an object is an asynchronous generator factory, a recursive algorithm is used that handles at least the following cases:

  1. If it is a function defined by functools.partialmethod for some object, the latter is checked.

  2. If it is a partial object (an instance of functools.partial()), the object it wraps (functools.partial.func) is checked.

  3. If it is a bound method, the corresponding object (method.__func__) is checked.

  4. If it is a callable object, its __call__() method is checked.

Example

>>> from functools import partial, partialmethod
>>> class CustomAsyncGeneratorCallable:
...     async def __call__(self):
...         return
...         yield
...     get = partialmethod(__call__)
>>> class ComplexAsyncGeneratorCallable:
...     __call__ = CustomAsyncGeneratorCallable()
>>> isasyncgenfactory(CustomAsyncGeneratorCallable())
True
>>> isasyncgenfactory(CustomAsyncGeneratorCallable().get)
True
>>> isasyncgenfactory(CustomAsyncGeneratorCallable().__call__)
True
>>> isasyncgenfactory(partial(CustomAsyncGeneratorCallable()))
True
>>> isasyncgenfactory(ComplexAsyncGeneratorCallable())
True
aiologic.meta.markgeneratorfactory(factory: _CallableT, /) _CallableT

aiologic.meta.markcoroutinefactory(factory: _CallableT, /) _CallableT

aiologic.meta.markasyncgenfactory(factory: _CallableT, /) _CallableT

Transformers

aiologic.meta.generator(func: Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]]
aiologic.meta.generator(func: Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]]
aiologic.meta.generator(func: Callable[[_P], Awaitable[_T]], /) Callable[[_P], Generator[Any, Any, _T]]
aiologic.meta.generator(func: Callable[[_P], object], /) Callable[[_P], Generator[Any, Any, Any]]
aiologic.meta.generator(func: object, /) Callable[[...], Generator[Any, Any, Any]]

aiologic.meta.coroutine(func: Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]]
aiologic.meta.coroutine(func: Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]]
aiologic.meta.coroutine(func: Callable[[_P], Awaitable[_T]], /) Callable[[_P], Coroutine[Any, Any, _T]]
aiologic.meta.coroutine(func: Callable[[_P], object], /) Callable[[_P], Coroutine[Any, Any, Any]]
aiologic.meta.coroutine(func: object, /) Callable[[...], Coroutine[Any, Any, Any]]

Signatures

aiologic.meta.getsro(obj: object, /) Iterator[tuple[object, object | None, str]]

Functions

aiologic.meta.replaces(namespace: MutableMapping[str, Any], replacer: MissingType = MISSING, /) Callable[[_NamedCallableT], _NamedCallableT]
aiologic.meta.replaces(namespace: MutableMapping[str, Any], replacer: _NamedCallableT, /) _NamedCallableT

Wrap and replace the function of the same name in namespace.

Unlike functools.wraps(), excludes the __wrapped__ attribute to avoid memory leaks in a multithreaded environment.

Used for global rebinding.

Raises:

LookupError – if there is no function of the same name in namespace.

Example

>>> def sketch():
...     return 'parrot'
>>> def replace_sketch():
...     @replaces(globals())
...     def sketch():
...         return 'ex-parrot'
>>> sketch()
'parrot'
>>> replace_sketch()
>>> sketch()
'ex-parrot'
aiologic.meta.replaces_when_imported(namespace: MutableMapping[str, Any], module_name: str, replacer: MissingType = MISSING, /) Callable[[_NamedCallableT], _NamedCallableT]
aiologic.meta.replaces_when_imported(namespace: MutableMapping[str, Any], module_name: str, replacer: _NamedCallableT, /) _NamedCallableT

Wrap and replace the function of the same name in namespace, but only when the specified module_name is imported.

It has the same effect as replaces() but is delayed: when called, it registers a post import hook bound to the replaced function, which fires when someone imports the target module, and only then does the effect actually take place. Additionally, if namespace is a module namespace, the hook is deactivated when the module is reloaded (e.g., by importlib.reload()) to prevent incorrect replacements.

Used to implement support for optional dependencies.

Raises:

LookupError – if there is no function of the same name in namespace.

Example

>>> def in_asyncio_context():
...     return False
>>> @replaces_when_imported(globals(), 'asyncio')
... def in_asyncio_context():
...     import asyncio
...     @replaces(globals())
...     def in_asyncio_context():
...         return asyncio._get_running_loop() is not None
...     return in_asyncio_context()
aiologic.meta.copies(original: Callable[[_P], _T], replaced: MissingType = MISSING, /) Callable[[Callable[[_P], _T]], Callable[[_P], _T]]
aiologic.meta.copies(original: Callable[[_P], _T], replaced: Callable[[_P], _T], /) Callable[[_P], _T]

Replace with a copy of original if that is a user-defined function, and make the copy look like replaced function.

If original and replaced are the same function, it forces copying (copies(func, func) is always a new copy of func). Otherwise, does nothing on type checking to prevent type errors.

Used to optimize functions which delegate all the work to others.

Raises:

TypeError – if copying is forced and original is not a user-defined function.

Example

>>> def sig1():
...     return 'spam'
>>> @copies(sig1)
... def sig2():
...     return sig1()
>>> sig1() == sig2()
True
>>> sig1 is sig2
False
>>> sig1.__name__ == sig2.__name__
False
>>> sig1.__code__ is sig2.__code__
True

Modules

aiologic.meta.resolve_name(name: str, package: str | None) str

Resolve a relative module name to an absolute one.

Like importlib.util.resolve_name(), but raises ValueError instead of ImportError on Python ≥3.9 to achieve consistent behavior across all supported versions of Python.

Example

>>> resolve_name('x.y', 'a.b')  # an absolute one
'x.y'
>>> resolve_name('.x.y', 'a.b')
'a.b.x.y'
>>> resolve_name('..x.y', 'a.b')
'a.x.y'
>>> resolve_name('...x.y', 'a.b')
Traceback (most recent call last):
  ...
ValueError: `name` is beyond the top-level package
>>> resolve_name('.', 'a.b')
'a.b'
>>> resolve_name('..', 'a.b')
'a'
>>> resolve_name('...', 'a.b')
Traceback (most recent call last):
  ...
ValueError: `name` is beyond the top-level package

Imports

aiologic.meta.import_module(name: str, package: str | None = None) ModuleType

Import a module by name (absolute or relative to package).

Like importlib.import_module(), but raises ImportError instead of ValueError on Python <3.9 to achieve consistent behavior across all supported versions of Python.

Example

>>> import_module('sys')
<module 'sys' (built-in)>
>>> import_module('sys') is import_module('sys')
True
>>> import_module('.abc', 'collections').__name__
'collections.abc'
>>> import_module('..abc', 'collections').__name__
Traceback (most recent call last):
ImportError: attempted relative import beyond top-level package
aiologic.meta.import_from(module: ModuleType | str, name0: str, /, *, package: str | None = None) Any
aiologic.meta.import_from(module: ModuleType | str, name0: str, name1: str, /, *names: str, package: str | None = None) tuple[Any, ...]

Import objects by given names from the specified module.

If module is a string, import_module(module, package) is used to obtain the module object. It is also used to attempt to import a submodule if the module does not contain an attribute of the same name.

Raises:

ImportError – if any given name is not in the module.

Example

>>> pi = import_from('math', 'pi')
>>> pi
3.141592653589793
>>> pi, e = import_from('math', 'pi', 'e')
>>> pi, e
(3.141592653589793, 2.718281828459045)
>>> import_from('sys', 'circus')
Traceback (most recent call last):
ImportError: cannot import name 'circus' from 'sys' (...)
aiologic.meta.import_original(module: ModuleType | str, name0: str, /, *, package: str | None = None) Any
aiologic.meta.import_original(module: ModuleType | str, name0: str, name1: str, /, *names: str, package: str | None = None) tuple[Any, ...]

Import unpatched objects by given names from the specified module.

It behaves the same way as import_from(), except that it attempts to import the objects that have been replaced by some green library (e.g., as a result of calling gevent.monkey.patch_all()) instead of the patched ones. However, keep in mind that:

  1. Functions that are not built-in access their __globals__ attribute. Therefore, if the returned object was defined in the same module where its patched dependencies are located (as in the case of gevent), it may not behave as expected. For example, threading.Semaphore may still use the patched threading.Lock for its blocking operations.

  2. If the returned object was defined in a fake module (as in the case of eventlet), it may still not behave as expected. For example, all started threads may be treated as daemonic just because they are not registered in the global dictionary of the patched threading module.

  3. Not all original objects may be available, since some are removed rather than replaced.

Because of the above, you should avoid using any objects that depend in any way on the module’s state. Check the source code to determine which objects can be used reliably.

Unlike import_from(), it does not import submodules of patched modules.

aiologic.meta.isgreenpatched(module: ModuleType | str, /) bool

Return True if module has been monkey-patched by any of the supported green libraries, False otherwise.

Exports

aiologic.meta.export(package_namespace: ModuleType | MutableMapping[str, object], /) None

Prepare package_namespace for external use.

Its contents must be structured as follows:

  • Every non-public submodule/subpackage that is part of the implementation has a name that starts with the underscore character (package._util).

  • Every public submodule/subpackage that is available for direct use has a name that does not start with the underscore character (package.abc).

  • Every member of a public submodule/subpackage (including the package itself) follows the same naming rules.

The result of applying the function will be to update attributes of all public members so that they look as if they were defined directly in the package. If a public submodule/subpackage or class is encountered, the function is also applied recursively to its members. Additionally, for each public submodule/subpackage (including the package itself), a human-readable __all__ is built, which includes the names of all public members that are not submodules/subpackages.

Typically, the usage is as follows: export(globals()) near the end of __init__.py. This allows the package to be safely split into subpackages and submodules without breaking pickling on incompatible implementation changes and while preserving convenient representations (which is especially important for exceptions).

Caution

If the function updates the same object by different names (or in different namespaces), the result is undefined, especially when parallel calls are made. So avoid providing access to the same object in different ways.

aiologic.meta.export_dynamic(module_namespace: ModuleType | MutableMapping[str, object], link_name: str, target: str, /) None

Register a dynamic export (symbolic link) in the specified module_namespace.

On the first call, the function defines __getattr__() in module_namespace. When attempting to retrieve an undefined attribute from the module object by link_name, it imports target via import_from(), updates its attributes, caches it in the namespace, and returns it.

target can be an absolute path (package.module.attribute) or a relative path (..attribute). If it does not contain the dot character, the name relative to the module is implied (name is equivalent to .name).

Useful for defining optional package members that are not available in all environments.

Raises:
  • RuntimeError – if link_name cannot be registered.

  • ValueError – if target is beyond the top-level package.

aiologic.meta.export_deprecated(module_namespace: ModuleType | MutableMapping[str, object], link_name: str, target: str, /) None

Register a deprecated export (symbolic link) in the specified module_namespace.

Like export_dynamic(), but raises DeprecationWarning on the first attempt to access the attribute, and never updates attributes of the latter.

Useful for providing a temporary alias by the old name to a renamed object.

Helpers

class aiologic.meta.GeneratorCoroutineWrapper

Bases: Generator[_YieldT_co, _SendT_contra, _ReturnT_co], Coroutine[_YieldT_co, _SendT_contra, _ReturnT_co], Generic[_YieldT_co, _SendT_contra, _ReturnT_co]

__init__(wrapped: Generator[_YieldT_co, _SendT_contra, _ReturnT_co] | Coroutine[_YieldT_co, _SendT_contra, _ReturnT_co], /) None

__await__() Self

__iter__() Self

__next__() _YieldT_co

send(value: _SendT_contra, /) _YieldT_co

throw(exc_type: type[BaseException], exc_value: BaseException | object = None, traceback: TracebackType | None = None, /) _YieldT_co
throw(exc_type: BaseException, exc_value: None = None, traceback: TracebackType | None = None, /) _YieldT_co

close() None

property gi: Generator[_YieldT_co, _SendT_contra, _ReturnT_co]

The underlying generator-like object (see isgeneratorlike()), if the wrapped object is one. Otherwise, raises AttributeError.

Example

>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi
<generator object generator_function at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi
Traceback (most recent call last):
AttributeError: the wrapped object is not a generator
>>> coro.close()
property cr: Coroutine[_YieldT_co, _SendT_contra, _ReturnT_co]

The underlying coroutine-like object (see iscoroutinelike()), if the wrapped object is one. Otherwise, raises AttributeError.

Example

>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr
Traceback (most recent call last):
AttributeError: the wrapped object is not a coroutine
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr
<coroutine object coroutine_function at ...>
>>> coro.close()
property gi_code: CodeType

An associated code object of the wrapped object.

This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi_code
<code object generator_function at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_code
<code object coroutine_function at ...>
>>> coro.close()
property cr_code: CodeType

An associated code object of the wrapped object.

This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr_code
<code object generator_function at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_code
<code object coroutine_function at ...>
>>> coro.close()
property gi_frame: FrameType | None

An associated frame object of the wrapped object, or None (if execution has completed).

This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi_frame
<frame at ...>
>>> gen.close()
>>> gen.gi_frame is None
True
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_frame
<frame at ...>
>>> coro.close()
>>> coro.gi_frame is None
True
property cr_frame: FrameType | None

An associated frame object of the wrapped object, or None (if execution has completed).

This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr_frame
<frame at ...>
>>> gen.close()
>>> gen.cr_frame is None
True
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_frame
<frame at ...>
>>> coro.close()
>>> coro.cr_frame is None
True
property gi_running: bool

A boolean that is True if the wrapped object is currently being executed by the interpreter (created, not suspended, and not closed), False otherwise (see inspect.getgeneratorstate()).

This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     yield from [target.gi_running]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
True
>>> gen.gi_running
False
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
True
>>> coro.gi_running
False
>>> coro.close()
property cr_running: bool

A boolean that is True if the wrapped object is currently being executed by the interpreter (created, not suspended, and not closed), False otherwise (see inspect.getcoroutinestate()).

This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     yield from [target.cr_running]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
True
>>> gen.cr_running
False
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
True
>>> coro.cr_running
False
>>> coro.close()
property gi_suspended: bool

A boolean that is True if the wrapped object is currently suspended (created, not running, and not closed), False otherwise (see inspect.getgeneratorstate()).

This property is provided for compatibility with types.GeneratorType instances (Python ≥3.11). It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     yield from [target.gi_suspended]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
False
>>> gen.gi_suspended
True
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
False
>>> coro.gi_suspended
True
>>> coro.close()
property cr_suspended: bool

A boolean that is True if the wrapped object is currently suspended (created, not running, and not closed), False otherwise (see inspect.getcoroutinestate()).

This property is provided for compatibility with types.CoroutineType instances (Python ≥3.11). It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     yield from [target.cr_suspended]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
False
>>> gen.cr_suspended
True
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
False
>>> coro.cr_suspended
True
>>> coro.close()
property gi_yieldfrom: object | None

An associated iterated object of the wrapped object, or None.

This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     yield from [target.gi_yieldfrom]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen) is None
True
>>> gen.gi_yieldfrom
<list_iterator object at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro) is None
True
>>> coro.gi_yieldfrom
<aiologic.meta.GeneratorCoroutineWrapper object at ...>
>>> coro.close()
property cr_await: object | None

An associated iterated object of the wrapped object, or None.

This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> def generator_function():
...     yield from [target.cr_await]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen) is None
True
>>> gen.cr_await
<list_iterator object at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro) is None
True
>>> coro.cr_await
<aiologic.meta.GeneratorCoroutineWrapper object at ...>
>>> coro.close()
property gi_origin: tuple[tuple[str, int, str], ...] | None

A tuple of (filename, line_number, function_name) tuples describing the traceback where the wrapped object was created, or None (see sys.set_coroutine_origin_tracking_depth()).

This property is provided for consistency with types.CoroutineType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> import sys
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi_origin
Traceback (most recent call last):
AttributeError: the wrapped object has not attribute 'gi_origin'
>>> gen.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_origin is None
True
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(1)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_origin
((..., ..., ...),)
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
property cr_origin: tuple[tuple[str, int, str], ...] | None

A tuple of (filename, line_number, function_name) tuples describing the traceback where the wrapped object was created, or None (see sys.set_coroutine_origin_tracking_depth()).

This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:

If none is available, raises AttributeError.

Example

>>> import sys
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr_origin
Traceback (most recent call last):
AttributeError: the wrapped object has not attribute 'cr_origin'
>>> gen.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_origin is None
True
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(1)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_origin
((..., ..., ...),)
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
async aiologic.meta.await_for(awaitable: Awaitable[_T], /) _T

Wait for awaitable to complete.

Useful when you need to schedule waiting for an awaitable primitive via a function that only accepts asynchronous functions.