API reference
Synchronization primitives
Events
- class aiologic.Event
Bases:
object…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = Event()
>>> copy = Event(*orig.__getnewargs__())
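To make the protocol concrete, here is a minimal sketch of how the standard copy module consumes __getnewargs__() (pickle relies on the same protocol). ToyCountdown is a hypothetical class written only for this illustration and is not aiologic's implementation; returning None from __getstate__() is an assumption that models "the current state does not affect the arguments".

```python
import copy


class ToyCountdown:
    """Hypothetical stand-in; not an aiologic class."""

    __slots__ = ("initial_value", "value")

    def __new__(cls, initial_value=0):
        self = super().__new__(cls)
        self.initial_value = initial_value
        self.value = initial_value  # mutable "current state"
        return self

    def __getnewargs__(self):
        # Only the initial values are reported, never the current state.
        return (self.initial_value,)

    def __getstate__(self):
        # Assumption: no per-instance state travels with a copy.
        return None


orig = ToyCountdown(2)
orig.value = 5  # change the current state

# copy.copy() rebuilds the object as ToyCountdown(*orig.__getnewargs__()).
clone = copy.copy(orig)
print(clone.initial_value, clone.value)
```

Under these assumptions the clone starts from the initial values again, regardless of what happened to the original after construction.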
- __bool__() bool
Returns True if the event is set.
Used by the standard truth testing procedure.
Example
>>> finished = Event()  # event is unset
>>> bool(finished)
False
>>> finished.set()  # event is set
>>> bool(finished)
True
- class aiologic.REvent
Bases:
Event…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = REvent()
>>> copy = REvent(*orig.__getnewargs__())
- __bool__() bool
Returns True if the event is set.
Used by the standard truth testing procedure.
Example
>>> running = REvent()  # event is unset
>>> bool(running)
False
>>> running.set()  # event is set
>>> bool(running)
True
>>> running.clear()  # event is unset
>>> bool(running)
False
- class aiologic.CountdownEvent
Bases:
object…
- static __new__(cls, /, initial_value: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = CountdownEvent(1)
>>> orig.initial_value
1
>>> copy = CountdownEvent(*orig.__getnewargs__())
>>> copy.initial_value
1
- __bool__() bool
Returns True if the event is set.
Used by the standard truth testing procedure.
Example
>>> done = CountdownEvent()  # event is set
>>> bool(done)
True
>>> done.up()  # event is unset
>>> bool(done)
False
>>> done.down()  # event is set
>>> bool(done)
True
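The up()/down() behaviour shown in the example can be pictured with a small thread-only sketch built on threading.Condition. ToyCountdownEvent is a hypothetical class for illustration; it borrows the method names from the example above but is not aiologic's implementation, and raising RuntimeError on an extra down() is an assumption of the sketch.

```python
import threading


class ToyCountdownEvent:
    """Hypothetical illustration: the event is set while the counter is zero."""

    def __init__(self, initial_value=0):
        self._cond = threading.Condition()
        self._value = initial_value

    def up(self):
        with self._cond:
            self._value += 1  # one more unit of work is pending

    def down(self):
        with self._cond:
            if self._value == 0:
                raise RuntimeError("counter is already zero")  # toy policy
            self._value -= 1
            if self._value == 0:
                self._cond.notify_all()  # wake everyone waiting for zero

    def wait(self, timeout=None):
        with self._cond:
            return self._cond.wait_for(lambda: self._value == 0, timeout)

    def __bool__(self):
        return self._value == 0


done = ToyCountdownEvent()
results = []


def worker(n):
    results.append(n * n)
    done.down()  # this unit of work is finished


threads = []
for n in range(3):
    done.up()  # register the work before starting it
    thread = threading.Thread(target=worker, args=(n,))
    threads.append(thread)
    thread.start()

finished = done.wait(timeout=5)
for thread in threads:
    thread.join()
```

Calling up() before starting each worker avoids a window in which the counter is momentarily zero while work is still being scheduled.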
Barriers
- class aiologic.Latch
Bases:
object…
- static __new__(cls, /, parties: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = Latch(4)
>>> orig.parties
4
>>> copy = Latch(*orig.__getnewargs__())
>>> copy.parties
4
- __bool__() bool
Returns True if the barrier has been passed or broken.
Used by the standard truth testing procedure.
Example
>>> started = Latch(1)  # barrier is filling
>>> bool(started)
False
>>> started.wait()  # barrier is draining
>>> bool(started)
True
>>> started.abort()  # barrier is broken
>>> bool(started)
True
- class aiologic.Barrier
Bases:
object…
- static __new__(cls, /, parties: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = Barrier(4)
>>> orig.parties
4
>>> copy = Barrier(*orig.__getnewargs__())
>>> copy.parties
4
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- class aiologic.RBarrier
Bases:
Barrier…
- static __new__(cls, /, parties: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = RBarrier(4)
>>> orig.parties
4
>>> copy = RBarrier(*orig.__getnewargs__())
>>> copy.parties
4
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- exception aiologic.BrokenBarrierError
Bases:
RuntimeError…
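For readers unfamiliar with broken barriers, the standard library offers a close analogy: threading.Barrier also has wait() and abort(), and its own BrokenBarrierError also derives from RuntimeError. The sketch below uses only the threading module; it does not exercise aiologic, whose barriers may differ in detail.

```python
import threading

barrier = threading.Barrier(1)  # a single party passes immediately
index = barrier.wait()          # arrival index of this party

barrier.abort()                 # put the barrier into the broken state
was_broken = barrier.broken

try:
    barrier.wait()              # waiting on a broken barrier fails
except threading.BrokenBarrierError as exc:
    caught = exc
else:
    caught = None
```

Because the exception is a RuntimeError subclass, callers that only care about "something went wrong at the barrier" can catch RuntimeError, while callers that need to distinguish the broken state can catch the specific type.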
Semaphores
- class aiologic.Semaphore
Bases:
object…
- static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: None = None) Self
- static __new__(cls, /, initial_value: int | DefaultType, max_value: int | DefaultType) BoundedSemaphore
- static __new__(cls, /, *, max_value: int | DefaultType) BoundedSemaphore
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = Semaphore(2)
>>> orig.initial_value
2
>>> copy = Semaphore(*orig.__getnewargs__())
>>> copy.initial_value
2
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- class aiologic.BoundedSemaphore
Bases:
Semaphore…
- static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = BoundedSemaphore(2)
>>> orig.max_value
2
>>> copy = BoundedSemaphore(*orig.__getnewargs__())
>>> copy.max_value
2
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- class aiologic.BinarySemaphore
Bases:
Semaphore…
- static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: None = None) Self
- static __new__(cls, /, initial_value: int | DefaultType, max_value: int | DefaultType) BoundedBinarySemaphore
- static __new__(cls, /, *, max_value: int | DefaultType) BoundedBinarySemaphore
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = BinarySemaphore(0)
>>> orig.initial_value
0
>>> copy = BinarySemaphore(*orig.__getnewargs__())
>>> copy.initial_value
0
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- class aiologic.BoundedBinarySemaphore
Bases:
BinarySemaphore, BoundedSemaphore…
- static __new__(cls, /, initial_value: int | DefaultType = DEFAULT, max_value: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = BoundedBinarySemaphore(0)
>>> orig.max_value
0
>>> copy = BoundedBinarySemaphore(*orig.__getnewargs__())
>>> copy.max_value
0
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
Capacity limiters
- class aiologic.CapacityLimiter
Bases:
object…
- static __new__(cls, /, total_tokens: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = CapacityLimiter(3)
>>> orig.total_tokens
3
>>> copy = CapacityLimiter(*orig.__getnewargs__())
>>> copy.total_tokens
3
- __bool__() bool
Returns True if the capacity limiter is used by any task.
Used by the standard truth testing procedure.
Example
>>> reading = CapacityLimiter()
>>> bool(reading)
False
>>> with reading:  # capacity limiter is in use
...     bool(reading)
True
>>> bool(reading)
False
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- async_borrowed() bool
Return True if the current async task holds any token.
Example
>>> limiter = CapacityLimiter()
>>> limiter.async_borrowed()
False
>>> async with limiter:
...     limiter.async_borrowed()
True
>>> limiter.async_borrowed()
False
- green_borrowed() bool
Return True if the current green task holds any token.
Example
>>> limiter = CapacityLimiter()
>>> limiter.green_borrowed()
False
>>> with limiter:
...     limiter.green_borrowed()
True
>>> limiter.green_borrowed()
False
- property available_tokens: int
The current number of tokens available to be borrowed.
It may not change after release if all the released tokens have been reassigned to waiting tasks.
- property borrowed_tokens: int
The current number of tokens that have been borrowed.
It may not change after release if all the released tokens have been reassigned to waiting tasks.
- property borrowers: MappingProxyType
The read-only proxy of the dictionary that maps tasks’ identifiers to their respective recursion levels. Contains identifiers of only those tasks that hold any token. Updated automatically when the current state changes.
It may not contain identifiers of those tasks to which tokens were reassigned during release if they have not yet resumed execution.
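The "read-only proxy that is updated automatically" behaviour is what types.MappingProxyType provides for any dictionary. The sketch below demonstrates it with a plain dict; the private dictionary and the (str, int) identifiers are hypothetical stand-ins, not values taken from aiologic.

```python
from types import MappingProxyType

_borrowers = {}                          # hypothetical private dictionary
borrowers = MappingProxyType(_borrowers)  # read-only live view of it

_borrowers[("threading", 1)] = 1         # a task borrows a token
seen_after_acquire = dict(borrowers)     # the view reflects it at once

try:
    borrowers[("threading", 2)] = 1      # writes through the proxy fail
except TypeError:
    write_rejected = True
else:
    write_rejected = False

del _borrowers[("threading", 1)]         # the task releases its token
seen_after_release = dict(borrowers)
```

Because the proxy is a live view rather than a snapshot, code that needs a stable picture of the borrowers should copy it first, for example with dict(borrowers).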
- class aiologic.RCapacityLimiter
Bases:
CapacityLimiter…
- static __new__(cls, /, total_tokens: int | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = RCapacityLimiter(3)
>>> orig.total_tokens
3
>>> copy = RCapacityLimiter(*orig.__getnewargs__())
>>> copy.total_tokens
3
- __bool__() bool
Returns True if the capacity limiter is used by any task.
Used by the standard truth testing procedure.
Example
>>> reading = RCapacityLimiter()
>>> bool(reading)
False
>>> with reading:  # capacity limiter is in use
...     bool(reading)
True
>>> bool(reading)
False
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- async_borrowed() bool
Return True if the current async task holds any token.
Example
>>> limiter = RCapacityLimiter()
>>> limiter.async_borrowed()
False
>>> async with limiter:
...     limiter.async_borrowed()
True
>>> limiter.async_borrowed()
False
- green_borrowed() bool
Return True if the current green task holds any token.
Example
>>> limiter = RCapacityLimiter()
>>> limiter.green_borrowed()
False
>>> with limiter:
...     limiter.green_borrowed()
True
>>> limiter.green_borrowed()
False
- async_count() int
Return the recursion level of the current async task.
Example
>>> limiter = RCapacityLimiter()
>>> limiter.async_count()
0
>>> async with limiter:
...     limiter.async_count()
1
>>> limiter.async_count()
0
- green_count() int
Return the recursion level of the current green task.
Example
>>> limiter = RCapacityLimiter()
>>> limiter.green_count()
0
>>> with limiter:
...     limiter.green_count()
1
>>> limiter.green_count()
0
- property available_tokens: int
The current number of tokens available to be borrowed.
It may not change after release if all the released tokens have been reassigned to waiting tasks.
- property borrowed_tokens: int
The current number of tokens that have been borrowed.
It may not change after release if all the released tokens have been reassigned to waiting tasks.
- property borrowers: MappingProxyType
The read-only proxy of the dictionary that maps tasks’ identifiers to their respective recursion levels. Contains identifiers of only those tasks that hold any token. Updated automatically when the current state changes.
It may not contain identifiers of those tasks to which tokens were reassigned during release if they have not yet resumed execution.
Locks
- class aiologic.Lock
Bases:
object…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = Lock()
>>> copy = Lock(*orig.__getnewargs__())
- __bool__() bool
Returns True if the lock is used by any task.
Used by the standard truth testing procedure.
Example
>>> writing = Lock()
>>> bool(writing)
False
>>> with writing:  # lock is in use
...     bool(writing)
True
>>> bool(writing)
False
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- async_owned() bool
Return True if the current async task owns the lock.
Unlike the owner property, always reliable.
Example
>>> lock = Lock()
>>> lock.async_owned()
False
>>> async with lock:
...     lock.async_owned()
True
>>> lock.async_owned()
False
- green_owned() bool
Return True if the current green task owns the lock.
Unlike the owner property, always reliable.
Example
>>> lock = Lock()
>>> lock.green_owned()
False
>>> with lock:
...     lock.green_owned()
True
>>> lock.green_owned()
False
- locked() bool
Return True if anyone owns the lock.
Example
>>> import asyncio
>>> async def own_the_lock():
...     async with lock:
...         await asyncio.sleep(3600)
>>> lock = Lock()
>>> lock.locked()
False
>>> task = asyncio.create_task(own_the_lock())
>>> lock.locked()
True
>>> task.cancel()
>>> lock.locked()
False
- property owner: tuple[str, int] | None
The current identifier of the task that owns the lock, or None if no one owns the lock.
It is not reliable during release, as it may temporarily be the identifier of a task that has cancelled the async_acquire() or green_acquire() call (e.g., due to a timeout).
- class aiologic.RLock
Bases:
Lock…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = RLock()
>>> copy = RLock(*orig.__getnewargs__())
- __bool__() bool
Returns True if the lock is used by any task.
Used by the standard truth testing procedure.
Example
>>> writing = RLock()
>>> bool(writing)
False
>>> with writing:  # lock is in use
...     bool(writing)
True
>>> bool(writing)
False
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- async_owned() bool
Return True if the current async task owns the lock.
Unlike the owner property, always reliable.
Example
>>> lock = RLock()
>>> lock.async_owned()
False
>>> async with lock:
...     lock.async_owned()
True
>>> lock.async_owned()
False
- green_owned() bool
Return True if the current green task owns the lock.
Unlike the owner property, always reliable.
Example
>>> lock = RLock()
>>> lock.green_owned()
False
>>> with lock:
...     lock.green_owned()
True
>>> lock.green_owned()
False
- async_count() int
Return the recursion level of the current async task.
Unlike the count property, always reliable.
Example
>>> lock = RLock()
>>> lock.async_count()
0
>>> async with lock:
...     lock.async_count()
1
>>> lock.async_count()
0
- green_count() int
Return the recursion level of the current green task.
Unlike the count property, always reliable.
Example
>>> lock = RLock()
>>> lock.green_count()
0
>>> with lock:
...     lock.green_count()
1
>>> lock.green_count()
0
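The notion of a recursion level is easiest to see with nested acquisitions. The following thread-only sketch wraps threading.RLock and counts how many times the owning thread has entered it; CountingRLock and count_for_current_thread() are hypothetical names for this illustration and are not part of aiologic.

```python
import threading


class CountingRLock:
    """Hypothetical reentrant lock that tracks its recursion level."""

    def __init__(self):
        self._lock = threading.RLock()
        self._owner = None
        self._count = 0

    def __enter__(self):
        self._lock.acquire()  # reentrant for the owning thread
        self._owner = threading.get_ident()
        self._count += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._count -= 1
        if self._count == 0:
            self._owner = None
        self._lock.release()

    def count_for_current_thread(self):
        # Non-owners always see 0, whatever the owner's level is.
        if self._owner == threading.get_ident():
            return self._count
        return 0


lock = CountingRLock()
levels = []

with lock:
    levels.append(lock.count_for_current_thread())      # first entry
    with lock:
        levels.append(lock.count_for_current_thread())  # nested entry
    levels.append(lock.count_for_current_thread())      # back to one level
levels.append(lock.count_for_current_thread())          # fully released
```

The level rises by one on every nested entry and falls by one on every exit, so the lock is only truly released when the level returns to zero.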
- locked() bool
Return True if anyone owns the lock.
Example
>>> import asyncio
>>> async def own_the_lock():
...     async with lock:
...         await asyncio.sleep(3600)
>>> lock = RLock()
>>> lock.locked()
False
>>> task = asyncio.create_task(own_the_lock())
>>> lock.locked()
True
>>> task.cancel()
>>> lock.locked()
False
- property owner: tuple[str, int] | None
The current identifier of the task that owns the lock, or None if no one owns the lock.
It is not reliable during release, as it may temporarily be the identifier of a task that has cancelled the async_acquire() or green_acquire() call (e.g., due to a timeout).
- property count: int
The current recursion level of the task that owns the lock, or 0 if no one owns the lock.
It is not reliable during release, as it may temporarily be the recursion level of a task that has cancelled the async_acquire() or green_acquire() call (e.g., due to a timeout).
- aiologic.synchronized(wrapped: _AALock, /) _AASynchronizer
- aiologic.synchronized(wrapped: _ASLock, /) _ASSynchronizer
- aiologic.synchronized(wrapped: _SSLock, /) _SSSynchronizer
- aiologic.synchronized(wrapped: _MMLock, /) _MMSynchronizer
- aiologic.synchronized(wrapped: _SynchronizedType[_LockT], /) _SynchronizedDecorator
- aiologic.synchronized(wrapped: _CallableT, /) _CallableT
- aiologic.synchronized(wrapped: object, /) _SynchronizedDecorator
…
Condition variables
- class aiologic.Condition
Bases:
Generic[_T_co, _S_co]…
- static __new__(cls, /, lock: DefaultType = DEFAULT, timer: DefaultType = DEFAULT) Condition[RLock, Callable[[], int]]
- static __new__(cls, /, lock: DefaultType = DEFAULT, *, timer: _S_co) Condition[RLock, _S_co]
- static __new__(cls, /, lock: _T_co, timer: DefaultType = DEFAULT) Condition[_T_co, Callable[[], int]]
- static __new__(cls, /, lock: _T_co, timer: _S_co) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = Condition()
>>> copy = Condition(*orig.__getnewargs__())
>>> copy.lock is orig.lock
True
- __bool__() bool
Returns True if the underlying lock is used by any task.
If there is no lock, returns False.
Used by the standard truth testing procedure.
Example
>>> accessing = Condition()
>>> bool(accessing)
False
>>> with accessing:  # condition variable is in use
...     bool(accessing)
True
>>> bool(accessing)
False
- async __aexit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- async for_(predicate: Callable[[], _T], timeout: float | None = None, *, delegate: bool = True) _T
…
- property lock: _T_co
The underlying lock used by the condition variable.
- property timer: _S_co
The callable object used by the condition variable.
Communication primitives
Queues
- class aiologic.SimpleQueue
Bases:
Generic[_T]…
- static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same state.
Used by the pickle and copy modules.
The current state affects the arguments.
Example
>>> orig = SimpleQueue('items')
>>> orig.green_get()
'i'
>>> copy = SimpleQueue(*orig.__getnewargs__())
>>> copy.green_get()
't'
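For queues the arguments are derived from the current contents, so a copy continues where the original currently stands. A minimal sketch of that idea using the standard copy module follows; ToyQueue is a hypothetical class for illustration and says nothing about how aiologic stores its items.

```python
import copy
from collections import deque


class ToyQueue:
    """Hypothetical FIFO queue whose copies carry the remaining items."""

    __slots__ = ("_items",)

    def __new__(cls, items=()):
        self = super().__new__(cls)
        self._items = deque(items)
        return self

    def __getnewargs__(self):
        # The remaining items become the constructor argument.
        return (list(self._items),)

    def __getstate__(self):
        # Everything needed is already in the constructor arguments.
        return None

    def get(self):
        return self._items.popleft()

    def __len__(self):
        return len(self._items)


orig = ToyQueue("items")
first = orig.get()               # consumes 'i' from the original

clone = copy.copy(orig)          # ToyQueue(*orig.__getnewargs__())
from_clone = clone.get()         # the clone starts at 't'
from_orig = orig.get()           # the original is unaffected by the clone
```

The clone and the original hold independent containers after copying, so consuming from one does not change the other.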
- __bool__() bool
Returns True if the queue is not empty.
Used by the standard truth testing procedure.
Example
>>> items = SimpleQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
- __len__() int
Returns the number of items in the queue.
Used by the built-in function len().
Example
>>> items = SimpleQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
- property putting: int
The current number of tasks waiting to put.
It is always 0 for simple queues.
- class aiologic.SimpleLifoQueue
Bases:
SimpleQueue[_T]…
- static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same state.
Used by the pickle and copy modules.
The current state affects the arguments.
Example
>>> orig = SimpleLifoQueue('items')
>>> orig.green_get()
's'
>>> copy = SimpleLifoQueue(*orig.__getnewargs__())
>>> copy.green_get()
'm'
- __bool__() bool
Returns True if the queue is not empty.
Used by the standard truth testing procedure.
Example
>>> items = SimpleLifoQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
- __len__() int
Returns the number of items in the queue.
Used by the built-in function len().
Example
>>> items = SimpleLifoQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
- property putting: int
The current number of tasks waiting to put.
It is always 0 for simple queues.
- class aiologic.Queue
Bases:
Generic[_T]…
- static __new__(cls, /, maxsize: int | None = None) Self
- static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /, maxsize: int | None = None) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same state.
Used by the pickle and copy modules.
The current state affects the arguments.
Example
>>> orig = Queue('items')
>>> orig.green_get()
'i'
>>> copy = Queue(*orig.__getnewargs__())
>>> copy.green_get()
't'
- __bool__() bool
Returns True if the queue is not empty.
Used by the standard truth testing procedure.
Example
>>> items = Queue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
- __len__() int
Returns the number of items in the queue.
Used by the built-in function len().
Example
>>> items = Queue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
- property putting: int
The current number of tasks waiting to put.
It represents the length of the waiting queue and thus changes immediately.
- class aiologic.LifoQueue
Bases:
Queue[_T]…
- static __new__(cls, /, maxsize: int | None = None) Self
- static __new__(cls, items: Iterable[_T] | MissingType = MISSING, /, maxsize: int | None = None) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same state.
Used by the pickle and copy modules.
The current state affects the arguments.
Example
>>> orig = LifoQueue('items')
>>> orig.green_get()
's'
>>> copy = LifoQueue(*orig.__getnewargs__())
>>> copy.green_get()
'm'
- __bool__() bool
Returns True if the queue is not empty.
Used by the standard truth testing procedure.
Example
>>> items = LifoQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
- __len__() int
Returns the number of items in the queue.
Used by the built-in function len().
Example
>>> items = LifoQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
- property putting: int
The current number of tasks waiting to put.
It represents the length of the waiting queue and thus changes immediately.
- class aiologic.PriorityQueue
Bases:
Queue[_RichComparableT]…
- static __new__(cls, /, maxsize: int | None = None) Self
- static __new__(cls, items: Iterable[_RichComparableT] | MissingType = MISSING, /, maxsize: int | None = None) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same state.
Used by the pickle and copy modules.
The current state affects the arguments.
Example
>>> orig = PriorityQueue('items')
>>> orig.green_get()
'e'
>>> copy = PriorityQueue(*orig.__getnewargs__())
>>> copy.green_get()
'i'
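The different results that the queue examples produce for the same input 'items' follow from the retrieval order of each queue kind. The sketch below reproduces the three orders with standard containers only (collections.deque, list, and heapq); it illustrates the ordering and does not use aiologic.

```python
import heapq
from collections import deque

fifo = deque("items")   # first in, first out
lifo = list("items")    # last in, first out
heap = list("items")
heapq.heapify(heap)     # smallest item first

fifo_order = [fifo.popleft(), fifo.popleft()]
lifo_order = [lifo.pop(), lifo.pop()]
heap_order = [heapq.heappop(heap), heapq.heappop(heap)]

print(fifo_order)  # ['i', 't']
print(lifo_order)  # ['s', 'm']
print(heap_order)  # ['e', 'i']
```

In each case the second retrieved item matches what the copy returns in the corresponding example, since the copy is created after the first item has been removed.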
- __bool__() bool
Returns True if the queue is not empty.
Used by the standard truth testing procedure.
Example
>>> items = PriorityQueue()  # queue is empty
>>> bool(items)
False
>>> items.green_put('spam')  # queue is not empty
>>> bool(items)
True
>>> item = items.green_get()  # queue is empty
>>> bool(items)
False
- __len__() int
Returns the number of items in the queue.
Used by the built-in function len().
Example
>>> items = PriorityQueue()  # queue has no items
>>> len(items)
0
>>> items.green_put('spam')  # queue has one item
>>> len(items)
1
>>> item = items.green_get()  # queue has no items
>>> len(items)
0
- async async_put(item: _RichComparableT, *, blocking: bool = True, timeout: float | None = None) None
…
- property putting: int
The current number of tasks waiting to put.
It represents the length of the waiting queue and thus changes immediately.
Non-blocking primitives
Flags
- class aiologic.Flag
Bases:
Generic[_T]…
- static __new__(cls, /, marker: _T | MissingType = MISSING) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same state.
Used by the pickle and copy modules.
The current state affects the arguments.
Example
>>> orig = Flag()
>>> orig.set('value')  # change the state
>>> orig.get()
'value'
>>> copy = Flag(*orig.__getnewargs__())
>>> copy.get()
'value'
- __bool__() bool
Returns True if the flag is set.
Used by the standard truth testing procedure.
Example
>>> cancelled = Flag()  # flag is unset
>>> bool(cancelled)
False
>>> cancelled.set()  # flag is set
>>> bool(cancelled)
True
- get(default: _T | MissingType = MISSING, *, default_factory: MissingType = MISSING) _T
- get(default: _D, *, default_factory: MissingType = MISSING) _T | _D
- get(default: MissingType = MISSING, *, default_factory: Callable[[], _T]) _T
- get(default: MissingType = MISSING, *, default_factory: Callable[[], _D]) _T | _D
…
- set(marker: MissingType = MISSING) bool
- set(marker: _T) bool
…
Resource guards
- class aiologic.ResourceGuard
Bases:
object…
- static __new__(cls, _: DefaultType = DEFAULT, /, action: str | DefaultType = DEFAULT) Self
…
- __getnewargs__() tuple[Any, ...]
Returns arguments that can be used to create new instances with the same initial values.
Used by the pickle and copy modules.
The current state does not affect the arguments.
Example
>>> orig = ResourceGuard(action='waiting')
>>> orig.action
'waiting'
>>> copy = ResourceGuard(*orig.__getnewargs__())
>>> copy.action
'waiting'
- __bool__() bool
Returns True if the resource guard is used by any task.
Used by the standard truth testing procedure.
Example
>>> using = ResourceGuard()
>>> bool(using)
False
>>> with using:  # resource guard is in use
...     bool(using)
True
>>> bool(using)
False
- __exit__(exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) None
…
- exception aiologic.BusyResourceError
Bases:
RuntimeError…
Low-level primitives
Waiters (low-level)
- class aiologic.lowlevel.Waiter
Bases:
Protocol
The simplest synchronization primitive.
It represents a blocking call that can either be completed from outside by a task/thread or be cancelled (due to a timeout or an exception). One wait is one instance.
Unlike all other primitives, it is bound to the current event loop upon creation.
- wake() None
Reschedule (resume/notify) the task that is blocked.
It can be called multiple times and in parallel: all calls are expected to be serialized by the event loop (if any). If no task is blocked on the primitive, it has no effect.
Note that while redundant calls are allowed, they can put excessive load on the event loop in the form of a callback flood. If your scenario involves either multiple notifiers or premature task rescheduling (e.g., due to a timeout), consider using events instead of waiters.
- class aiologic.lowlevel.AsyncWaiter
The return type of create_async_waiter().
- __await__(timeout: float | None = None) Generator[Any, Any, bool]
Block (put to sleep) the task until wake() is called from any thread, and then return True.
It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, the behavior is undefined.
Used by await expressions.
- async with_(timeout: float | None = None) bool
Block (put to sleep) the task until wake() is called from any thread, and then return True.
It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, the behavior is undefined.
- wake() None
Reschedule (resume/notify) the task that is blocked.
It can be called multiple times and in parallel: all calls are expected to be serialized by the event loop (if any). If no task is blocked on the primitive, it has no effect.
Note that while redundant calls are allowed, they can put excessive load on the event loop in the form of a callback flood. If your scenario involves either multiple notifiers or premature task rescheduling (e.g., due to a timeout), consider using events instead of waiters.
- property shield: bool
A boolean that is True if the __await__()/with_() call will be shielded from external cancellation, False otherwise.
The effect is mostly equivalent to applying the shield() universal decorator to the primitive/method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.
It can be reassigned, but changes take effect only if they are made before the call.
- class aiologic.lowlevel.GreenWaiter
The return type of create_green_waiter().
- wait(timeout: float | None = None) bool
Block (put to sleep) the task until wake() is called from any thread, and then return True.
It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, the behavior is undefined.
- wake() None
Reschedule (resume/notify) the task that is blocked.
It can be called multiple times and in parallel: all calls are expected to be serialized by the event loop (if any). If no task is blocked on the primitive, it has no effect.
Note that while redundant calls are allowed, they can put excessive load on the event loop in the form of a callback flood. If your scenario involves either multiple notifiers or premature task rescheduling (e.g., due to a timeout), consider using events instead of waiters.
- property shield: bool
A boolean that is True if the wait() call will be shielded from external cancellation, False otherwise.
The effect is mostly equivalent to applying the shield() universal decorator to the method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.
It can be reassigned, but changes take effect only if they are made before the call.
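The wait()/wake() contract described for green waiters (one blocking call, any number of wake-ups from any thread) can be approximated for plain threads with threading.Event. ToyGreenWaiter below is a hypothetical illustration that borrows the method names; it is not aiologic's implementation and ignores shielding entirely.

```python
import threading


class ToyGreenWaiter:
    """Hypothetical one-shot waiter for plain threads."""

    def __init__(self):
        self._event = threading.Event()

    def wait(self, timeout=None):
        # Returns True once woken, False if the timeout expires first.
        return self._event.wait(timeout)

    def wake(self):
        # Safe to call repeatedly and from any thread.
        self._event.set()


waiter = ToyGreenWaiter()

# Wake the blocked thread from another thread shortly afterwards.
timer = threading.Timer(0.05, waiter.wake)
timer.start()

woken = waiter.wait(timeout=5)
timer.join()

waiter.wake()  # a redundant wake-up has no further effect
```

One waiter corresponds to one wait, so a fresh ToyGreenWaiter would be created for each blocking call instead of reusing a woken instance.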
- aiologic.lowlevel.create_async_waiter(*, shield: bool = False) AsyncWaiter
Create a new instance for a blocking async call.
- Parameters:
shield – See the shield property.
- aiologic.lowlevel.create_green_waiter(*, shield: bool = False) GreenWaiter
Create a new instance for a blocking green call.
- Parameters:
shield – See the shield property.
Events (low-level)
- class aiologic.lowlevel.AsyncEvent
The return type of create_async_event().
- abstractmethod __await__(timeout: float | None = None) Generator[Any, Any, bool]
Block (put to sleep) the task until set() is called from any thread, and then return True.
It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, a RuntimeError is raised.
If the event is already in the set state (and the method has not yet been called), this is equivalent to async_checkpoint().
- abstractmethod async with_(timeout: float | None = None) bool
Block (put to sleep) the task until set() is called from any thread, and then return True.
It must be called exactly once (or never) during the object’s lifetime, even if the first call was cancelled due to a timeout. Otherwise, a RuntimeError is raised.
If the event is already in the set state (and the method has not yet been called), this is equivalent to async_checkpoint().
- abstractmethod __bool__() bool
Return
Trueif theset()method was successfully called,Falseotherwise. Mutually exclusive withcancelled().It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the
set()method has already been called by someone else, but bothis_set()andcancelled()returnFalse; the same applies to races between the task and one notifier when the former is cancelled).Used by the standard truth testing procedure.
- abstractmethod set() bool
Put the event into the set state, and return
Trueif this was the first successful attempt (no one preempted it, and the call to the__await__()/with_()method was not cancelled),Falseotherwise.If the task is already blocked, it is rescheduled. Otherwise, the subsequent call will behave only as a checkpoint (no actual waiting).
- abstractmethod is_set() bool
Return
Trueif theset()method was successfully called,Falseotherwise. Mutually exclusive withcancelled().It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the
set()method has already been called by someone else, but bothis_set()andcancelled()returnFalse; the same applies to races between the task and one notifier when the former is cancelled).
- abstractmethod cancelled() bool
Return
Trueif the call was cancelled (interrupted by a timeout or any exception),Falseotherwise. Mutually exclusive withis_set().It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Attempts at parallel checks from notifiers may return false values (situations where the
set()method has already been called by someone else, but bothis_set()andcancelled()returnFalse; the same applies to races between the task and one notifier when the former is cancelled).
- abstract property shield: bool
A boolean that is
Trueif the__await__()/with_()call will be shielded from external cancellation,Falseotherwise.The effect is mostly equivalent to applying the
shield()universal decorator to the primitive/method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.It can be rewritten, but any changes will only take effect until the call.
- abstract property force: bool
A boolean that is
Trueif the__await__()/with_()call will have a forced checkpoint (that is, the call is guaranteed to switch to the event loop and check for cancellation),Falseotherwise.The effect is mostly equivalent to applying the
enable_checkpoints()universal decorator to the primitive/method, but it is more efficient.It can be rewritten, but any changes will only take effect until the call.
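The mutual exclusion between a successful set() and a cancellation can be pictured as a one-shot state machine in which the first transition wins. The following is a minimal sketch using only the standard library. OneShotState and its cancel() method are hypothetical names chosen for illustration, and the code is not how aiologic implements its events.

```python
import threading


class OneShotState:
    """Hypothetical first-wins state holder (illustration only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state = "pending"  # "pending" -> "set" or "cancelled"

    def set(self) -> bool:
        # Succeeds only if nobody has set or cancelled the state before.
        with self._lock:
            if self._state != "pending":
                return False
            self._state = "set"
            return True

    def cancel(self) -> bool:
        # Mirrors set(): the waiting side marks its call as cancelled.
        with self._lock:
            if self._state != "pending":
                return False
            self._state = "cancelled"
            return True

    def is_set(self) -> bool:
        return self._state == "set"

    def cancelled(self) -> bool:
        return self._state == "cancelled"


state = OneShotState()
first = state.set()    # first successful attempt
second = state.set()   # preempted by the first call
late = state.cancel()  # too late: the state is already "set"
```

Only one of is_set() and cancelled() can ever become true in this sketch, which is the property the descriptions above rely on.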
- class aiologic.lowlevel.GreenEvent
The return type of create_green_event().
- abstractmethod wait(timeout: float | None = None) bool
Block (put to sleep) the task until set() is called from any thread, and then return True.
It must be called at most once during the object’s lifetime: a second call raises RuntimeError, even if the first call was cancelled due to a timeout.
If the event is already in the set state (and the method has not yet been called), this is equivalent to green_checkpoint().
- abstractmethod __bool__() bool
Return True if the set() method was successfully called, False otherwise. Mutually exclusive with cancelled().
It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Parallel checks from notifiers may return inaccurate values: set() may already have been called by someone else while both is_set() and cancelled() still return False. The same applies to races between the task and a single notifier when the task is cancelled.
Used by the standard truth testing procedure.
- abstractmethod set() bool
Put the event into the set state, and return True if this was the first successful attempt (no one preempted it, and the call to the wait() method was not cancelled), False otherwise.
If the task is already blocked, it is rescheduled. Otherwise, the subsequent call will behave only as a checkpoint (no actual waiting).
- abstractmethod is_set() bool
Return True if the set() method was successfully called, False otherwise. Mutually exclusive with cancelled().
It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Parallel checks from notifiers may return inaccurate values: set() may already have been called by someone else while both is_set() and cancelled() still return False. The same applies to races between the task and a single notifier when the task is cancelled.
- abstractmethod cancelled() bool
Return True if the call was cancelled (interrupted by a timeout or any exception), False otherwise. Mutually exclusive with is_set().
It is reliable only after the task has been rescheduled (that is, primarily for checks on the task’s own side). Parallel checks from notifiers may return inaccurate values: set() may already have been called by someone else while both is_set() and cancelled() still return False. The same applies to races between the task and a single notifier when the task is cancelled.
- abstract property shield: bool
A boolean that is True if the wait() call will be shielded from external cancellation, False otherwise.
The effect is mostly equivalent to applying the shield() universal decorator to the method, but it is more efficient. Also, any non-negative timeout passed to the method will be ignored.
It can be reassigned, but changes take effect only if they are made before the call.
- abstract property force: bool
A boolean that is True if the wait() call will have a forced checkpoint (that is, the call is guaranteed to switch to the event loop and check for cancellation), False otherwise.
The effect is mostly equivalent to applying the enable_checkpoints() universal decorator to the method, but it is more efficient.
It can be reassigned, but changes take effect only if they are made before the call.
- final class aiologic.lowlevel.SetEvent
Bases: GreenEvent, AsyncEvent
The singleton type for SET_EVENT.
- final class aiologic.lowlevel.DummyEvent
Bases: GreenEvent, AsyncEvent
The singleton type for DUMMY_EVENT.
- final class aiologic.lowlevel.CancelledEvent
Bases: GreenEvent, AsyncEvent
The singleton type for CANCELLED_EVENT.
- aiologic.lowlevel.SET_EVENT: SetEvent
The singleton instance representing an event that is always set.
- aiologic.lowlevel.DUMMY_EVENT: DummyEvent
The same as SET_EVENT.
- aiologic.lowlevel.CANCELLED_EVENT: CancelledEvent
The singleton instance representing an event that is always cancelled.
- aiologic.lowlevel.create_async_event(*, locking: bool = False, shield: bool = False, force: bool = False) AsyncEvent
Create a new instance for a blocking async call (with state).
- Parameters:
locking – If set to True, the instance will also include the slots of ThreadOnceLock, allowing the latter’s methods to be used with the returned object. This can be used when one-time synchronization on the event is required, but having a separate ThreadOnceLock instance is expensive in terms of memory.
shield – See the shield property.
force – See the force property.
- aiologic.lowlevel.create_green_event(*, locking: bool = False, shield: bool = False, force: bool = False) GreenEvent
Create a new instance for a blocking green call (with state).
- Parameters:
locking – If set to True, the instance will also include the slots of ThreadOnceLock, allowing the latter’s methods to be used with the returned object. This can be used when one-time synchronization on the event is required, but having a separate ThreadOnceLock instance is expensive in terms of memory.
shield – See the shield property.
force – See the force property.
Locks (low-level)
- final class aiologic.lowlevel.ThreadLock
Bases: object
The return type of create_thread_lock().
- final class aiologic.lowlevel.ThreadRLock
Bases: object
The return type of create_thread_rlock().
- final class aiologic.lowlevel.ThreadOnceLock
Bases: object
The return type of create_thread_oncelock().
- final class aiologic.lowlevel.ThreadDummyLock
Bases: object
A singleton class for THREAD_DUMMY_LOCK.
- aiologic.lowlevel.THREAD_DUMMY_LOCK: ThreadDummyLock
A singleton object that mimics a reentrant lock but does nothing.
Can be used as a replacement for a once lock after release (to reduce memory usage).
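To make "mimics a reentrant lock but does nothing" concrete, here is a minimal sketch of a no-op lock with the usual acquire/release/context-manager surface. DummyLock and DUMMY_LOCK are hypothetical names for illustration, and the real ThreadDummyLock may expose a different set of methods.

```python
class DummyLock:
    """Hypothetical no-op stand-in for a reentrant lock (illustration only)."""

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        return True  # always "succeeds" immediately

    def release(self) -> None:
        pass  # there is nothing to release

    def __enter__(self) -> bool:
        return self.acquire()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.release()


DUMMY_LOCK = DummyLock()

with DUMMY_LOCK:
    with DUMMY_LOCK:  # re-entering never blocks
        nested_ok = True
```

Because such an object holds no state, a single shared instance can stand in for any number of locks that are no longer needed.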
- aiologic.lowlevel.create_thread_lock() ThreadLock
Create a new instance of a primitive lock that blocks threads.
The same as threading.Lock, but not affected by monkey patching.
- aiologic.lowlevel.create_thread_rlock() ThreadRLock
Create a new instance of a reentrant lock that blocks threads.
The same as threading.RLock, but not affected by monkey patching.
- aiologic.lowlevel.create_thread_oncelock() ThreadOnceLock
Create a new instance of a once lock that mimics a reentrant lock but does nothing after release (when the counter reaches zero).
It wakes up all threads at once, thereby solving the square problem, which makes it suitable for creating thread-safe initialization (or any other one-time actions).
Unlike threading.RLock, it is signal-safe.
- aiologic.lowlevel.once(wrapped: MissingType = MISSING, /, *, reentrant: bool = False) Callable[[Callable[[], _T]], Callable[[], _T]]
- aiologic.lowlevel.once(wrapped: Callable[[], _T], /) Callable[[], _T]
Transform wrapped into a one-time function.
Blocks threads attempting to execute the function in parallel and wakes them up at once upon completion. The result is stored in the closure of the new function and is returned on each subsequent call.
- Parameters:
reentrant – Unless set to True, recursive attempts to call the function will raise RuntimeError. Also affects signal handlers and destructors.
- Raises:
RuntimeError – if called recursively and reentrant=False.
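The one-time behaviour described above can be approximated with the standard library alone. The sketch below is an assumption-laden illustration, not the library's code: once_sketch is a hypothetical name, it uses a plain threading.Lock with double-checked locking, and it does not detect recursive calls (a recursive call would deadlock instead of raising RuntimeError).

```python
import functools
import threading
from typing import Callable, TypeVar

_T = TypeVar("_T")


def once_sketch(func: Callable[[], _T]) -> Callable[[], _T]:
    """Hypothetical one-time wrapper built on threading.Lock."""
    lock = threading.Lock()
    done = False
    result = None

    @functools.wraps(func)
    def wrapper() -> _T:
        nonlocal done, result
        if not done:  # fast path once the result is cached
            with lock:  # non-reentrant: recursion would deadlock here
                if not done:  # re-check under the lock
                    result = func()
                    done = True
        return result

    return wrapper


calls = []


@once_sketch
def load_config() -> dict:
    calls.append(1)  # record each real execution
    return {"debug": False}


threads = [threading.Thread(target=load_config) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

All eight threads receive the same cached dictionary, and the wrapped function body runs a single time.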
Queues (low-level)
- class aiologic.lowlevel.lazydeque
Bases:
MutableSequence[_T]…
- class aiologic.lowlevel.lazyqueue
Bases:
Generic[_T]…
Advanced topics
Libraries
- aiologic.lowlevel.current_async_library(*, failsafe: Literal[False] = False) str
- aiologic.lowlevel.current_async_library(*, failsafe: Literal[True]) str | None
Detect which async library is currently running.
- Parameters:
failsafe – Unless set to True, the function will raise an exception when there is no current async library. Otherwise the function returns None in that case.
- Returns:
A string like "trio" or None.
- Raises:
AsyncLibraryNotFoundError – if the current async library was not recognized.
Example
async def async_sleep(seconds: float) -> None:
    match library := aiologic.lowlevel.current_async_library():
        case "asyncio":
            await asyncio.sleep(seconds)
        case "curio":
            await curio.sleep(seconds)
        case "trio":
            await trio.sleep(seconds)
        case _:
            msg = f"unsupported async library {library!r}"
            raise RuntimeError(msg)
- aiologic.lowlevel.current_green_library(*, failsafe: Literal[False] = False) str
- aiologic.lowlevel.current_green_library(*, failsafe: Literal[True]) str | None
Detect which green library is currently running.
- Parameters:
failsafe – Unless set to True, the function will raise an exception when there is no current green library. Otherwise the function returns None in that case.
- Returns:
A string like "gevent" or None.
- Raises:
GreenLibraryNotFoundError – if the current green library was not recognized.
Example
def green_sleep(seconds: float) -> None:
    match library := aiologic.lowlevel.current_green_library():
        case "threading":
            time.sleep(seconds)
        case "eventlet":
            eventlet.sleep(seconds)
        case "gevent":
            gevent.sleep(seconds)
        case _:
            msg = f"unsupported green library {library!r}"
            raise RuntimeError(msg)
- aiologic.lowlevel.current_async_library_tlocal: threading.local
Thread-local data to control the return value of aiologic.lowlevel.current_async_library().
- current_async_library_tlocal.name: str | None = None
Unless set to a non-None object, the function detects the current async library with its own algorithms. Otherwise the function returns exactly the set object.
Example
library = aiologic.lowlevel.current_async_library_tlocal.name
aiologic.lowlevel.current_async_library_tlocal.name = "someio"
try:
    ...  # aiologic.lowlevel.current_async_library() == "someio"
finally:
    aiologic.lowlevel.current_async_library_tlocal.name = library
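The interplay between the thread-local override, automatic detection, and the failsafe flag can be sketched with the standard library. Everything here is a stand-in: detect_async_library and _tlocal are hypothetical names, only asyncio is detected, and a plain RuntimeError replaces AsyncLibraryNotFoundError.

```python
import asyncio
import threading

_tlocal = threading.local()  # hypothetical stand-in for the thread-local object


def detect_async_library(*, failsafe: bool = False):
    # 1. An explicit per-thread override wins.
    name = getattr(_tlocal, "name", None)
    if name is not None:
        return name
    # 2. Otherwise fall back to detection (asyncio only in this sketch).
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        if failsafe:
            return None
        raise RuntimeError("no async library is running") from None
    return "asyncio"


async def main():
    detected = detect_async_library()
    previous = getattr(_tlocal, "name", None)
    _tlocal.name = "someio"
    try:
        overridden = detect_async_library()
    finally:
        _tlocal.name = previous  # always restore the previous value
    return detected, overridden


outside = detect_async_library(failsafe=True)  # no event loop is running here
detected, overridden = asyncio.run(main())
```

Restoring the previous value in a finally block, as in the example above, keeps the override from leaking into unrelated code that runs later in the same thread.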
- aiologic.lowlevel.current_green_library_tlocal: threading.local
Thread-local data to control the return value of aiologic.lowlevel.current_green_library().
- current_green_library_tlocal.name: str | None = None
Unless set to a non-None object, the function detects the current green library with its own algorithms. Otherwise the function returns exactly the set object.
Example
library = aiologic.lowlevel.current_green_library_tlocal.name
aiologic.lowlevel.current_green_library_tlocal.name = "somelet"
try:
    ...  # aiologic.lowlevel.current_green_library() == "somelet"
finally:
    aiologic.lowlevel.current_green_library_tlocal.name = library
- exception aiologic.lowlevel.AsyncLibraryNotFoundError
Bases: RuntimeError
Exception raised by the aiologic.lowlevel.current_async_library() function if the current async library was not recognized.
- exception aiologic.lowlevel.GreenLibraryNotFoundError
Bases: RuntimeError
Exception raised by the aiologic.lowlevel.current_green_library() function if the current green library was not recognized.
Execution units
- aiologic.lowlevel.current_thread() threading.Thread
…
Cancellation and timeouts
- aiologic.lowlevel.shield(wrapped: _AwaitableT, /) _AwaitableT
- aiologic.lowlevel.shield(wrapped: _CallableT, /) _CallableT
…
Checkpoints and fairness
- async aiologic.lowlevel.async_checkpoint(*, force: bool = False) None
A pure async checkpoint.
It checks for cancellation and allows the scheduler to switch to another task. In many ways, it is similar to async_sleep(0), but has the following differences:
It can have a more efficient implementation.
It can be enabled/disabled in the current context.
The latter distinguishes aiologic checkpoints from Trio/AnyIO checkpoints. You can control whether checkpoints are enabled or not in the following ways (in order of priority):
Set the AIOLOGIC_ASYNC_CHECKPOINTS environment variable to any non-empty value. This will enable async checkpoints for all async libraries (in all threads). The empty value has the opposite effect.
Set the AIOLOGIC_<ASYNC_LIBRARY>_CHECKPOINTS environment variable to any non-empty value. This will enable async checkpoints for the specified async library (in all threads). The empty value has the opposite effect.
Use enable_checkpoints()/disable_checkpoints() to control the state of checkpoints in the current context.
Pass force=True to force a checkpoint.
Note
High-level primitives (and some low-level ones) implement checkpoints for all blocking calls (regardless of whether waiting is required or not), and enabling/disabling checkpoints also affects them, so you can use checkpoints even without checkpoint functions if there are blocking calls (it is enough to explicitly enable checkpoints).
async with lock:  # an async checkpoint (if enabled)
    ...  # exclusive access
Furthermore, the library implements a “one blocking call, one checkpoint” concept, which gives a predictable number of context switches per call (zero or one if disabled, exactly one if enabled).
By default, async checkpoints are enabled for Trio only.
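What a checkpoint buys in terms of fairness can be seen with plain asyncio, where asyncio.sleep(0) plays the role that the text compares a checkpoint to. This sketch does not use aiologic, and the worker and order names are only for illustration.

```python
import asyncio

order = []


async def worker(name: str) -> None:
    for step in range(2):
        order.append(f"{name}{step}")
        await asyncio.sleep(0)  # yield to the event loop, like a checkpoint


async def main() -> None:
    await asyncio.gather(worker("a"), worker("b"))


asyncio.run(main())
```

Each sleep(0) gives the other worker a chance to run, so the two workers take turns instead of one finishing before the other starts.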
- aiologic.lowlevel.green_checkpoint(*, force: bool = False) None
A pure green checkpoint.
It checks for cancellation and allows the scheduler to switch to another task. In many ways, it is similar to green_sleep(0), but has the following differences:
It can have a more efficient implementation.
It can be enabled/disabled in the current context.
The latter distinguishes aiologic checkpoints from Trio/AnyIO checkpoints. You can control whether checkpoints are enabled or not in the following ways (in order of priority):
Set the AIOLOGIC_GREEN_CHECKPOINTS environment variable to any non-empty value. This will enable green checkpoints for all green libraries (in all threads). The empty value has the opposite effect.
Set the AIOLOGIC_<GREEN_LIBRARY>_CHECKPOINTS environment variable to any non-empty value. This will enable green checkpoints for the specified green library (in all threads). The empty value has the opposite effect.
Use enable_checkpoints()/disable_checkpoints() to control the state of checkpoints in the current context.
Pass force=True to force a checkpoint.
Note
High-level primitives (and some low-level ones) implement checkpoints for all blocking calls (regardless of whether waiting is required or not), and enabling/disabling checkpoints also affects them, so you can use checkpoints even without checkpoint functions if there are blocking calls (it is enough to explicitly enable checkpoints).
with lock:  # a green checkpoint (if enabled)
    ...  # exclusive access
Furthermore, the library implements a “one blocking call, one checkpoint” concept, which gives a predictable number of context switches per call (zero or one if disabled, exactly one if enabled).
By default, green checkpoints are not enabled for any library.
- async aiologic.lowlevel.async_checkpoint_if_cancelled(*, force: bool = False) None
Issue an async checkpoint if the calling context has been cancelled.
Used in conjunction with shield() to check for cancellation before shielding, and is not suitable for any other use case.
- aiologic.lowlevel.green_checkpoint_if_cancelled(*, force: bool = False) None
Issue a green checkpoint if the calling context has been cancelled.
Used in conjunction with shield() to check for cancellation before shielding, and is not suitable for any other use case.
- aiologic.lowlevel.async_checkpoint_enabled() bool
Return True if async checkpoints are enabled in the current context, False otherwise.
- aiologic.lowlevel.green_checkpoint_enabled() bool
Return True if green checkpoints are enabled in the current context, False otherwise.
- aiologic.lowlevel.enable_checkpoints(wrapped: MissingType = MISSING, /) _CheckpointsManager
- aiologic.lowlevel.enable_checkpoints(wrapped: _AwaitableT, /) _AwaitableT
- aiologic.lowlevel.enable_checkpoints(wrapped: _CallableT, /) _CallableT
Enable checkpoints in the current context.
If a callable object is passed, it is wrapped with a universal decorator and a callable proxy is returned. If an awaitable object is passed, it is also wrapped and an awaitable proxy is returned. If nothing is passed, a sync/async context manager is returned.
To distinguish between green and async functions, aiologic.meta.iscoroutinefactory() is used. Therefore, if you implement your own callable object that returns a coroutine object, consider using aiologic.meta.markcoroutinefactory().
Example
>>> async_checkpoint_enabled()
False
>>> async with enable_checkpoints():
...     async_checkpoint_enabled()
True
>>> async_checkpoint_enabled()
False
>>> async def test():
...     return async_checkpoint_enabled()
>>> await test()
False
>>> await enable_checkpoints(test)()  # for a callable object
True
>>> await enable_checkpoints(test())  # for an awaitable object
True
>>> await test()
False
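A "universal decorator" has to choose between a sync and an async wrapper depending on what it is given. The sketch below shows one way to do that with the standard library, assuming inspect.iscoroutinefunction() as the test (the library itself uses aiologic.meta.iscoroutinefactory(), which recognizes more cases). The names _enabled and enable_flag are hypothetical.

```python
import asyncio
import contextvars
import functools
import inspect

# Hypothetical flag; aiologic's real state is not shown in this reference.
_enabled = contextvars.ContextVar("enabled", default=False)


def enable_flag(func):
    """Wrap a sync or async callable so that the flag is set while it runs."""
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            token = _enabled.set(True)
            try:
                return await func(*args, **kwargs)
            finally:
                _enabled.reset(token)  # undo the change on exit

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        token = _enabled.set(True)
        try:
            return func(*args, **kwargs)
        finally:
            _enabled.reset(token)  # undo the change on exit

    return sync_wrapper


def sync_probe() -> bool:
    return _enabled.get()


async def async_probe() -> bool:
    return _enabled.get()


sync_result = enable_flag(sync_probe)()
async_result = asyncio.run(enable_flag(async_probe)())
after = _enabled.get()  # the flag is back to its default outside the wrappers
```

A callable object whose __call__ returns a coroutine would be misclassified by this simple test, which is the situation the markcoroutinefactory() advice above addresses.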
- aiologic.lowlevel.disable_checkpoints(wrapped: MissingType = MISSING, /) _NoCheckpointsManager
- aiologic.lowlevel.disable_checkpoints(wrapped: _AwaitableT, /) _AwaitableT
- aiologic.lowlevel.disable_checkpoints(wrapped: _CallableT, /) _CallableT
Disable checkpoints in the current context.
Like enable_checkpoints(), but with the opposite effect.
Example
>>> with enable_checkpoints():
...     green_checkpoint_enabled()
...     with disable_checkpoints():
...         green_checkpoint_enabled()
...     green_checkpoint_enabled()
True
False
True
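The True/False/True nesting shown in the example is what a token-based context variable gives naturally. As a rough illustration (an assumption about the mechanism, not a description of aiologic internals), a context manager can set the variable on entry and reset it to the previous value on exit. The names _checkpoints and set_checkpoints are hypothetical.

```python
import contextlib
import contextvars

_checkpoints = contextvars.ContextVar("checkpoints", default=False)


@contextlib.contextmanager
def set_checkpoints(enabled: bool):
    token = _checkpoints.set(enabled)
    try:
        yield
    finally:
        _checkpoints.reset(token)  # restore whatever was active before


seen = []
with set_checkpoints(True):
    seen.append(_checkpoints.get())
    with set_checkpoints(False):
        seen.append(_checkpoints.get())
    seen.append(_checkpoints.get())
seen.append(_checkpoints.get())
```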
Safety and reentrancy
- aiologic.lowlevel.signal_safety_enabled() bool
Return True if signal-safety is enabled in the current context, False otherwise.
- aiologic.lowlevel.enable_signal_safety(wrapped: MissingType = MISSING, /) _SignalSafetyManager
- aiologic.lowlevel.enable_signal_safety(wrapped: _AwaitableT, /) _AwaitableT
- aiologic.lowlevel.enable_signal_safety(wrapped: _CallableT, /) _CallableT
Enable signal-safety in the current context.
If a callable object is passed, it is wrapped with a universal decorator and a callable proxy is returned. If an awaitable object is passed, it is also wrapped and an awaitable proxy is returned. If nothing is passed, a sync/async context manager is returned.
To distinguish between green and async functions, aiologic.meta.iscoroutinefactory() is used. Therefore, if you implement your own callable object that returns a coroutine object, consider using aiologic.meta.markcoroutinefactory().
Example
>>> signal_safety_enabled()
False
>>> async with enable_signal_safety():
...     signal_safety_enabled()
True
>>> signal_safety_enabled()
False
>>> async def test():
...     return signal_safety_enabled()
>>> await test()
False
>>> await enable_signal_safety(test)()  # for a callable object
True
>>> await enable_signal_safety(test())  # for an awaitable object
True
>>> await test()
False
- aiologic.lowlevel.disable_signal_safety(wrapped: MissingType = MISSING, /) _NoSignalSafetyManager
- aiologic.lowlevel.disable_signal_safety(wrapped: _AwaitableT, /) _AwaitableT
- aiologic.lowlevel.disable_signal_safety(wrapped: _CallableT, /) _CallableT
Disable signal-safety in the current context.
Like enable_signal_safety(), but with the opposite effect.
Example
>>> with enable_signal_safety():
...     signal_safety_enabled()
...     with disable_signal_safety():
...         signal_safety_enabled()
...     signal_safety_enabled()
True
False
True
Metaprogramming
Singletons and markers
- class aiologic.meta.SingletonEnum
Bases: Enum
A base class for type-checker-friendly singleton classes whose instances will be defined at the module level.
Unlike enum.Enum, it prohibits setting attributes that are not explicitly declared via __slots__.
Example
>>> class SingletonType(SingletonEnum):
...     __slots__ = ('_x',)
...     SINGLETON = 'SINGLETON'
>>> SINGLETON = SingletonType.SINGLETON
>>> repr(SINGLETON) == f"{__name__}.SINGLETON"
True
>>> SINGLETON._x = 1  # ok
>>> SINGLETON._y = 2
Traceback (most recent call last):
AttributeError: 'SingletonType' object has no attribute '_y'
- final class aiologic.meta.DefaultType
Bases: SingletonEnum
A singleton class for DEFAULT; mimics NoneType.
- final class aiologic.meta.MissingType
Bases: SingletonEnum
A singleton class for MISSING; mimics NoneType.
- aiologic.meta.DEFAULT: DefaultType
A singleton object for default values; mimics None.
Used as a marker to indicate that some object will be used by default (without any special behavior).
- aiologic.meta.MISSING: MissingType
A singleton object for default values; mimics None.
Used as a marker to indicate that some special behavior will be used by default (cannot be achieved by passing any value).
Can also be used outside of parameters to indicate that there is no object.
Introspecting
- aiologic.meta.lookup_static(owner: type, name: str, /) Any
- aiologic.meta.lookup_static(owner: type, name: str, /, default: _T) Any | _T
…
- aiologic.meta.resolve_special(owner: type, name: str, instance: None = None, /) Any
- aiologic.meta.resolve_special(owner: type[_T1], name: str, instance: _T1, /) Any
- aiologic.meta.resolve_special(owner: type, name: str, instance: None = None, /, *, default: _T2) Any | _T2
- aiologic.meta.resolve_special(owner: type[_T1], name: str, instance: _T1, /, *, default: _T2) Any | _T2
…
- aiologic.meta.issubclass_static(obj: object, class_or_tuple: type | tuple[_ClassInfo, ...], /) bool
…
- aiologic.meta.isinstance_static(obj: object, class_or_tuple: type | tuple[_ClassInfo, ...], /) bool
…
- aiologic.meta.isgeneratorlike(obj: object, /) TypeIs[Generator[Any, Any, Any]]
Return True if the object looks like a generator iterator, that is, implements collections.abc.Generator, and False otherwise.
Example
>>> from collections.abc import Generator
>>> class SimpleGenerator(Generator):
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> def generator_function():
...     return
...     yield
>>> isgeneratorlike(object())
False
>>> isgeneratorlike(gen := generator_function())
True
>>> isgeneratorlike(SimpleGenerator())
True
- aiologic.meta.iscoroutinelike(obj: object, /) TypeIs[Coroutine[Any, Any, Any]]
Return True if the object looks like a coroutine, that is, implements collections.abc.Coroutine, and False otherwise.
Example
>>> from collections.abc import Coroutine, Generator
>>> class SimpleCoroutine(Coroutine, Generator):
...     def __await__(self):
...         return self
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> async def coroutine_function():
...     pass
>>> iscoroutinelike(object())
False
>>> iscoroutinelike(coro := coroutine_function())
True
>>> iscoroutinelike(SimpleCoroutine())
True
>>> await coro  # to avoid `RuntimeWarning`
Caution
Some objects, such as generator-based coroutines (see types.coroutine()), may not have the __await__() method but still behave like coroutine objects. They are also treated as coroutine-like objects. So if you want to get an iterator for such an object, consider using await_for(obj).__await__().
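The caution above can be checked with the standard library alone. This sketch creates a generator-based coroutine with types.coroutine() and shows that it can be awaited even though it has no __await__() method. The names legacy_yield, has_await, and awaitable are chosen for illustration.

```python
import asyncio
import inspect
import types


@types.coroutine
def legacy_yield():
    # A generator-based coroutine: a bare yield hands control to the loop.
    yield


obj = legacy_yield()
has_await = hasattr(obj, "__await__")  # generator objects lack __await__
awaitable = inspect.isawaitable(obj)   # yet the object can be awaited
obj.close()  # dispose of the unused generator


async def main() -> str:
    await legacy_yield()
    return "done"


result = asyncio.run(main())
```

Code that blindly calls obj.__await__() would fail on such objects, which is why going through a helper is suggested.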
- aiologic.meta.isasyncgenlike(obj: object, /) TypeIs[AsyncGenerator[Any, Any]]
Return True if the object looks like an asynchronous generator iterator, that is, implements collections.abc.AsyncGenerator, and False otherwise.
Example
>>> from collections.abc import AsyncGenerator
>>> class SimpleAsyncGenerator(AsyncGenerator):
...     async def asend(self, value):
...         return await super().asend(value)
...     async def athrow(self, typ, val=None, tb=None):
...         return await super().athrow(typ, val, tb)
>>> async def asyncgen_function():
...     return
...     yield
>>> isasyncgenlike(object())
False
>>> isasyncgenlike(asyncgen := asyncgen_function())
True
>>> isasyncgenlike(SimpleAsyncGenerator())
True
- aiologic.meta.isgeneratorfactory(obj: Callable[[...], Generator[Any, Any, Any]], /) bool
- aiologic.meta.isgeneratorfactory(obj: Callable[[_P], Awaitable[_T]], /) TypeGuard[Callable[[_P], Generator[Any, Any, _T]]]
- aiologic.meta.isgeneratorfactory(obj: Callable[[_P], object], /) TypeGuard[Callable[[_P], Generator[Any, Any, Any]]]
- aiologic.meta.isgeneratorfactory(obj: object, /) TypeGuard[Callable[[...], Generator[Any, Any, Any]]]
Return True if the object returns a generator iterator when called, False otherwise.
The following objects are treated as generator factories by default:
A generator function. This is true for both user-defined functions and some compiled functions that look like such functions (for example, functions compiled via Cython).
A generator type (a class whose instances look like generator iterators; see isgeneratorlike()).
An object manually marked with markgeneratorfactory().
Example
>>> from collections.abc import Generator
>>> class SimpleGenerator(Generator):
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> def generator_function():
...     return
...     yield
>>> isgeneratorfactory(lambda: None)
False
>>> isgeneratorfactory(generator_function)
True
>>> isgeneratorfactory(SimpleGenerator)
True
For all others, to determine whether an object is a generator factory, a recursive algorithm is used that handles at least the following cases:
If it is a function defined by functools.partialmethod for some object, the latter is checked.
If it is a partial object (an instance of functools.partial()), the object it wraps (functools.partial.func) is checked.
If it is a bound method, the corresponding object (method.__func__) is checked.
If it is a callable object, its __call__() method is checked.
Example
>>> from functools import partial, partialmethod
>>> class CustomGeneratorCallable:
...     def __call__(self):
...         return
...         yield
...     get = partialmethod(__call__)
>>> class ComplexGeneratorCallable:
...     __call__ = CustomGeneratorCallable()
>>> isgeneratorfactory(CustomGeneratorCallable())
True
>>> isgeneratorfactory(CustomGeneratorCallable().get)
True
>>> isgeneratorfactory(CustomGeneratorCallable().__call__)
True
>>> isgeneratorfactory(partial(CustomGeneratorCallable()))
True
>>> isgeneratorfactory(ComplexGeneratorCallable())
True
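The recursive cases listed above (partial objects, bound methods, callable objects) can be approximated with the standard library. The function below is a hypothetical sketch, not aiologic's algorithm: returns_generator, _BUILTIN_CALLS, and the depth limit are assumptions, and neither partialmethod handling nor manual markers are covered.

```python
import functools
import inspect
import types

# C-level __call__ implementations carry no Python code to inspect.
_BUILTIN_CALLS = (
    types.WrapperDescriptorType,
    types.MethodWrapperType,
    types.BuiltinFunctionType,
)


def returns_generator(obj: object, _depth: int = 0) -> bool:
    """Hypothetical check; not aiologic's actual algorithm."""
    if _depth > 16:
        return False  # guard against pathological __call__ cycles
    if inspect.isgeneratorfunction(obj):
        return True
    if isinstance(obj, functools.partial):
        return returns_generator(obj.func, _depth + 1)  # the wrapped object
    if isinstance(obj, types.MethodType):
        return returns_generator(obj.__func__, _depth + 1)  # the function
    call = getattr(type(obj), "__call__", None)
    if call is None or isinstance(call, _BUILTIN_CALLS):
        return False  # not callable, or nothing left to unwrap
    return returns_generator(call, _depth + 1)  # the __call__ implementation


def numbers():
    yield 1


class Counter:
    def __call__(self):
        yield 1

    def method(self):
        yield 2


class Delegating:
    __call__ = Counter()  # a callable object used as __call__


checks = {
    "function": returns_generator(numbers),
    "partial": returns_generator(functools.partial(numbers)),
    "callable": returns_generator(Counter()),
    "bound method": returns_generator(Counter().method),
    "delegating": returns_generator(Delegating()),
    "lambda": returns_generator(lambda: None),
}
```

Looking up __call__ on the type rather than on the instance mirrors how Python itself resolves special methods.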
- aiologic.meta.iscoroutinefactory(obj: Callable[[...], Coroutine[Any, Any, Any]], /) bool
- aiologic.meta.iscoroutinefactory(obj: Callable[[_P], Awaitable[_T]], /) TypeGuard[Callable[[_P], Coroutine[Any, Any, _T]]]
- aiologic.meta.iscoroutinefactory(obj: Callable[[_P], object], /) TypeGuard[Callable[[_P], Coroutine[Any, Any, Any]]]
- aiologic.meta.iscoroutinefactory(obj: object, /) TypeGuard[Callable[[...], Coroutine[Any, Any, Any]]]
Return True if the object returns a coroutine when called, False otherwise.
The following objects are treated as coroutine factories by default:
A coroutine function (a function defined with the async def syntax). This is true for both user-defined functions and some compiled functions that look like such functions (for example, functions compiled via Cython).
A coroutine type (a class whose instances look like coroutines; see iscoroutinelike()).
A generator-based coroutine function marked with asyncio.coroutine() or the corresponding standard marker (on Python <3.12).
An object manually marked with inspect.markcoroutinefunction() or the corresponding standard marker (on Python ≥3.12).
An object manually marked with markcoroutinefactory().
Example
>>> from collections.abc import Coroutine, Generator
>>> class SimpleCoroutine(Coroutine, Generator):
...     def __await__(self):
...         return self
...     def send(self, value):
...         return super().send(value)
...     def throw(self, typ, val=None, tb=None):
...         return super().throw(typ, val, tb)
>>> async def coroutine_function():
...     pass
>>> iscoroutinefactory(lambda: None)
False
>>> iscoroutinefactory(coroutine_function)
True
>>> iscoroutinefactory(SimpleCoroutine)
True
For all others, to determine whether an object is a coroutine factory, a recursive algorithm is used that handles at least the following cases:
If it is a function defined by functools.partialmethod for some object, the latter is checked.
If it is a partial object (an instance of functools.partial()), the object it wraps (functools.partial.func) is checked.
If it is a bound method, the corresponding object (method.__func__) is checked.
If it is a callable object, its __call__() method is checked.
Example
>>> from functools import partial, partialmethod
>>> class CustomCoroutineCallable:
...     async def __call__(self):
...         pass
...     get = partialmethod(__call__)
>>> class ComplexCoroutineCallable:
...     __call__ = CustomCoroutineCallable()
>>> iscoroutinefactory(CustomCoroutineCallable())
True
>>> iscoroutinefactory(CustomCoroutineCallable().get)
True
>>> iscoroutinefactory(CustomCoroutineCallable().__call__)
True
>>> iscoroutinefactory(partial(CustomCoroutineCallable()))
True
>>> iscoroutinefactory(ComplexCoroutineCallable())
True
- aiologic.meta.isasyncgenfactory(obj: Callable[[...], AsyncGenerator[Any, Any]], /) bool
- aiologic.meta.isasyncgenfactory(obj: Callable[[_P], object], /) TypeGuard[Callable[[_P], AsyncGenerator[Any, Any]]]
- aiologic.meta.isasyncgenfactory(obj: object, /) TypeGuard[Callable[[...], AsyncGenerator[Any, Any]]]
Return
Trueif the object returns an asynchronous generator iterator when called,Falseotherwise.The following objects are treated as asynchronous generator factories by default:
An asynchronous generator function. This is true for both user-defined functions and some compiled functions that look like such functions (for example, functions compiled via Cython).
An asynchronous generator type (a class whose instances look like asynchronous generator iterators; see
isasyncgenlike()).An object manually marked with
markasyncgenfactory().
Example
>>> from collections.abc import AsyncGenerator
>>> class SimpleAsyncGenerator(AsyncGenerator):
...     async def asend(self, value):
...         return await super().asend(value)
...     async def athrow(self, typ, val=None, tb=None):
...         return await super().athrow(typ, val, tb)
>>> async def asyncgen_function():
...     return
...     yield
>>> isasyncgenfactory(lambda: None)
False
>>> isasyncgenfactory(asyncgen_function)
True
>>> isasyncgenfactory(SimpleAsyncGenerator)
True
For all others, to determine whether an object is an asynchronous generator factory, a recursive algorithm is used that handles at least the following cases:
If it is a function defined by functools.partialmethod for some object, the latter is checked.
If it is a partial object (an instance of functools.partial()), the object it wraps (functools.partial.func) is checked.
If it is a bound method, the corresponding object (method.__func__) is checked.
If it is a callable object, its __call__() method is checked.
Example
>>> from functools import partial, partialmethod
>>> class CustomAsyncGeneratorCallable:
...     async def __call__(self):
...         return
...         yield
...     get = partialmethod(__call__)
>>> class ComplexAsyncGeneratorCallable:
...     __call__ = CustomAsyncGeneratorCallable()
>>> isasyncgenfactory(CustomAsyncGeneratorCallable())
True
>>> isasyncgenfactory(CustomAsyncGeneratorCallable().get)
True
>>> isasyncgenfactory(CustomAsyncGeneratorCallable().__call__)
True
>>> isasyncgenfactory(partial(CustomAsyncGeneratorCallable()))
True
>>> isasyncgenfactory(ComplexAsyncGeneratorCallable())
True
- aiologic.meta.markgeneratorfactory(factory: _CallableT, /) _CallableT
…
- aiologic.meta.markcoroutinefactory(factory: _CallableT, /) _CallableT
…
- aiologic.meta.markasyncgenfactory(factory: _CallableT, /) _CallableT
…
Transformers
- aiologic.meta.generator(func: Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]]
- aiologic.meta.generator(func: Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]]
- aiologic.meta.generator(func: Callable[[_P], Awaitable[_T]], /) Callable[[_P], Generator[Any, Any, _T]]
- aiologic.meta.generator(func: Callable[[_P], object], /) Callable[[_P], Generator[Any, Any, Any]]
- aiologic.meta.generator(func: object, /) Callable[[...], Generator[Any, Any, Any]]
…
- aiologic.meta.coroutine(func: Callable[[_P], Generator[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]]
- aiologic.meta.coroutine(func: Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]], /) Callable[[_P], Coroutine[_YieldT, _SendT, _ReturnT]]
- aiologic.meta.coroutine(func: Callable[[_P], Awaitable[_T]], /) Callable[[_P], Coroutine[Any, Any, _T]]
- aiologic.meta.coroutine(func: Callable[[_P], object], /) Callable[[_P], Coroutine[Any, Any, Any]]
- aiologic.meta.coroutine(func: object, /) Callable[[...], Coroutine[Any, Any, Any]]
…
Signatures
Functions
- aiologic.meta.replaces(namespace: MutableMapping[str, Any], replacer: MissingType = MISSING, /) Callable[[_NamedCallableT], _NamedCallableT]
- aiologic.meta.replaces(namespace: MutableMapping[str, Any], replacer: _NamedCallableT, /) _NamedCallableT
Wrap and replace the function of the same name in namespace.
Unlike functools.wraps(), excludes the __wrapped__ attribute to avoid memory leaks in a multithreaded environment.
Used for global rebinding.
- Raises:
LookupError – if there is no function of the same name in namespace.
Example
>>> def sketch():
...     return 'parrot'
>>> def replace_sketch():
...     @replaces(globals())
...     def sketch():
...         return 'ex-parrot'
>>> sketch()
'parrot'
>>> replace_sketch()
>>> sketch()
'ex-parrot'
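The wrap-and-rebind behavior described above can be illustrated with `functools.update_wrapper()`. This is only a sketch of the idea, assuming the decorator form; `replaces_sketch` is a hypothetical name and the real function may differ in which attributes it copies.

```python
import functools


def replaces_sketch(namespace):
    """Hypothetical decorator: rebind namespace[name] to the decorated function."""

    def decorator(replacer):
        name = replacer.__name__
        try:
            original = namespace[name]
        except KeyError:
            raise LookupError(f"no function named {name!r} in namespace") from None
        # Copy __doc__, __module__, and friends from the replaced function.
        functools.update_wrapper(replacer, original)
        # update_wrapper() also sets __wrapped__; drop it so that the
        # replaced function is not kept alive by its replacement.
        del replacer.__wrapped__
        namespace[name] = replacer
        return replacer

    return decorator
```

Deleting `__wrapped__` is the only difference from a plain `functools.wraps()` call here; without it, each replacement would hold a reference to the function it replaced.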
- aiologic.meta.replaces_when_imported(namespace: MutableMapping[str, Any], module_name: str, replacer: MissingType = MISSING, /) Callable[[_NamedCallableT], _NamedCallableT]
- aiologic.meta.replaces_when_imported(namespace: MutableMapping[str, Any], module_name: str, replacer: _NamedCallableT, /) _NamedCallableT
Wrap and replace the function of the same name in namespace, but only when the specified module_name is imported.
It has the same effect as replaces() but is delayed: when called, it registers a post import hook bound to the replaced function, which fires when someone imports the target module, and only then does the effect actually take place. Additionally, if namespace is a module namespace, the hook is deactivated when the module is reloaded (e.g., by importlib.reload()) to prevent incorrect replacements.
Used to implement support for optional dependencies.
- Raises:
LookupError – if there is no function of the same name in namespace.
Example
>>> def in_asyncio_context():
...     return False
>>> @replaces_when_imported(globals(), 'asyncio')
... def in_asyncio_context():
...     import asyncio
...     @replaces(globals())
...     def in_asyncio_context():
...         return asyncio._get_running_loop() is not None
...     return in_asyncio_context()
- aiologic.meta.copies(original: Callable[[_P], _T], replaced: MissingType = MISSING, /) Callable[[Callable[[_P], _T]], Callable[[_P], _T]]
- aiologic.meta.copies(original: Callable[[_P], _T], replaced: Callable[[_P], _T], /) Callable[[_P], _T]
Replace with a copy of original if that is a user-defined function, and make the copy look like the replaced function.
If original and replaced are the same function, it forces copying (copies(func, func) is always a new copy of func). Otherwise, it does nothing during type checking to prevent type errors.
Used to optimize functions which delegate all the work to others.
- Raises:
TypeError – if copying is forced and original is not a user-defined function.
Example
>>> def sig1():
...     return 'spam'
>>> @copies(sig1)
... def sig2():
...     return sig1()
>>> sig1() == sig2()
True
>>> sig1 is sig2
False
>>> sig1.__name__ == sig2.__name__
False
>>> sig1.__code__ is sig2.__code__
True
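A function copy that shares the original's code object but carries another function's identity can be built with `types.FunctionType`. The sketch below is an illustration of that technique only; `copy_function_as` is a hypothetical helper, and which attributes the real `copies()` transfers is an assumption.

```python
import types


def copy_function_as(original, replaced):
    """Hypothetical helper: copy `original`, but make it look like `replaced`."""
    copy = types.FunctionType(
        original.__code__,  # shared code object, so no extra call layer
        original.__globals__,
        name=replaced.__name__,
        argdefs=original.__defaults__,
        closure=original.__closure__,
    )
    copy.__kwdefaults__ = original.__kwdefaults__
    # Identity attributes come from the function being replaced.
    copy.__qualname__ = replaced.__qualname__
    copy.__module__ = replaced.__module__
    copy.__doc__ = replaced.__doc__
    return copy
```

Because the copy runs the original's code directly, calling it skips the extra frame that a delegating wrapper would add.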
Modules
- aiologic.meta.resolve_name(name: str, package: str | None) str
Resolve a relative module name to an absolute one.
Like importlib.util.resolve_name(), but raises ValueError instead of ImportError on Python ≥3.9 to achieve consistent behavior across all supported versions of Python.
Example
>>> resolve_name('x.y', 'a.b')  # an absolute one
'x.y'
>>> resolve_name('.x.y', 'a.b')
'a.b.x.y'
>>> resolve_name('..x.y', 'a.b')
'a.x.y'
>>> resolve_name('...x.y', 'a.b')
Traceback (most recent call last):
  ...
ValueError: `name` is beyond the top-level package
>>> resolve_name('.', 'a.b')
'a.b'
>>> resolve_name('..', 'a.b')
'a'
>>> resolve_name('...', 'a.b')
Traceback (most recent call last):
  ...
ValueError: `name` is beyond the top-level package
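The exception normalization can be sketched as a thin wrapper around `importlib.util.resolve_name()`. This is a minimal illustration, assuming only the behavior stated above; `resolve_name_sketch` is a hypothetical name, and the error message is passed through rather than reworded.

```python
import importlib.util


def resolve_name_sketch(name, package):
    """Hypothetical wrapper: always raise ValueError for invalid relative names."""
    try:
        return importlib.util.resolve_name(name, package)
    except ImportError as exc:
        # Python >= 3.9 raises ImportError here; older versions raise
        # ValueError, which simply propagates unchanged.
        raise ValueError(str(exc)) from None
```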
Imports
- aiologic.meta.import_module(name: str, package: str | None = None) ModuleType
Import a module by name (absolute or relative to package).
Like importlib.import_module(), but raises ImportError instead of ValueError on Python <3.9 to achieve consistent behavior across all supported versions of Python.
Example
>>> import_module('sys')
<module 'sys' (built-in)>
>>> import_module('sys') is import_module('sys')
True
>>> import_module('.abc', 'collections').__name__
'collections.abc'
>>> import_module('..abc', 'collections').__name__
Traceback (most recent call last):
ImportError: attempted relative import beyond top-level package
- aiologic.meta.import_from(module: ModuleType | str, name0: str, /, *, package: str | None = None) Any
- aiologic.meta.import_from(module: ModuleType | str, name0: str, name1: str, /, *names: str, package: str | None = None) tuple[Any, ...]
Import objects by given names from the specified module.
If module is a string, import_module(module, package) is used to obtain the module object. It is also used to attempt to import a submodule if the module does not contain an attribute of the same name.
- Raises:
ImportError – if any given name is not in the module.
Example
>>> pi = import_from('math', 'pi')
>>> pi
3.141592653589793
>>> pi, e = import_from('math', 'pi', 'e')
>>> pi, e
(3.141592653589793, 2.718281828459045)
>>> import_from('sys', 'circus')
Traceback (most recent call last):
ImportError: cannot import name 'circus' from 'sys' (...)
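The attribute lookup with a submodule fallback can be sketched using `importlib` directly. This is an approximation under stated assumptions: `import_from_sketch` is a hypothetical name, it uses the standard `importlib.import_module()` rather than the function documented above, and the error message only imitates the one shown in the example.

```python
import importlib
from types import ModuleType


def import_from_sketch(module, name0, *names, package=None):
    """Hypothetical equivalent of `from module import name0, ...`."""
    if not isinstance(module, ModuleType):
        module = importlib.import_module(module, package)
    found = []
    for name in (name0, *names):
        try:
            found.append(getattr(module, name))
        except AttributeError:
            # No such attribute: try to import it as a submodule instead.
            try:
                found.append(importlib.import_module(f"{module.__name__}.{name}"))
            except ImportError:
                raise ImportError(
                    f"cannot import name {name!r} from {module.__name__!r}",
                    name=module.__name__,
                ) from None
    # A single name returns the object itself, several names return a tuple.
    return tuple(found) if names else found[0]
```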
- aiologic.meta.import_original(module: ModuleType | str, name0: str, /, *, package: str | None = None) Any
- aiologic.meta.import_original(module: ModuleType | str, name0: str, name1: str, /, *names: str, package: str | None = None) tuple[Any, ...]
Import unpatched objects by given names from the specified module.
It behaves the same way as import_from(), except that it attempts to import the objects that have been replaced by some green library (e.g., as a result of calling gevent.monkey.patch_all()) instead of the patched ones. However, keep in mind that:
Functions that are not built-in access their __globals__ attribute. Therefore, if the returned object was defined in the same module where its patched dependencies are located (as in the case of gevent), it may not behave as expected. For example, threading.Semaphore may still use the patched threading.Lock for its blocking operations.
If the returned object was defined in a fake module (as in the case of eventlet), it may still not behave as expected. For example, all started threads may be treated as daemonic just because they are not registered in the global dictionary of the patched threading module.
Not all original objects may be available, since some are removed rather than replaced.
Because of the above, you should avoid using any objects that depend in any way on the module’s state. Check the source code to determine which objects can be used reliably.
Unlike import_from(), it does not import submodules of patched modules.
Exports
- aiologic.meta.export(package_namespace: ModuleType | MutableMapping[str, object], /) None
Prepare package_namespace for external use.
Its contents must be structured as follows:
Every non-public submodule/subpackage that is part of the implementation has a name that starts with the underscore character (package._util).
Every public submodule/subpackage that is available for direct use has a name that does not start with the underscore character (package.abc).
Every member of a public submodule/subpackage (including the package itself) follows the same naming rules.
Applying the function updates the attributes of all public members so that they look as if they were defined directly in the package. If a public submodule/subpackage or class is encountered, the function is also applied recursively to its members. Additionally, for each public submodule/subpackage (including the package itself), a human-readable __all__ is built, which includes the names of all public members that are not submodules/subpackages.
Typically, the usage is as follows: export(globals()) near the end of __init__.py. This allows the package to be safely split into subpackages and submodules without breaking pickling on incompatible implementation changes and while preserving convenient representations (which is especially important for exceptions).
Caution
If the function updates the same object by different names (or in different namespaces), the result is undefined, especially when parallel calls are made. So avoid providing access to the same object in different ways.
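To make the effect on pickling and representations concrete, here is a deliberately reduced sketch of the idea. It is not the library's algorithm: `export_sketch` is a hypothetical name, it handles a single flat namespace without recursion, it only touches classes and functions, and the exact form of `__all__` is an assumption.

```python
import types


def export_sketch(namespace):
    """Hypothetical, non-recursive version of the export step."""
    package_name = namespace["__name__"]
    prefix = package_name + "."
    public_names = []
    for name, member in list(namespace.items()):
        # Skip private names and submodules/subpackages.
        if name.startswith("_") or isinstance(member, types.ModuleType):
            continue
        public_names.append(name)
        # Re-home classes and functions defined in the package's submodules.
        if isinstance(member, (type, types.FunctionType)):
            if str(member.__module__).startswith(prefix):
                member.__module__ = package_name
    namespace["__all__"] = sorted(public_names)


class ParrotError(Exception):
    pass


# Pretend the class was defined in a private implementation module.
ParrotError.__module__ = "pkg._impl"
fake_namespace = {
    "__name__": "pkg",
    "ParrotError": ParrotError,
    "_helper": object(),
    "abc": types.ModuleType("pkg.abc"),
}
export_sketch(fake_namespace)
```

After the call, `ParrotError.__module__` is `"pkg"`, so pickles and tracebacks refer to the public location rather than to `pkg._impl`.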
- aiologic.meta.export_dynamic(module_namespace: ModuleType | MutableMapping[str, object], link_name: str, target: str, /) None
Register a dynamic export (symbolic link) in the specified module_namespace.
On the first call, the function defines __getattr__() in module_namespace. When attempting to retrieve an undefined attribute from the module object by link_name, it imports target via import_from(), updates its attributes, caches it in the namespace, and returns it.
target can be an absolute path (package.module.attribute) or a relative path (..attribute). If it does not contain the dot character, the name relative to the module is implied (name is equivalent to .name).
Useful for defining optional package members that are not available in all environments.
- Raises:
RuntimeError – if link_name cannot be registered.
ValueError – if target is beyond the top-level package.
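The lazy lookup relies on module-level `__getattr__()` (PEP 562). The following sketch shows that mechanism only, under simplifying assumptions: `export_dynamic_sketch` and the `_dynamic_links` key are hypothetical, only absolute targets are supported, and the attribute update step is omitted.

```python
import importlib
import types


def export_dynamic_sketch(module_namespace, link_name, target):
    """Hypothetical lazy link: resolve `target` on first attribute access."""
    links = module_namespace.setdefault("_dynamic_links", {})
    links[link_name] = target
    if "__getattr__" in module_namespace:
        return  # the hook is already installed

    def __getattr__(name):
        try:
            path = links[name]
        except KeyError:
            raise AttributeError(name) from None
        module_name, _, attribute = path.rpartition(".")
        value = getattr(importlib.import_module(module_name), attribute)
        # Cache the result so that the hook is not consulted again.
        module_namespace[name] = value
        return value

    module_namespace["__getattr__"] = __getattr__


demo = types.ModuleType("demo")
export_dynamic_sketch(vars(demo), "pi", "math.pi")
export_dynamic_sketch(vars(demo), "sqrt", "math.sqrt")
```

Nothing from `math` is looked up until `demo.pi` or `demo.sqrt` is first accessed, which is what makes this pattern suitable for optional members.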
- aiologic.meta.export_deprecated(module_namespace: ModuleType | MutableMapping[str, object], link_name: str, target: str, /) None
Register a deprecated export (symbolic link) in the specified module_namespace.
Like export_dynamic(), but raises DeprecationWarning on the first attempt to access the attribute, and never updates attributes of the latter.
Useful for providing a temporary alias under the old name for a renamed object.
Helpers
- class aiologic.meta.GeneratorCoroutineWrapper
Bases: Generator[_YieldT_co, _SendT_contra, _ReturnT_co], Coroutine[_YieldT_co, _SendT_contra, _ReturnT_co], Generic[_YieldT_co, _SendT_contra, _ReturnT_co]
…
- __init__(wrapped: Generator[_YieldT_co, _SendT_contra, _ReturnT_co] | Coroutine[_YieldT_co, _SendT_contra, _ReturnT_co], /) None
…
- __next__() _YieldT_co
…
- send(value: _SendT_contra, /) _YieldT_co
…
- throw(exc_type: type[BaseException], exc_value: BaseException | object = None, traceback: TracebackType | None = None, /) _YieldT_co
- throw(exc_type: BaseException, exc_value: None = None, traceback: TracebackType | None = None, /) _YieldT_co
…
- property gi: Generator[_YieldT_co, _SendT_contra, _ReturnT_co]
The underlying generator-like object (see isgeneratorlike()), if the wrapped object is one. Otherwise, raises AttributeError.
Example
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi
<generator object generator_function at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi
Traceback (most recent call last):
AttributeError: the wrapped object is not a generator
>>> coro.close()
- property cr: Coroutine[_YieldT_co, _SendT_contra, _ReturnT_co]
The underlying coroutine-like object (see iscoroutinelike()), if the wrapped object is one. Otherwise, raises AttributeError.
Example
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr
Traceback (most recent call last):
AttributeError: the wrapped object is not a coroutine
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr
<coroutine object coroutine_function at ...>
>>> coro.close()
- property gi_code: CodeType
An associated code object of the wrapped object.
This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi_code
<code object generator_function at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_code
<code object coroutine_function at ...>
>>> coro.close()
- property cr_code: CodeType
An associated code object of the wrapped object.
This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr_code
<code object generator_function at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_code
<code object coroutine_function at ...>
>>> coro.close()
- property gi_frame: FrameType | None
An associated frame object of the wrapped object, or None (if execution has completed).
This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi_frame
<frame at ...>
>>> gen.close()
>>> gen.gi_frame is None
True
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_frame
<frame at ...>
>>> coro.close()
>>> coro.gi_frame is None
True
- property cr_frame: FrameType | None
An associated frame object of the wrapped object, or None (if execution has completed).
This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr_frame
<frame at ...>
>>> gen.close()
>>> gen.cr_frame is None
True
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_frame
<frame at ...>
>>> coro.close()
>>> coro.cr_frame is None
True
- property gi_running: bool
A boolean that is True if the wrapped object is currently being executed by the interpreter (created, not suspended, and not closed), False otherwise (see inspect.getgeneratorstate()).
This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     yield from [target.gi_running]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
True
>>> gen.gi_running
False
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
True
>>> coro.gi_running
False
>>> coro.close()
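For comparison, the same running-versus-suspended distinction can be observed on a plain generator using only the standard library. This sketch does not involve the wrapper class; `collect_states` is a hypothetical helper that records what `inspect.getgeneratorstate()` reports at each point in the lifecycle.

```python
import inspect


def collect_states():
    """Hypothetical helper: record a plain generator's state over its lifetime."""

    def observer():
        # This expression is evaluated while the generator body is executing.
        yield inspect.getgeneratorstate(gen), gen.gi_running

    gen = observer()
    states = {"created": inspect.getgeneratorstate(gen)}
    states["inside"], states["running_inside"] = next(gen)
    # Control is back in the caller, so the generator is now suspended.
    states["outside"] = inspect.getgeneratorstate(gen)
    states["running_outside"] = gen.gi_running
    gen.close()
    states["closed"] = inspect.getgeneratorstate(gen)
    return states


states = collect_states()
```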
- property cr_running: bool
A boolean that is True if the wrapped object is currently being executed by the interpreter (created, not suspended, and not closed), False otherwise (see inspect.getcoroutinestate()).
This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     yield from [target.cr_running]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
True
>>> gen.cr_running
False
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
True
>>> coro.cr_running
False
>>> coro.close()
- property gi_suspended: bool
A boolean that is True if the wrapped object is currently suspended (created, not running, and not closed), False otherwise (see inspect.getgeneratorstate()).
This property is provided for compatibility with types.GeneratorType instances (Python ≥3.11). It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     yield from [target.gi_suspended]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
False
>>> gen.gi_suspended
True
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
False
>>> coro.gi_suspended
True
>>> coro.close()
- property cr_suspended: bool
A boolean that is True if the wrapped object is currently suspended (created, not running, and not closed), False otherwise (see inspect.getcoroutinestate()).
This property is provided for compatibility with types.CoroutineType instances (Python ≥3.11). It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     yield from [target.cr_suspended]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen)
False
>>> gen.cr_suspended
True
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro)
False
>>> coro.cr_suspended
True
>>> coro.close()
- property gi_yieldfrom: object | None
An associated iterated object of the wrapped object, or None.
This property is provided for compatibility with types.GeneratorType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     yield from [target.gi_yieldfrom]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen) is None
True
>>> gen.gi_yieldfrom
<list_iterator object at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro) is None
True
>>> coro.gi_yieldfrom
<aiologic.meta.GeneratorCoroutineWrapper object at ...>
>>> coro.close()
- property cr_await: object | None
An associated iterated object of the wrapped object, or None.
This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> def generator_function():
...     yield from [target.cr_await]
>>> async def coroutine_function():
...     await GeneratorCoroutineWrapper(generator_function())
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> next(target := gen) is None
True
>>> gen.cr_await
<list_iterator object at ...>
>>> gen.close()
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> next(target := coro) is None
True
>>> coro.cr_await
<aiologic.meta.GeneratorCoroutineWrapper object at ...>
>>> coro.close()
- property gi_origin: tuple[tuple[str, int, str], ...] | None
A tuple of (filename, line_number, function_name) tuples describing the traceback where the wrapped object was created, or None (see sys.set_coroutine_origin_tracking_depth()).
This property is provided for consistency with types.CoroutineType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> import sys
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.gi_origin
Traceback (most recent call last):
AttributeError: the wrapped object has not attribute 'gi_origin'
>>> gen.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_origin is None
True
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(1)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.gi_origin
((..., ..., ...),)
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
- property cr_origin: tuple[tuple[str, int, str], ...] | None
A tuple of (filename, line_number, function_name) tuples describing the traceback where the wrapped object was created, or None (see sys.set_coroutine_origin_tracking_depth()).
This property is provided for compatibility with types.CoroutineType instances. It is equivalent to one of the following:
If none is available, raises AttributeError.
Example
>>> import sys
>>> def generator_function():
...     return
...     yield  # generator definition
>>> async def coroutine_function():
...     pass
>>> gen = GeneratorCoroutineWrapper(generator_function())
>>> gen.cr_origin
Traceback (most recent call last):
AttributeError: the wrapped object has not attribute 'cr_origin'
>>> gen.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_origin is None
True
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(1)
>>> coro = GeneratorCoroutineWrapper(coroutine_function())
>>> coro.cr_origin
((..., ..., ...),)
>>> coro.close()
>>> sys.set_coroutine_origin_tracking_depth(0)
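Because the tracking depth is interpreter-wide state, it is usually safer to restore the previous value than to reset it to zero. The sketch below shows that pattern on a native coroutine with the standard library only; `capture_origin` is a hypothetical helper and does not use the wrapper class.

```python
import sys


async def coroutine_function():
    pass


def capture_origin(depth):
    """Hypothetical helper: create a coroutine at the given tracking depth."""
    previous = sys.get_coroutine_origin_tracking_depth()
    sys.set_coroutine_origin_tracking_depth(depth)
    try:
        coro = coroutine_function()
        try:
            return coro.cr_origin
        finally:
            coro.close()  # avoid a "never awaited" warning
    finally:
        # Restore whatever depth was active before the call.
        sys.set_coroutine_origin_tracking_depth(previous)


untracked = capture_origin(0)  # tracking disabled, so no origin is recorded
tracked = capture_origin(1)  # one (filename, line_number, function_name) entry
```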