7. Tutorial / Examples

Note

asyncoro has been renamed pycos to better reflect its functionality and to avoid confusion with asyncore module.

Many examples are included in ‘examples’ directory where asyncoro module is installed (with PyPI / pip). See README file in that ‘examples’ directory for brief description of each of the programs. A few examples from them are explained below in more detail.

7.1. Asynchronous Concurrent Programming

asyncoro’s concurrency framework has some features similar to Actor Model. With asyncoro, computation units are created with coroutines. Each coroutine has a message queue from which it can receive message sent to it by other coroutines. In addition to this one-to-one communication between coroutines, asyncoro supports one-to-many communication with broadcasting channels.

7.1.1. Coroutines

Coroutine in asyncoro is a computational unit created with generator function, i.e., Python function with one or more yield statements. Creating coroutines is similar to creating threads, except that the process function must be generator function and coroutine is created with Coroutine instead of threading.Thread. If the generator functions used for creating coroutines have default argument coro=None, Coroutine constructor sets this parameter set to coroutine instance created. This parameter, thus, can be used to call methods on it (e.g., receive(), sleep() etc.).

An example program that creates coroutines is:

import asyncoro, random, time

def coro_proc(n, coro=None):
    s = random.uniform(0.5, 3)
    print('%f: coroutine %d sleeping for %f seconds' % (time.time(), n, s))
    yield coro.sleep(s)
    print('%f: coroutine %d terminating' % (time.time(), n))

# create 10 coroutines running generator function 'coro_proc'
for i in range(10):
    # coroutine function is called with 'i'
    asyncoro.Coro(coro_proc, i)

Coroutines are created with Coroutine constructor. The first argument must be a generator function and rest of the arguments should correspond to parameters in generator definition. In the above program, the generator function coro_proc has coro=None keyword argument, so Coroutine constructor sets coro to the coroutine created - this parameter should not be passed to the constructor. The constructor schedule execution of this coroutine in asncoro’s scheduler. In coro_proc, the expression coro.sleep(s) suspends execution of running coroutine for given time. During that time other coroutines that are ready to execute will be executed. The total time taken by the above program should be roughly same as maximum of the sleep times (at most 3 seconds in the above program).

Note that since sleep() is a generator function, it must be called with yield (otherwise, coro.sleep simply returns new generator instance and thus will not suspend the execution as desired).

7.1.2. Message Passing

With the concurrent and asychronous behavior of coroutines in asyncoro, communication among them is accomplished by sending and receiving messages. A message can be any Python object. In case message is being sent to a remote coroutine (i.e., coroutine running with another program), the message must be serializable. A coroutine can either send a message to another coroutine (one-to-one communication) or broadcast a message over channel to many coroutines (one-to-many communication). At the reeciver coroutine the messages are stored in a queue (similar to what is called Mailbox in other concurrecy frameworks) in the order they are received so that a receive() returns oldest message, blocking until a message becomes available.

With one-to-one communication, a coroutine invokes send() method on the receipient coroutine. Sending a message simply queues the message in recipient; it doesn’t, for example, wait for recipient to process the message. If necessary, deliver() method may be used instead of send() when sending message over network; it indicates how many recipients received the message - see Asynchronous Concurrenct Programming (asyncoro) for details.

An example client/server program with asyncoro is:

import asyncoro, random

def server_proc(coro=None):
    coro.set_daemon()
    while True:
        msg = yield coro.receive()
        print('processing %s' % (msg))

msg_id = 0

def client_proc(server, n, coro=None):
    global msg_id
    for x in range(3):
        yield coro.suspend(random.uniform(0.5, 3))
        msg_id += 1
        server.send('%d: %d / %d' % (msg_id, n, x))

server = asyncoro.Coro(server_proc)
for i in range(10):
    asyncoro.Coro(client_proc, server, i)

The main program first creates server coroutine with server_proc generator function, which has coro=None keyword parameter, so Coroutine constructor passes the coroutine instance as coro (thus, server in the main program is same as coro in server_proc). The main program creates 10 client coroutines with client_proc, passing server as the first argument and an identifier as second argument. The main program has no use for client coroutines, so it doesn’t save them. Each of the client coroutines suspends itself for a brief period and sends a unique message to the server. Since server_proc never terminates on its own, we indicate that it is a daemon process so that asyncoro can terminate it once all non-daemon coroutines (in this case client coroutines) are terminated (after sending 3 messages each); otherwise, asyncoro’s scheduler will never terminate as the server coroutine is still running.

Unlike with threads, asyncoro’s scheduler doesn’t preempt running coroutine. Thus, locking is not required with asyncoro. To illustrate this concept, msg_id, a global, shared variable, is updated in client_proc without having to worry about non-deterministic values. asyncoro, however, provides all locking primitives similar to thread locking primitives. Some of the methods in these locking primitives are generator methods (blocking operations in synchronous threading module), so they must be used with yield.

In this case the messages sent by clients are strings. If, say, server needs to send a reply back tot the client, then the messages can be in the form of dictionary, tuple, list etc. to pass client’s coroutine instance (e.g., as list [coro, msg_id, n, x] from which server can retrieve the client coroutine that sent the message).

7.1.3. Channels

If one-to-many or broadcast communication is needed, asyncoro’s Channel can be used. To receive messages on a channel, a coroutine must subscribe to it. After subscribing to a channel, any message sent to that channel will be received by each of its current subscribers.

These concepts are used in the program below where a client sends a series of numbers over a channel. Two coroutines receive these numbers to compute sum and product of those numbers:

import asyncoro, random

def seqsum(coro=None):
    # compute sum of numbers received over channel
    result = 0
    while True:
        msg = yield coro.receive()
        if msg is None:
            break
        result += msg
    print('sum: %f' % result)

def seqprod(coro=None):
    # compute product of numbers received over channel
    result = 1
    while True:
        msg = yield coro.receive()
        if msg is None:
            break
        result *= msg
    print('prod: %f' % result)

def client_proc(coro=None):
    channel = asyncoro.Channel('sum_prod')
    # create two coroutines to compute sum and product of
    # numbers sent over the channel
    sum_coro = asyncoro.Coro(seqsum)
    prod_coro = asyncoro.Coro(seqprod)
    yield channel.subscribe(sum_coro)
    yield channel.subscribe(prod_coro)
    for x in range(4):
        r = random.uniform(0.5, 3)
        channel.send(r)
        print('sent %f' % r)
    channel.send(None)
    yield channel.unsubscribe(sum_coro)
    yield channel.unsubscribe(prod_coro)

asyncoro.Coro(client_proc)

A coroutine can subscribe to as many channels as necessary. All such messages, as well as messages sent directly to a coroutine, are received with coro.receive() method.

A channel, c2, may subscribe to another channel, c1, so that any message sent to c1 will also be received by all of its subscribers, including c2, which in turn causes its subscribers to receive that message as well. In this case, a message sent to c2 will not be receieved by c1. This way a hierarchy of channels can be created to reflect the heirarchy of components in a system.

Care must be taken not to create cycles in subscription with channel hierarchy; e.g., channel c1 subscribing to channel c2 in the above example. asyncoro doesn’t detect cycles in subscriptions and will cause runtime exception due to recursion.

7.2. Asynchronous Network Programming

Some of Python library’s (synchronous) socket operations, such as connect, accept and recv are blocking operations; i.e., they wait for the operation complete. These blocking operations are not suitable with asyncoro, as during that time other eligible coroutines are also blocked from executing.

asyncoro provides Asynchronous Socket class to convert Python’s blocking socket to a non-blocking socket. Essentially Asynchronous Socket is a wrapper that implements blocking operations as generator functions that can be used in coroutines (with yield, as done with any generator function).

For example, below is the server program that accepts connections and processes each connection:

import socket, sys, asyncoro

def process(conn, coro=None):
    data = ''
    while True:
        data += yield conn.recv(128)
        if data[-1] == '/':
           break
    conn.close()
    print('received: %s' % data)

def server_proc(host, port, coro=None):
    coro.set_daemon()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = asyncoro.AsyncSocket(sock)
    sock.bind((host, port))
    sock.listen(128)

    while True:
        conn, addr = yield sock.accept()
        asyncoro.Coro(process, conn)

asyncoro.Coro(server_proc, '127.0.0.1', 8010)
while True:
    cmd = sys.stdin.readline().strip().lower()
    if cmd == 'exit' or cmd == 'quit':
        break

The two differences to note in ‘server_proc’ coroutine function compared to programming with threads: the TCP socket is converted to asynchronous socket with Asynchronous Socket so it can be used in coroutines, and accept is used with yield as this is a generator function (in AsyncSocket). Then a new coroutine is created to process the connection. The socket returned from accept of an asynchronous socket is also an asychronous socket, so no need to convert it with Asynchronous Socket. In the ‘process’ coroutine function, recv is used with yield as it is also a generator function of asynchronous socket.

Below is a client program that creates 10 coroutines each of which connects to the server above and sends a message. Each message ends with a marker ‘/’ so that the server can receive the full message.:

import socket, sys, asyncoro, random

def client_proc(host, port, n, coro=None):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = asyncoro.AsyncSocket(sock)
    yield sock.connect((host, port))
    msg = '%d: ' % n + '-' * random.randint(100,300) + '/'
    yield sock.sendall(msg)
    sock.close()

for n in range(1, 10):
    asyncoro.Coro(client_proc, '127.0.0.1', 8010, n)

Here again, the TCP socket is converted to asynchronous socket with Asynchronous Socket so it can be used in coroutines, and the operations connect and sendall are used with yield as these are generator functions.

In essence, using asyncoro for asynchronous network programming is very similar to thread programming, except for creating coroutines with Coroutine instead of threads, converting Python library’s sockets to asynchronous sockets with Asynchronous Socket, and using yield with certain methods.

7.3. Distributed Programming

asyncoro includes a scheduler (class AsynCoro) that runs coroutines, suspends them when necessary, deliver messages etc. When a coroutine is created with Coroutine as done in above programs, the scheduler is started if one has not been already started. The default behavior with the scheduler is to not start network services required for communication with coroutines or channels in another program. To use distributed programming, disasyncoro must be imported instead of asyncoro.

Coroutines and channels can be registered with asyncoro so that they can be located by coroutines running in a remote location. The reference (to remote coroutine or channel) obtained by locating can be used to send messages, monitor (in the case of coroutine) etc.

Using these features, the above client/server program can be separated in to client and server programs that run on two different locations. The server program is:

import random, sys
import asyncoro.disasyncoro as asyncoro

def server_proc(coro=None):
    coro.set_daemon()
    coro.register('server_coro')
    while True:
        msg = yield coro.receive()
        print('processing %s' % (msg))

server = asyncoro.Coro(server_proc)
while True:
    cmd = sys.stdin.readline().strip().lower()
    if cmd == 'quit' or cmd == 'exit':
        break

There are two differences with this version from the one in concurrency section above:

  • disasyncoro is imported to start network services for distributed programming.

  • The server coroutine registers itself with the name “server_coro” so that client can use that name to obtain a reference to the this server which can be used to send messages.

The client program is:

import random
import asyncoro.disasyncoro as asyncoro

def client_proc(n, coro=None):
    global msg_id
    server = yield Coro.locate('server_coro')
    for x in range(3):
        yield coro.suspend(random.uniform(0.5, 3))
        msg_id += 1
        server.send('%d: %d / %d' % (msg_id, n, x))

msg_id = 0
for i in range(10):
    asyncoro.Coro(client_proc, i)

In this case there are two differences compared to the version in concurrent programming section above:

  • As is done in the server, disasyncoro is imported to start network service.

  • In the client coroutine a reference to remote server is obtained using the name the server is registered with. locate() is a generator function so it must be called with yield.

The above client and server programs can be run either on the same computer or on different computers on the same network. Even if they are run on the same computer, the client and server coroutines are considered remote to each other. The client program can be run multiple instances simultaneously, if desired.

If the client and server programs are run on computers on the same network (i.e., they share same router or gateway), then the schedulers discover each other. If the programs are on computers on different networks, the scheduler in the client program needs to be informed about the location of server’s scheduler. This is done by adding the line yield AsynCoro().peer('remote_node') before using name location, where remote_node is either the IP address or name of the remote peer.

7.4. Distributed Communicating Processes

While RCI (Remote Coroutine Invocation) provides API for creating pre-defined functionality that can be executed remotely, module discoro provides support for clients to send computations that can be executed remotely, optionally running them in parallel in separate processes to use multiple processors. See Distributed Communicating Processes (discoro) for details.

Program below sends rcoro_proc generator function to a server running discoronode.py program, for creating (remote) coroutines to execute compute which simply sleeps for given number of seconds and sends back the same number (as the result). Coroutine client_proc creates discoro AsynCoro in the client program itself (alternately, discoro.py can be run as a separate program to which multiple clients can schedule computations). The same coroutine is also set as status_coro before scheduling computation so all status notifications are sent to it as messages, which are processed to know which discoro server processes are available to schedule new jobs, which jobs are finished etc. Alternately, jobs can simply be schduled to execute and scheduler will load balance coroutines; see discoro_client.py and discoro_client2.py in ‘examples’ directory under the installation path.:

import asyncoro.discoro as discoro
import asyncoro.disasyncoro as asyncoro

# this generator function is sent to remote server to run
# coroutines there
def rcoro_proc(n, coro=None):
    yield coro.sleep(n)
    raise StopIteration(n)

def client_proc(computation, njobs, coro=None):
    status = {'submitted': 0, 'done': 0}

    def submit_job(where, coro=None):
        arg = random.uniform(5, 20)
        rcoro = yield computation.run_at(where, rcoro_proc, arg)
        if isinstance(rcoro, asyncoro.Coro):
            print('%s processing %s' % (rcoro.location, arg))
        else:
            print('Job %s failed: %s' % (status['submitted'], str(rcoro)))
        status['submitted'] += 1

    discoro.Scheduler()
    computation.status_coro = coro
    if (yield computation.schedule()):
        raise Exception('Failed to schedule computation')
    # job submitter assumes that a process can run at most one coroutine at  a time,
    # although more than one coroutine (many thousands, if necessary) can be run
    while True:
        msg = yield coro.receive()
        if isinstance(msg, asyncoro.MonitorException):
            rcoro = msg.args[0]
            if msg.args[1][0] == StopIteration:
                print('Remote coroutine %s finished with %s' % (rcoro.location, msg.args[1][1]))
            else:
                asyncoro.logger.warning('Remote coroutine %s terminated with "%s"' %
                                        (rcoro.location, str(msg.args[1])))
             status['done'] += 1
             # because jobs are submitted with 'yield' with coroutines,
             # and 'submitted' is incremented after 'yield', it is
             # likely that more than 'njobs' are submitted
             if status['done'] >= njobs and status['done'] == status['submitted']:
                 break
            if status['submitted'] < njobs:
                # schedule another job at this process
                asyncoro.Coro(submit_job, rcoro.location)
        elif isinstance(msg, discoro.StatusMessage):
             # a new process is ready (if special initialization is
             # required for preparing process, schedule it)
            if msg.status == discoro.Scheduler.ProcInitialized:
                asyncoro.Coro(submit_job, msg.location)
        else:
            asyncoro.logger.debug('Ignoring status message %s' % msg)
    yield computation.close()

if __name__ == '__main__':
    import logging, random
    asyncoro.logger.setLevel(logging.DEBUG)
    computation = discoro.Computation([rcoro_proc])
    # run 10 jobs
    asyncoro.Coro(client_proc, computation, 10)

To test, run discoronode.py program on a computer in local network and this client program.

If the tasks and client don’t need to communicate (as in the example above), it is easier to use dispy project. If the tasks and client need to communicate, separating scheduler and client would make it easier. See ‘discoro_client*.py’ files in the examples directory under installation directory of asyncoro for additional use cases.