2. Asynchronous Concurrent Programming (asyncoro)

Note

asyncoro has been renamed to pycos to better reflect its functionality and to avoid confusion with the asyncore module.

asyncoro provides an API for asynchronous and concurrent programming with coroutines, using Python’s generator functions. Coroutines are like lightweight threads: creating and running coroutines is very efficient. Moreover, unlike with threads, a coroutine continues to run until it voluntarily gives up control (with yield), so locking is not needed to protect critical sections.

Programs developed with asyncoro have the same logic and structure as programs with threads, except for a few syntactic changes. Although the API below has many methods, most of them are for additional features of asyncoro (such as message passing, hot swapping and monitoring) and are not needed for simple programs that resemble thread-based programs. The differences compared to threaded programming are:

  • Instead of creating threads, coroutines should be created with Coro. The process function (first argument to Coro) should be a generator function (i.e., a function with yield statements);

  • Sockets, pipes, etc. should be converted to asynchronous versions with Asynchronous Socket, Asynchronous Pipe, etc.;

  • I/O operations, such as AsyncSocket’s send(), receive() and accept(), and blocking operations, such as a coroutine’s sleep() and Event’s wait(), are implemented as generator methods; these should be used with yield (e.g., as data = yield async_sock.receive(1024));

  • asyncoro’s locking primitives (asyncoro.Event, asyncoro.Condition, etc.) should be used in place of Python’s threading counterparts, with yield on blocking operations (e.g., as yield async_event.wait());

  • A coroutine’s sleep() should be used in place of time.sleep() (e.g., as yield coro.sleep(2)).

Coroutines in asyncoro are essentially generator functions that suspend execution when yield is used and are resumed by asyncoro’s scheduler (AsynCoro) after the asynchronous operation is complete. Usually yield is used with an asynchronous call, such as a socket’s connect() or send(), a pipe’s read() or communicate(), or waiting for a message. With such statements, the asynchronous call is initiated and control goes to the scheduler, which runs another coroutine if one is ready to execute. When the asynchronous operation is complete, the coroutine that called the operation becomes ready to execute. Thus, coroutines in asyncoro are not strictly cooperative tasks that pass control to each other; instead, each yield statement transfers control to asyncoro’s scheduler, which manages them. However, asyncoro supports message passing, suspend/resume calls, etc., so that coroutines can cooperate in a way that is easy to program and understand.
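The control flow described above can be illustrated with a toy round-robin scheduler written with plain Python generators. This is only a sketch, not asyncoro’s AsynCoro (which also handles I/O, timeouts and messages); the names run_round_robin and worker are hypothetical. Each bare yield hands control back to the scheduler, which then resumes the next ready generator, so the two workers interleave without any locking.

```python
from collections import deque

def run_round_robin(coros):
    """Toy scheduler (hypothetical): resume each generator in turn until all finish."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            next(coro)          # run the coroutine until its next yield
        except StopIteration:
            continue            # finished: drop it from the ready queue
        ready.append(coro)      # still alive: reschedule it at the back

def worker(name, steps, log):
    for i in range(steps):
        log.append((name, i))   # no lock needed: only one coroutine runs at a time
        yield                   # give control back to the scheduler

log = []
run_round_robin([worker('a', 2, log), worker('b', 3, log)])
print(log)   # [('a', 0), ('b', 0), ('a', 1), ('b', 1), ('b', 2)]
```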

Unlike threads, coroutines are not preempted: at any time at most one coroutine is executing, and it continues to execute until it uses yield. Thus, there is no need to lock critical sections with asyncoro.

The asyncoro framework consists of the AsynCoro scheduler; Coro, to create coroutines from generator functions; Channel, to broadcast messages; Asynchronous Socket, to convert regular synchronous sockets to asynchronous (non-blocking) sockets; Asynchronous Pipe for pipes; Asynchronous File for files; Computation and discoro_server() for distributed / parallel computing; Lock and RLock for locking (although locking is not required with asyncoro); and Condition, Event and Semaphore primitives that are very similar to thread primitives (except for the few syntactic changes noted above).

2.1. Examples

See Asynchronous Concurrent Programming, Channels and Message Passing in the tutorial for examples. There are many illustrative use cases in the ‘examples’ directory where the asyncoro module is installed.

The following examples included with asyncoro are relevant to this section:

  • examples/tut_coros.py creates a number of coroutines that each suspend execution for a brief period. The number of coroutines created can be increased to thousands or tens of thousands to show that asyncoro scales well.

  • examples/tut_client_server.py shows message passing (send() and receive() methods of coroutines) between local client and server coroutines. The remote and local versions are similar, except that the remote versions register/locate coroutines.

  • examples/tut_channel.py uses a broadcasting Channel to exchange messages among local coroutines.

2.2. AsynCoro scheduler

AsynCoro is a (singleton) scheduler that runs coroutines, similar to the way an operating system’s scheduler runs multiple processes. It is initialized automatically (for example, when a coroutine is created), so for most purposes the scheduler is transparent. The scheduler in asyncoro manages coroutines, message passing, I/O events, timeouts, wait/resume events, etc., in a single concurrent program; it doesn’t support distributed programming, i.e., message passing over a network. disasyncoro extends AsynCoro with features supporting distributed programming, remote execution of coroutines, etc. If the scheduler instance is needed, it can be obtained with either AsynCoro() or AsynCoro.instance().

Unlike in other asynchronous frameworks, in asyncoro there is no explicit event loop; I/O events are processed by the scheduler and by the methods in Asynchronous Socket, Asynchronous Pipe, etc. For example, the recv() method (which must be used with yield) sets up an internal function to execute when the socket has data to read and suspends the calling coroutine. The scheduler can execute any other coroutines that are ready while the I/O operation is pending. When the data has been read, the suspended coroutine is resumed with the data, so Asynchronous Socket's recv() works just like socket.recv(), except for using yield. Thus, programming with asyncoro is very similar to programming with threads, except for using yield with certain methods.

class asyncoro.AsynCoro

Creates and returns the singleton scheduler. If a scheduler instance has already been created (such as when a coroutine was created), a new instance won’t be created. disasyncoro extends AsynCoro for distributed programming; its constructor has various options for customization.

The scheduler has the following methods:

instance()

This static method returns the instance of the AsynCoro scheduler; use it as scheduler = AsynCoro.instance(). If no instance has been created yet, it creates one and returns it.

cur_coro()

This static method returns the coroutine (instance of Coro) currently being executed; use it as coro = AsynCoro.cur_coro(). As mentioned below, if the coroutine’s generator function has a coro=None parameter, the Coro constructor initializes it to the coroutine instance (which is also a way to document that the function is used for creating coroutines).

join()

Note

This method must be called from the (main) thread only; calling it from a coroutine will deadlock the entire coroutine framework.

Waits for all scheduled non-daemon coroutines to finish. After join returns, more coroutines can be created (which are then added to the scheduler).

terminate()

Note

This method must be called from the (main) thread only; calling it from a coroutine will deadlock the entire coroutine framework.

Terminates all scheduled coroutines and then the scheduler itself. If necessary, a new scheduler instance may be created with AsynCoro() or AsynCoro.instance().

The scheduler runs in a thread separate from the user program. The scheduler terminates when all non-daemon coroutines have terminated, similar to Python’s threading module.

2.3. Coroutine

asyncoro’s Coro class creates coroutines (lightweight processes). Coroutines are similar to threads in regular Python programs, except for the few differences noted above.

class asyncoro.Coro(target[, arg1, arg2, ...])

Creates a coroutine, where target is a generator function (a function with yield statements) and arg1, arg2, etc. are arguments or keyword arguments to target. If the target generator function has a coro=None keyword argument, the Coro constructor replaces None with the instance of Coro created, so the coroutine can use it to invoke methods in the Coro class (see below). Alternatively, the instance can be obtained with the static method coro = AsynCoro.cur_coro().

Consider the following generator function (where sock is an asynchronous socket and all statements are asynchronous, so all are used with yield):

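def get_reply(sock, msg, coro=None):
    yield sock.sendall(msg)         # suspends until msg has been sent
    yield coro.sleep(1)             # pauses 1 second; other coroutines keep running
    reply = yield sock.recv(1024)   # suspends until data arrives (last value yielded)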

A coroutine for processing the above function can be created with, for example, Coro(get_reply, conn, "ping"). The Coro constructor creates a coroutine that runs get_reply with parameters sock=conn, msg="ping" and coro set to the just-created coroutine instance. (If the coro=None argument is not used, the coroutine instance can be obtained with coro = AsynCoro.cur_coro().) The coroutine is then added to the AsynCoro scheduler, so it executes concurrently with other coroutines; there is no need to start it explicitly, as is done with threads. Note that in Python 2, generator functions cannot use a return statement with a value; with asyncoro, a statement such as return v can be replaced with raise StopIteration(v). (Python 3.3 and later allow return v in generator functions, and from Python 3.7 onward, per PEP 479, an explicit raise StopIteration inside a generator is turned into a RuntimeError, so return v is the form to use there.) If a generator/coroutine does not use StopIteration, the last value yielded in the generator becomes the return value. Thus, in the example above, get_reply does not use StopIteration, so the buffer received (in the last yield) is equivalent to the return value of get_reply.
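As a rough illustration of the return-value rule, the hypothetical helper run_to_completion below drives a plain Python generator with next() (unlike asyncoro’s scheduler, it does not send results back into the generator) and reports either the value carried by StopIteration or, if there is none, the last value yielded. It assumes Python 3, where return v in a generator reaches the caller as StopIteration with value v.

```python
def run_to_completion(gen):
    """Toy driver (hypothetical): return the StopIteration value if there is one,
    otherwise the last value the generator yielded."""
    last = None
    try:
        while True:
            last = next(gen)        # run to the next yield, remember its value
    except StopIteration as stop:
        # 'return v' in a Python 3 generator arrives here as stop.value == v
        return stop.value if stop.value is not None else last

def explicit():
    yield 'partial'
    return 'done'                   # explicit result

def implicit():
    yield 'first'
    yield 'last'                    # no return value: 'last' becomes the result

print(run_to_completion(explicit()))   # done
print(run_to_completion(implicit()))   # last
```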

Blocking operations, such as socket.recv() and socket.connect(), are implemented as generator functions in Asynchronous Socket, the asynchronous implementation of sockets. These functions simply initiate the operation; yield should be used with them (as in the example above) so the scheduler can run other eligible coroutines while the operation is pending. Calling these methods without yield simply returns a generator object instead of the result of the method call, so care must be taken to use yield when calling generator functions. Using yield where it is not needed is not an error; e.g., the resume() method of coroutines can be used without yield, but when it is used with yield, the caller gives control to the scheduler, which may execute the resumed coroutine right away. In the rest of the documentation, methods that need to be called with yield are noted as such.
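The pitfall of calling a generator method without yield can be seen with plain Python: calling a generator function only creates a generator object, so none of its body runs and no result is produced. Here recv_sketch is a hypothetical stand-in for a method such as recv(); driving it by hand at the end is roughly what a scheduler does on the coroutine’s behalf when yield is used.

```python
import types

def recv_sketch(data):
    """Hypothetical stand-in for a generator method such as recv()."""
    yield                 # the real operation would be pending here
    return data           # result is produced only when the generator is driven

result = recv_sketch(b'ping')                     # called without yield: nothing ran yet
print(isinstance(result, types.GeneratorType))    # True
print(result == b'ping')                          # False: not the data

# Driving the generator to completion yields the actual result:
next(result)
try:
    next(result)
except StopIteration as stop:
    print(stop.value)     # b'ping'
```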

In the rest of the documentation we follow the convention of using the coro=None keyword argument in generator functions, and the coro variable refers to the coroutine (i.e., the instance of Coro) executing the generator function. This variable can be used to invoke methods of Coro, or passed to other coroutines so that they can, for example, send messages to it or wake it up from sleep. A coroutine has the following methods:

suspend(timeout=None, alarm_value=None)
sleep(timeout=None, alarm_value=None)

Note

This method must always be used with yield as yield coro.sleep().

Suspends coroutine coro until timeout. If timeout is a positive number (float or int), the scheduler suspends execution of the coroutine for that many seconds (or fractions of a second). If timeout is None, the coroutine is not woken up by the scheduler; some other coroutine needs to resume it. The value yielded by this method is the value the coroutine is resumed with, or alarm_value if it is resumed by the scheduler due to timeout. If timeout=0, this method returns alarm_value without suspending the coroutine.

For example, if a coroutine executes v = yield coro.sleep(2.9), it is suspended for 2.9 seconds. If, before the timeout, another coroutine wakes up this coroutine (with the resume() method) with a value, v is set to that value. Otherwise, after 2.9 seconds, this coroutine is resumed with None (the default alarm_value), so v is set to None and the coroutine continues execution. While the coroutine is suspended, the scheduler executes other scheduled coroutines. Within a coroutine, coro.sleep (or coro.suspend) must be used (with yield) instead of time.sleep; calling time.sleep blocks the entire coroutine framework, as the scheduler cannot run any other coroutine until it returns.
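The sleep/resume semantics can be modelled with a small virtual-time scheduler. This is a toy under stated assumptions, not asyncoro’s implementation: ToyScheduler is hypothetical, coroutines yield hypothetical command tuples ('sleep', timeout, alarm_value) and ('resume', target, update) instead of calling coro.sleep() and resume(), and time is simulated rather than measured. It shows a sleeper that is woken early receiving the resume value, and a later sleep that times out receiving alarm_value.

```python
import heapq
import itertools
from collections import deque

class ToyScheduler:
    """Toy virtual-time scheduler modelling sleep()/resume() (not asyncoro)."""

    def __init__(self):
        self.now = 0.0
        self.ready = deque()            # (generator, value to send into it)
        self.timers = []                # heap of (wake_time, token, generator, alarm_value)
        self.sleeping = {}              # generator -> token of its current sleep
        self.tokens = itertools.count()

    def spawn(self, gen):
        self.ready.append((gen, None))

    def _step(self, gen, value):
        try:
            cmd = gen.send(value)
        except StopIteration:
            return                      # coroutine finished
        if cmd[0] == 'sleep':
            _, timeout, alarm = cmd
            if timeout == 0:            # timeout=0: alarm_value without suspending
                self.ready.append((gen, alarm))
                return
            token = next(self.tokens)
            self.sleeping[gen] = token
            if timeout is not None:     # None: sleep until another coroutine resumes it
                heapq.heappush(self.timers, (self.now + timeout, token, gen, alarm))
        elif cmd[0] == 'resume':
            _, target, update = cmd
            if target in self.sleeping:
                del self.sleeping[target]
                self.ready.append((target, update))   # sleeper receives update
            self.ready.append((gen, None))             # resumer stays runnable

    def run(self):
        while self.ready or self.timers:
            if self.ready:
                self._step(*self.ready.popleft())
                continue
            wake, token, gen, alarm = heapq.heappop(self.timers)
            if self.sleeping.get(gen) == token:        # skip timers of resumed sleeps
                del self.sleeping[gen]
                self.now = wake                        # advance virtual time
                self.ready.append((gen, alarm))

def sleeper(log):
    v = yield ('sleep', 2.9, None)          # like: v = yield coro.sleep(2.9)
    log.append(('first', v))
    v = yield ('sleep', 5, 'timed out')     # like: a sleep with alarm_value='timed out'
    log.append(('second', v))

def waker(target):
    yield ('sleep', 1, None)
    yield ('resume', target, 'woken early') # like: target.resume('woken early')

log = []
sched = ToyScheduler()
s = sleeper(log)
sched.spawn(s)
sched.spawn(waker(s))
sched.run()
print(log)         # [('first', 'woken early'), ('second', 'timed out')]
print(sched.now)   # 6.0
```

The per-sleep token is a design choice of the toy: when a sleeper is resumed early, its pending timer stays in the heap but no longer matches, so it cannot wake a later sleep by mistake.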

resume(update=None)
wakeup(update=None)

Wakes up the (suspended) coroutine coro. As explained above, the suspended coroutine gets update (any Python object) as the value of the yield statement that caused it to suspend. If sleep/resume synchronization is needed (so that resume waits until a specific suspend is ready to receive), the Event locking primitive can be used: the resuming coroutine waits on an event variable, and the suspending coroutine sets the event before going to sleep.

send(msg)

Sends message msg (any Python object) to the coroutine on which this method is invoked. If the coroutine is currently waiting for messages (with receive()), then it is resumed with msg. If it is not currently waiting for messages, the message is queued so that next receive() returns the message without suspending.

A message can be any Python object when sender and recipient are in the same program/asyncoro instance (i.e., messages are not sent over a network). However, when sender and recipient are on different asyncoro instances (over a network), messages must be serializable at the sender and deserializable at the receiver. If a message includes any objects that have unserializable attributes, their classes have to provide a __getstate__() method to serialize the objects, and the remote program should have __setstate__() for those classes; see Pickle protocol.

If the recipient is in a remote asyncoro, send() simply queues messages for transfer over the network. A daemon coroutine in asyncoro transfers the messages in the order they are queued. By default, this process may transfer each message over a new connection. As creating sockets and making connections is expensive, this may be rather inefficient, especially if messages are sent frequently. See the peer() method in Distributed Programming (disasyncoro) for specifying that messages to peers should be sent as a stream, using the same connection.

deliver(msg, timeout=None)

Note

This method must always be used with yield as recvd = yield rcoro.deliver(msg).

Similar to send(), except that this method must be used with yield and it returns the status of delivering the message to the recipient. If it is 1, the message has been successfully placed in the recipient coroutine’s message queue (when the recipient calls receive(), it gets the queued messages in the order they were received). If timeout is given and the message couldn’t be delivered before the timeout, the return value is 0. If timeout is None, delivery will not time out. For local coroutines (i.e., coroutines executing in the same program), timeout has no effect; if the recipient is valid, the message will be delivered successfully. However, if the recipient is a remote coroutine (see Distributed Programming (disasyncoro)), network delays / failures may cause delivery to be delayed or to fail (i.e., delivery could wait forever); to avoid such issues, an appropriate timeout may be used.

receive(timeout=None, alarm_value=None)
recv(timeout=None, alarm_value=None)

Note

This method must always be used with yield as msg = yield coro.receive().

Returns the earliest queued message if there are pending messages; otherwise suspends coro until either a message is sent to it or timeout seconds elapse (in which case alarm_value is returned). If called with timeout=0, this method does not suspend the coroutine; it returns either the earliest queued message, if available, or alarm_value.

recv is synonym for receive.
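The queueing behaviour of send() and receive() can be sketched with a toy mailbox that drives a plain generator. It is not asyncoro’s code: the class ToyMailbox and the yielded request tuple ('receive', timeout, alarm_value) are hypothetical, and positive timeouts are not modelled (they behave like None here).

```python
from collections import deque

class ToyMailbox:
    """Toy model of a coroutine's message queue (not asyncoro's implementation)."""

    def __init__(self):
        self.queue = deque()        # messages sent while nobody was waiting
        self.waiting = None         # generator suspended in a receive, if any

    def send(self, msg):
        if self.waiting is not None:
            gen, self.waiting = self.waiting, None
            self._resume(gen, msg)  # receiver is waiting: resume it with msg
        else:
            self.queue.append(msg)  # otherwise queue msg for the next receive

    def run(self, gen):
        self._resume(gen, None)

    def _resume(self, gen, value):
        # Drive gen until it asks for a message that is not available yet.
        try:
            while True:
                _, timeout, alarm = gen.send(value)   # gen yields ('receive', timeout, alarm)
                if self.queue:
                    value = self.queue.popleft()      # earliest queued message first
                elif timeout == 0:
                    value = alarm                     # timeout=0: never suspend
                else:
                    self.waiting = gen                # suspend until send()
                    return
        except StopIteration:
            pass                                      # receiver finished

def receiver(log):
    log.append((yield ('receive', 0, 'nothing')))   # like receive(timeout=0, alarm_value='nothing')
    log.append((yield ('receive', None, None)))     # like receive(): suspends if queue is empty
    log.append((yield ('receive', None, None)))

log = []
box = ToyMailbox()
box.run(receiver(log))   # logs 'nothing', then suspends
box.send('hello')        # resumes the waiting receiver directly
box.send('bye')          # resumes it again; the receiver then finishes
box.send('late')         # nobody is waiting, so the message is queued
print(log, list(box.queue))   # ['nothing', 'hello', 'bye'] ['late']
```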

set_daemon(flag=True)

Marks the coroutine as a daemon (a process that never terminates) if flag is True. As with the threading module, the AsynCoro scheduler waits for all non-daemon coroutines to terminate before exiting. The daemon status can be toggled by calling set_daemon() with flag set to True or False.

hot_swappable(flag)

Marks whether the coroutine’s generator function can be replaced. This method can be used to set (with flag=True) or clear (with flag=False) the flag. With hot swapping, a coroutine’s code can be updated (to new functionality) while the application is running.

hot_swap(target[, arg1, arg2, ...])

Requests AsynCoro to replace coroutine’s generator function with target([arg1, arg2, …]). AsynCoro then throws HotSwapException in the coroutine when:

  • coro indicated it can handle hot swap (i.e., last called hot_swappable with flag=True),

  • it is currently executing at top-level in the call stack (i.e., has not called other generator functions), and

  • has no pending asynchronous operations (socket I/O, tasks scheduled with AsyncThreadPool, etc.).

The new generator is set as args[0] of HotSwapException, so the coroutine can inspect the new generator, if necessary, and do any preparation for hot swapping, e.g., saving state (perhaps by sending state as a message to itself, which can be retrieved in the new generator with receive()), or even ignore the hot swap request. If/when it is ready for the swap, it must re-raise the same HotSwapException (with the new generator as args[0]). This causes AsynCoro to close the current generator, replace it with the new generator and schedule the new generator for execution (from the beginning). Any messages (i.e., resume updates) queued in the previous generator are not reset, so the new generator can process queued messages (e.g., use receive() in a loop with timeout=0 until receive() returns alarm_value). Note that hot_swap() changes the generator function of the particular coroutine for which it is called. If many coroutines use that generator function, hot_swap() should be called for each coroutine that is to be updated.
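The exception-based handshake used by hot_swap() can be mimicked with Python’s generator throw() method. In this toy, ToyHotSwap is a hypothetical stand-in for HotSwapException and request_hot_swap a hypothetical stand-in for what AsynCoro does; the conditions listed above (top level, no pending operations) are not modelled, and state is shared through a dict rather than messages, for brevity.

```python
class ToyHotSwap(Exception):
    """Hypothetical stand-in for HotSwapException; args[0] is the new generator."""

def counter_v1(state):
    try:
        while True:
            state['count'] += 1
            yield 'v1'
    except ToyHotSwap:
        state['saved_by_v1'] = True   # prepare for the swap, e.g. save state
        raise                          # re-raise the same exception to accept it

def counter_v2(state):
    while True:
        state['count'] += 10
        yield 'v2'

def request_hot_swap(gen, new_gen):
    """Throw the swap request into gen; return the generator that should keep running."""
    try:
        gen.throw(ToyHotSwap(new_gen))
    except ToyHotSwap as exc:
        gen.close()                    # already finished; close() is a harmless no-op
        return exc.args[0]             # accepted: run the new generator from the start
    return gen                         # gen caught it and yielded again: swap ignored

state = {'count': 0}
gen = counter_v1(state)
next(gen)
next(gen)                              # count is now 2
gen = request_hot_swap(gen, counter_v2(state))
print(next(gen), state['count'])       # v2 12
```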

monitor(observe)

Note

This method must always be used with yield as v = yield coro.monitor(observe).

Sets coro as the monitor of coroutine observe. Then, when observe finishes (either because its generator function finished execution or because AsynCoro terminated it due to an uncaught exception), AsynCoro sends the status to coro as a message with MonitorException. MonitorException’s args[0] is set to the affected coroutine observe and args[1] is set to the exception tuple:

  • if observe finished execution, the tuple is a pair whose first element is (the type) StopIteration and whose second element is an instance of StopIteration with the last value yielded by observe;

  • if observe was terminated due to an uncaught exception, the tuple has either 2 or 3 elements: the type of the exception, the uncaught exception itself and, if available, the traceback.

The monitor coroutine can inspect MonitorException and possibly restart the affected coroutine (see below). A coroutine can be monitored by more than one monitor, and a monitor can monitor more than one coroutine.
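The shape of the status tuple delivered to a monitor can be illustrated with plain generators. In this toy, run_and_report is a hypothetical helper that runs a generator to completion and passes (generator, status) to a notify callback, standing in for the MonitorException message that AsynCoro sends; it does not model scheduling or restarting.

```python
import sys

def run_and_report(gen, notify):
    """Hypothetical helper: run gen to completion and report (gen, status tuple)."""
    last = None
    try:
        while True:
            last = next(gen)
    except StopIteration:
        # finished normally: (StopIteration, StopIteration(last value yielded))
        notify((gen, (StopIteration, StopIteration(last))))
    except Exception:
        # uncaught exception: (type, exception, traceback)
        notify((gen, sys.exc_info()))

def well_behaved():
    yield 1
    yield 2

def faulty():
    yield 1
    raise ValueError('boom')

reports = []
run_and_report(well_behaved(), reports.append)
run_and_report(faulty(), reports.append)
for observed, status in reports:
    print(status[0].__name__, status[1].args)
# StopIteration (2,)
# ValueError ('boom',)
```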

throw(*args)

Throws the exception given by *args into the coroutine (at the point where it is currently executing).

terminate()

Terminates the coroutine. This is useful, for example, to terminate server processes that otherwise never terminate.

value()

Note

This method must be called from a thread, not a coroutine.

Returns the last value yielded by the coroutine, waiting until the coroutine terminates if necessary. This method should not be called from a coroutine, as that will deadlock the entire coroutine framework. It is meant for the main thread in the user program to wait for the (main) coroutine(s) it creates.

finish()

Note

This method must always be used in a coroutine with yield as v = yield other.finish().

Returns the last value yielded by the coroutine other, possibly waiting until it terminates.

Faults in (local or remote) coroutines can be detected with monitor(), and fault-tolerant coroutines can be developed with hot_swap().

2.4. Locking Primitives

class asyncoro.Lock
class asyncoro.RLock
class asyncoro.Semaphore
class asyncoro.Event
class asyncoro.Condition

Note

With asyncoro, locking is not needed, as there is no forced preemption: at any time at most one coroutine is executing, and control is transferred to the scheduler only when a yield statement is encountered. (In fact, the implementation of the asynchronous locking primitives in asyncoro updates lists and counters without locking.) So with asyncoro, Lock and RLock are optional.

asyncoro provides asynchronous implementations of the Lock, RLock, Semaphore, Event and Condition primitives. They are similar to the versions in the threading module. Any operation that would block in the threading module must be called with yield. For example, acquiring a lock is a blocking operation, so it should be invoked as yield lock.acquire(). Similarly, Event’s wait method and Condition’s wait method must be used as yield event.wait() and yield condition.wait(), respectively. For example, a Condition variable cv in a client (consumer) should be used as follows (compare with the example in the threading module documentation):

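while True:
    yield cv.acquire()                # acquiring may block, so it is used with yield
    while not an_item_is_available():
        yield cv.wait()               # waiting blocks, so it is used with yield
    get_an_available_item()
    cv.release()                      # releasing does not block; no yield needed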

See the documentation strings in the asyncoro module for more details on which methods should be used with yield and which need not be.

2.5. Channel

Channel is a broadcast mechanism with which coroutines can exchange messages. Messages sent to a Channel are sent to its subscribers (recipients). While a message can be sent one-to-one with the send() or deliver() methods of the receiving coroutine, channels can be used to broadcast a message so that all subscribers get it.

class asyncoro.Channel(name, transform=None)

Creates a channel with name, which must be unique. If transform is given, it must be a function that is called before a message is sent to subscribers. The function is called with the name of the channel and the message. It should return the transformed message or None. If None is returned, the message is dropped; subscribers will not receive it. Otherwise, the transformed message is sent to subscribers.

A channel has the following methods:

subscribe(subscriber, timeout=None)

Note

This method must be used with yield as yield channel.subscribe(coro)

Subscribes subscriber (a coroutine or even another channel) to the channel. Any messages sent to the channel are then sent to each subscriber; i.e., messages are broadcast to all subscribers. It is possible to chain channels or create hierarchical channels, with channels subscribing to other channels. If timeout is a positive number, the call fails if the subscription is not successful (e.g., the channel couldn’t be located) within that many seconds.

send(message)

Calls the transform function of the channel (see above), if it has one. If the function returns None, the message is ignored. Otherwise, the message is sent to the current subscribers. Messages sent over a channel are queued (buffered) at the receiving coroutines. A coroutine coro that has subscribed to the channel, for example, can receive messages with msg = yield coro.receive().

deliver(message, timeout=None, n=0)

Note

This method must be used with yield as recvd = yield channel.deliver(msg)

Similar to send(), except that it waits until at least n subscribers are subscribed. It returns the total number of end-point recipients (coroutines) the message has been delivered to; in the case of hierarchical channels, it is the sum of the recipients of all the channels. This may be less than n (e.g., delivering the message to a subscriber may fail due to a network error), or more (e.g., there are more subscribers, or some subscribers are channels with more than one subscriber). If n is 0, the message is delivered to all current subscribers. In any case, timeout is the maximum amount of time in seconds (or fractions of a second) for delivering the message. Thus, for example, if the timeout occurs before n subscribers have subscribed to the channel, the method returns 0.

unsubscribe(subscriber, timeout=None)

Note

This method must be used with yield as yield channel.unsubscribe(coro)

Unsubscribes the subscriber (a coroutine or another channel), so future messages to the channel are not sent to that subscriber. If timeout is a positive number, it is the maximum number of seconds for the unsubscribe request to complete.

close()

Closes the channel. The channel can’t be used for message passing after it is closed.

set_transform(transform)

Sets/changes transform as the function to call when a message is sent to this channel. See the Channel constructor and send().
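The broadcast, transform and hierarchical-count rules described above can be modelled in a few lines. This is a hypothetical, synchronous sketch (ToyChannel and Inbox are not asyncoro classes): it appends messages to inboxes instead of coroutine message queues, and ignores timeouts, the n argument of deliver() and network failures. Its send() returns the number of end-point recipients, as deliver() does.

```python
class Inbox:
    """Stand-in for a subscribing coroutine's message queue."""
    def __init__(self):
        self.messages = []

class ToyChannel:
    """Hypothetical broadcast channel; not asyncoro's Channel."""

    def __init__(self, name, transform=None):
        self.name = name
        self.transform = transform
        self.subscribers = []           # Inbox objects or other ToyChannels

    def subscribe(self, subscriber):
        if subscriber not in self.subscribers:
            self.subscribers.append(subscriber)

    def unsubscribe(self, subscriber):
        if subscriber in self.subscribers:
            self.subscribers.remove(subscriber)

    def send(self, msg):
        """Broadcast msg; return the number of end-point recipients reached."""
        if self.transform is not None:
            msg = self.transform(self.name, msg)
            if msg is None:
                return 0                # transform dropped the message
        count = 0
        for sub in list(self.subscribers):
            if isinstance(sub, ToyChannel):
                count += sub.send(msg)  # hierarchical: add the sub-channel's recipients
            else:
                sub.messages.append(msg)
                count += 1
        return count

def shout(name, msg):
    return None if msg == 'spam' else msg.upper()   # drop 'spam', upper-case the rest

a, b, c = Inbox(), Inbox(), Inbox()
top, sub = ToyChannel('top', transform=shout), ToyChannel('sub')
top.subscribe(a)
top.subscribe(sub)              # a channel subscribing to a channel
sub.subscribe(b)
sub.subscribe(c)
n_hi = top.send('hi')           # a, plus b and c through sub
n_spam = top.send('spam')       # dropped by the transform
top.unsubscribe(sub)
n_bye = top.send('bye')         # only a
print(n_hi, n_spam, n_bye)      # 3 0 1
print(a.messages, b.messages)   # ['HI', 'BYE'] ['HI']
```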

2.6. Message Passing

Coroutine’s send(), receive() and deliver() offer one-to-one message passing and Channel’s send() and deliver() offer one-to-many / broadcast message passing.

asyncoro delivers messages in the order they have been sent with either one-to-one or broadcast message passing (i.e., with either send or deliver methods of coroutines or channels). In other words, asyncoro guarantees temporal order of messages.

2.7. AsyncThreadPool

The asyncoro framework and all coroutines run in a single thread. It implements concurrency (running more than one process) by interleaving coroutines: suspending a coroutine that is waiting for some event and running a coroutine that is ready to execute. All the blocking operations, such as sending/receiving data (sockets, message passing) or sleeping, are implemented with generator functions that schedule the operation and suspend the coroutine. However, the asyncoro framework doesn’t implement every blocking operation. Sometimes it is necessary to use functions in other modules that block the thread until the operation is complete. For example, reading standard input blocks the thread until the read method completes. If such functions are used in a coroutine, the entire asyncoro framework and all coroutines are blocked; i.e., the asyncoro scheduler itself is blocked, so even if there are other coroutines eligible to run, they won’t be executed. The AsyncThreadPool class can be used to run such blocking functions in separate threads so that asyncoro itself is not affected by them.

class asyncoro.AsyncThreadPool(num_threads)

Creates a pool with given number of threads. When a blocking function is scheduled, an available thread in the pool is used to execute that function. More threads will allow more blocking functions to be running simultaneously, but take more system resources.

async_task(target, *args, **kwargs)

Note

This method must be used with yield as val = yield pool.async_task(target, *args, **kwargs)

Schedules given target function with arguments *args and keyword arguments **kwargs for execution with a thread in the pool. If all threads are currently executing other functions, then the function will be executed when a thread becomes available (i.e., done with currently executing function).

The value returned by this method is the value returned by the function.
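The idea behind AsyncThreadPool can be sketched with the standard library’s concurrent.futures.ThreadPoolExecutor and a toy generator scheduler: a coroutine yields a Future for the blocking call, and the scheduler keeps running other coroutines until the Future is done, then resumes the coroutine with its result. This is an analogue under stated assumptions, not asyncoro’s implementation; run, blocking_read, reader and ticker are hypothetical names, and the toy polls futures instead of waiting on events.

```python
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor

def run(coros):
    """Toy scheduler: a coroutine that yields a Future is resumed with its result."""
    ready = deque((coro, None) for coro in coros)
    waiting = []                              # (future, coroutine) pairs
    while ready or waiting:
        for fut, coro in list(waiting):       # resume coroutines whose task finished
            if fut.done():
                waiting.remove((fut, coro))
                ready.append((coro, fut.result()))
        if not ready:
            time.sleep(0.001)                 # toy polling while only threads are busy
            continue
        coro, value = ready.popleft()
        try:
            request = coro.send(value)
        except StopIteration:
            continue                          # coroutine finished
        if isinstance(request, Future):
            waiting.append((request, coro))   # blocked in a thread, not in the scheduler
        else:
            ready.append((coro, None))        # plain yield: stay runnable

def blocking_read(log):
    """Stand-in for a blocking call such as sys.stdin.readline()."""
    time.sleep(0.05)
    log.append('read done')
    return 'line of input'

def reader(pool, log):
    line = yield pool.submit(blocking_read, log)   # like: yield pool.async_task(...)
    log.append(('reader got', line))

def ticker(log):
    for i in range(3):
        log.append(('tick', i))               # keeps running while the read blocks
        yield

log = []
with ThreadPoolExecutor(max_workers=1) as pool:
    run([reader(pool, log), ticker(log)])
print(log)
```

With the 50 ms delay in blocking_read, the ticks are typically logged before 'read done', showing that the other coroutine is not held up by the blocking call.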

join()

Waits for all scheduled blocking functions to finish. This method should be called from the main thread, not from any coroutine, as it is a blocking operation.

terminate()

Waits for all scheduled blocking functions to finish and then terminates the threads; the pool can no longer be used for scheduling tasks. This method should be called from the main thread, not from any coroutine, as it is a blocking operation.

See examples/chat_client.py, which uses a thread pool (with 1 thread) to execute sys.stdin.readline (a blocking function).