7. Tutorial / Examples¶
Note
asyncoro has been renamed pycos to better reflect its functionality and to avoid confusion with the asyncore module.
Many examples are included in the ‘examples’ directory where the asyncoro module is installed (with PyPI / pip). See the README file in that directory for a brief description of each of the programs. A few of these examples are explained below in more detail.
7.1. Asynchronous Concurrent Programming¶
asyncoro’s concurrency framework has some features similar to the Actor Model. With asyncoro, computation units are created as coroutines. Each coroutine has a message queue from which it can receive messages sent to it by other coroutines. In addition to this one-to-one communication between coroutines, asyncoro supports one-to-many communication with broadcast channels.
7.1.1. Coroutines¶
A coroutine in asyncoro is a computational unit created with a generator function, i.e., a Python function with one or more yield statements. Creating coroutines is similar to creating threads, except that the process function must be a generator function and the coroutine is created with Coro instead of threading.Thread. If the generator function used for creating a coroutine has the default argument coro=None, the Coro constructor sets this parameter to the coroutine instance created. This parameter can then be used to call methods on the coroutine (e.g., receive(), sleep() etc.).
An example program that creates coroutines is:
import asyncoro, random, time

def coro_proc(n, coro=None):
    s = random.uniform(0.5, 3)
    print('%f: coroutine %d sleeping for %f seconds' % (time.time(), n, s))
    yield coro.sleep(s)
    print('%f: coroutine %d terminating' % (time.time(), n))

# create 10 coroutines running generator function 'coro_proc'
for i in range(10):
    # coroutine function is called with 'i'
    asyncoro.Coro(coro_proc, i)
Coroutines are created with the Coro constructor. The first argument must be a generator function and the rest of the arguments should correspond to the parameters in the generator definition. In the above program, the generator function coro_proc has the coro=None keyword argument, so the Coro constructor sets coro to the coroutine created; this parameter should not be passed to the constructor. The constructor schedules execution of the coroutine in asyncoro’s scheduler. In coro_proc, the expression yield coro.sleep(s) suspends execution of the running coroutine for the given time. During that time other coroutines that are ready to execute will run. The total time taken by the above program should be roughly the same as the maximum of the sleep times (at most 3 seconds in the above program).
Note that since sleep() is a generator function, it must be called with yield (otherwise, coro.sleep simply returns a new generator instance and will not suspend the execution as desired).
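This pitfall can be seen with a plain generator, independent of asyncoro: calling a generator function only creates a generator object; the body does not run until the generator is iterated (which is what yield does in a coroutine).

```python
import types

ran = []

def sleeper(s):
    # the body runs only when the generator is iterated, not when called
    ran.append('started')
    yield s

g = sleeper(2)   # merely creates a generator object; the body has not run
assert isinstance(g, types.GeneratorType)
assert ran == []

next(g)          # now the body executes up to the first yield
assert ran == ['started']
```

This is why a bare coro.sleep(s) silently does nothing, while yield coro.sleep(s) suspends the coroutine.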
7.1.2. Message Passing¶
With the concurrent and asynchronous behavior of coroutines in asyncoro, communication among them is accomplished by sending and receiving messages. A message can be any Python object. If a message is sent to a remote coroutine (i.e., a coroutine running in another program), the message must be serializable. A coroutine can either send a message to another coroutine (one-to-one communication) or broadcast a message over a channel to many coroutines (one-to-many communication). At the receiver coroutine the messages are stored in a queue (similar to what is called a Mailbox in other concurrency frameworks) in the order they are received, so that receive() returns the oldest message, blocking until a message becomes available.
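The serializability requirement for remote messages can be checked with the standard pickle module (a stdlib illustration of the constraint, independent of asyncoro's actual transport):

```python
import pickle

# plain data objects round-trip through pickle, so they can be
# sent to a remote coroutine
msg = {'msg_id': 1, 'data': [0.5, 2.5]}
assert pickle.loads(pickle.dumps(msg)) == msg

# objects tied to local resources (lambdas, open sockets, etc.)
# cannot be pickled and so cannot be sent over the network
try:
    pickle.dumps(lambda x: x)
    serializable = True
except Exception:
    serializable = False
assert serializable is False
```

Messages to coroutines in the same program are passed by reference and have no such restriction.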
With one-to-one communication, a coroutine invokes the send() method on the recipient coroutine. Sending a message simply queues the message at the recipient; it doesn’t, for example, wait for the recipient to process the message. If necessary, the deliver() method may be used instead of send() when sending a message over the network; it indicates how many recipients received the message - see Asynchronous Concurrent Programming (asyncoro) for details.
An example client/server program with asyncoro is:
import asyncoro, random

def server_proc(coro=None):
    coro.set_daemon()
    while True:
        msg = yield coro.receive()
        print('processing %s' % (msg))

msg_id = 0

def client_proc(server, n, coro=None):
    global msg_id
    for x in range(3):
        yield coro.suspend(random.uniform(0.5, 3))
        msg_id += 1
        server.send('%d: %d / %d' % (msg_id, n, x))

server = asyncoro.Coro(server_proc)
for i in range(10):
    asyncoro.Coro(client_proc, server, i)
The main program first creates the server coroutine with the server_proc generator function, which has the coro=None keyword parameter, so the Coro constructor passes the coroutine instance as coro (thus, server in the main program is the same as coro in server_proc). The main program then creates 10 client coroutines with client_proc, passing server as the first argument and an identifier as the second argument. The main program has no use for the client coroutines, so it doesn’t save them. Each client coroutine suspends itself for a brief period and sends a unique message to the server. Since server_proc never terminates on its own, we mark it as a daemon coroutine so that asyncoro can terminate it once all non-daemon coroutines (in this case the client coroutines) have terminated (after sending 3 messages each); otherwise, asyncoro’s scheduler would never terminate, as the server coroutine would still be running.
Unlike with threads, asyncoro’s scheduler doesn’t preempt a running coroutine. Thus, locking is not required with asyncoro. To illustrate this, msg_id, a global shared variable, is updated in client_proc without having to worry about non-deterministic values. asyncoro nevertheless provides locking primitives similar to the thread locking primitives. Some of the methods of these locking primitives are generator methods (the blocking operations in the synchronous threading module), so they must be used with yield.
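The determinism that comes from cooperative (non-preemptive) scheduling can be illustrated with plain generators and a toy round-robin scheduler (a sketch only, not asyncoro’s actual scheduler):

```python
from collections import deque

counter = {'value': 0}

def worker(steps):
    # update shared state without a lock; safe because the scheduler
    # below only switches between generators at yield points
    for _ in range(steps):
        counter['value'] += 1
        yield

# toy round-robin scheduler: resume each generator until its next yield
ready = deque(worker(3) for _ in range(4))
while ready:
    gen = ready.popleft()
    try:
        next(gen)
        ready.append(gen)
    except StopIteration:
        pass

assert counter['value'] == 12   # 4 workers x 3 increments, always
```

With preemptive threads, an unlocked read-modify-write like this could lose updates; here each increment runs to completion before control can switch.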
In this case the messages sent by the clients are strings. If, say, the server needs to send a reply back to the client, then the messages can be a dictionary, tuple, list etc. that carries the client’s coroutine instance (e.g., the list [coro, msg_id, n, x], from which the server can retrieve the client coroutine that sent the message).
7.1.3. Channels¶
If one-to-many or broadcast communication is needed, asyncoro’s Channel can be used. To receive messages on a channel, a coroutine must subscribe to it. After subscribing to a channel, any message sent to that channel will be received by each of its current subscribers.
These concepts are used in the program below where a client sends a series of numbers over a channel. Two coroutines receive these numbers to compute sum and product of those numbers:
import asyncoro, random

def seqsum(coro=None):
    # compute sum of numbers received over channel
    result = 0
    while True:
        msg = yield coro.receive()
        if msg is None:
            break
        result += msg
    print('sum: %f' % result)

def seqprod(coro=None):
    # compute product of numbers received over channel
    result = 1
    while True:
        msg = yield coro.receive()
        if msg is None:
            break
        result *= msg
    print('prod: %f' % result)

def client_proc(coro=None):
    channel = asyncoro.Channel('sum_prod')
    # create two coroutines to compute sum and product of
    # numbers sent over the channel
    sum_coro = asyncoro.Coro(seqsum)
    prod_coro = asyncoro.Coro(seqprod)
    yield channel.subscribe(sum_coro)
    yield channel.subscribe(prod_coro)
    for x in range(4):
        r = random.uniform(0.5, 3)
        channel.send(r)
        print('sent %f' % r)
    channel.send(None)
    yield channel.unsubscribe(sum_coro)
    yield channel.unsubscribe(prod_coro)

asyncoro.Coro(client_proc)
A coroutine can subscribe to as many channels as necessary. All such messages, as well as messages sent directly to a coroutine, are received with coro.receive() method.
A channel, c2, may subscribe to another channel, c1, so that any message sent to c1 will also be received by all of c1’s subscribers, including c2, which in turn causes c2’s subscribers to receive that message as well. In this case, a message sent to c2 will not be received by c1. This way a hierarchy of channels can be created to reflect the hierarchy of components in a system.
Care must be taken not to create cycles of subscriptions in a channel hierarchy; e.g., channel c1 subscribing to channel c2 in the above example. asyncoro doesn’t detect cycles in subscriptions; a cycle will cause a runtime exception due to infinite recursion.
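The hierarchy (and the cycle hazard) can be modeled with a toy Channel class in which plain lists stand in for coroutine mailboxes (a sketch only; asyncoro’s Channel delivers messages asynchronously):

```python
class Channel:
    # toy model of broadcast channels; not asyncoro's implementation
    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def send(self, msg):
        for sub in self.subscribers:
            if isinstance(sub, Channel):
                # forwarding recurses down the hierarchy; a
                # subscription cycle here would recurse forever
                sub.send(msg)
            else:
                sub.append(msg)   # a list stands in for a mailbox

c1, c2 = Channel('c1'), Channel('c2')
box1, box2 = [], []
c1.subscribe(box1)
c2.subscribe(box2)
c1.subscribe(c2)          # c2 subscribes to c1

c1.send('to-all')         # reaches box1 directly and box2 via c2
c2.send('only-c2')        # reaches box2 only; never travels up to c1
assert box1 == ['to-all']
assert box2 == ['to-all', 'only-c2']
```

Adding c2.subscribe(c1) to this sketch would make send() recurse until the interpreter's recursion limit is hit, which is the failure mode described above.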
7.2. Asynchronous Network Programming¶
Some of the Python library’s (synchronous) socket operations, such as connect, accept and recv, are blocking operations; i.e., they wait for the operation to complete. These blocking operations are not suitable with asyncoro, as during that time other eligible coroutines are blocked from executing as well.
asyncoro provides the AsyncSocket class to convert Python’s blocking sockets to non-blocking sockets. Essentially, AsyncSocket is a wrapper that implements the blocking operations as generator functions that can be used in coroutines (with yield, as done with any generator function).
For example, below is a server program that accepts connections and processes each connection:
import socket, sys, asyncoro

def process(conn, coro=None):
    data = ''
    while True:
        data += yield conn.recv(128)
        if data[-1] == '/':
            break
    conn.close()
    print('received: %s' % data)

def server_proc(host, port, coro=None):
    coro.set_daemon()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = asyncoro.AsyncSocket(sock)
    sock.bind((host, port))
    sock.listen(128)
    while True:
        conn, addr = yield sock.accept()
        asyncoro.Coro(process, conn)

asyncoro.Coro(server_proc, '127.0.0.1', 8010)
while True:
    cmd = sys.stdin.readline().strip().lower()
    if cmd == 'exit' or cmd == 'quit':
        break
There are two differences to note in the server_proc coroutine function compared to programming with threads: the TCP socket is converted to an asynchronous socket with AsyncSocket so it can be used in coroutines, and accept is used with yield as it is a generator function (in AsyncSocket). A new coroutine is then created to process each connection. The socket returned by accept of an asynchronous socket is also an asynchronous socket, so there is no need to convert it with AsyncSocket. In the process coroutine function, recv is used with yield as it, too, is a generator function of the asynchronous socket.
Below is a client program that creates 10 coroutines, each of which connects to the server above and sends a message. Each message ends with the marker ‘/’ so that the server can detect the end of the message:
import socket, sys, asyncoro, random

def client_proc(host, port, n, coro=None):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = asyncoro.AsyncSocket(sock)
    yield sock.connect((host, port))
    msg = '%d: ' % n + '-' * random.randint(100, 300) + '/'
    yield sock.sendall(msg)
    sock.close()

for n in range(10):
    asyncoro.Coro(client_proc, '127.0.0.1', 8010, n)
Here again, the TCP socket is converted to an asynchronous socket with AsyncSocket so it can be used in coroutines, and the operations connect and sendall are used with yield as they are generator functions.
In essence, using asyncoro for asynchronous network programming is very similar to programming with threads, except for creating coroutines with Coro instead of threads, converting the Python library’s sockets to asynchronous sockets with AsyncSocket, and using yield with certain methods.
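The ‘/’ end-of-message marker used above is a simple framing scheme: because TCP delivers a byte stream with no message boundaries, the receiving side accumulates data and splits out complete messages at the marker. A small sketch of that idea (split_messages is a hypothetical helper, not part of asyncoro):

```python
def split_messages(buffer, data, marker='/'):
    # hypothetical helper: append newly received data to the buffer
    # and split out complete marker-terminated messages
    buffer += data
    parts = buffer.split(marker)
    return parts[:-1], parts[-1]   # complete messages, leftover

# first chunk contains no marker, so no complete message yet
msgs, rest = split_messages('', '3: ---')
assert msgs == [] and rest == '3: ---'

# the next chunk completes the first message and starts another
msgs, rest = split_messages(rest, '--/4: -')
assert msgs == ['3: -----'] and rest == '4: -'
```

The server's process coroutine above does the same thing implicitly by accumulating data until it ends with ‘/’; this helper generalizes it to multiple messages per connection.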
7.3. Distributed Programming¶
asyncoro includes a scheduler (class AsynCoro) that runs coroutines, suspends them when necessary, delivers messages etc. When a coroutine is created with Coro, as done in the above programs, the scheduler is started if one has not been started already. By default the scheduler does not start the network services required for communication with coroutines or channels in another program. For distributed programming, disasyncoro must be imported instead of asyncoro.
Coroutines and channels can be registered with asyncoro so that they can be located by coroutines running at a remote location. The reference (to the remote coroutine or channel) obtained by locating it can be used to send messages, monitor (in the case of a coroutine) etc.
Using these features, the above client/server program can be separated into client and server programs that run at two different locations. The server program is:
import random, sys
import asyncoro.disasyncoro as asyncoro

def server_proc(coro=None):
    coro.set_daemon()
    coro.register('server_coro')
    while True:
        msg = yield coro.receive()
        print('processing %s' % (msg))

server = asyncoro.Coro(server_proc)
while True:
    cmd = sys.stdin.readline().strip().lower()
    if cmd == 'quit' or cmd == 'exit':
        break
There are two differences between this version and the one in the concurrency section above:
- disasyncoro is imported to start the network services for distributed programming.
- The server coroutine registers itself with the name “server_coro” so that a client can use that name to obtain a reference to this server, which can then be used to send messages.
The client program is:
import random
import asyncoro.disasyncoro as asyncoro

def client_proc(n, coro=None):
    global msg_id
    server = yield asyncoro.Coro.locate('server_coro')
    for x in range(3):
        yield coro.suspend(random.uniform(0.5, 3))
        msg_id += 1
        server.send('%d: %d / %d' % (msg_id, n, x))

msg_id = 0
for i in range(10):
    asyncoro.Coro(client_proc, i)
In this case there are two differences compared to the version in the concurrent programming section above:
- As in the server, disasyncoro is imported to start the network services.
- In the client coroutine, a reference to the remote server is obtained using the name the server registered with. locate() is a generator function, so it must be called with yield.
The above client and server programs can be run either on the same computer or on different computers in the same network. Even if they are run on the same computer, the client and server coroutines are considered remote to each other. Multiple instances of the client program can be run simultaneously, if desired.
If the client and server programs are run on computers in the same network (i.e., they share the same router or gateway), the schedulers discover each other automatically. If the programs are on computers in different networks, the scheduler in the client program needs to be informed about the location of the server’s scheduler. This is done by adding the line yield AsynCoro().peer('remote_node') before locating by name, where remote_node is either the IP address or the host name of the remote peer.
7.4. Distributed Communicating Processes¶
While RCI (Remote Coroutine Invocation) provides an API for creating pre-defined functionality that can be executed remotely, the discoro module provides support for clients to send computations that can be executed remotely, optionally running them in parallel in separate processes to use multiple processors. See Distributed Communicating Processes (discoro) for details.
The program below sends the rcoro_proc generator function to a server running the discoronode.py program, to create (remote) coroutines that execute it; each simply sleeps for a given number of seconds and sends back the same number (as the result). The coroutine client_proc creates the discoro scheduler in the client program itself (alternately, discoro.py can be run as a separate program to which multiple clients can schedule computations). The same coroutine is also set as status_coro before scheduling the computation, so all status notifications are sent to it as messages; these are processed to know which discoro server processes are available to schedule new jobs, which jobs have finished etc. Alternately, jobs can simply be scheduled to execute and the scheduler will load-balance the coroutines; see discoro_client.py and discoro_client2.py in the ‘examples’ directory under the installation path:
import asyncoro.discoro as discoro
import asyncoro.disasyncoro as asyncoro

# this generator function is sent to remote server to run
# coroutines there
def rcoro_proc(n, coro=None):
    yield coro.sleep(n)
    raise StopIteration(n)

def client_proc(computation, njobs, coro=None):
    status = {'submitted': 0, 'done': 0}

    def submit_job(where, coro=None):
        arg = random.uniform(5, 20)
        rcoro = yield computation.run_at(where, rcoro_proc, arg)
        if isinstance(rcoro, asyncoro.Coro):
            print('%s processing %s' % (rcoro.location, arg))
        else:
            print('Job %s failed: %s' % (status['submitted'], str(rcoro)))
        status['submitted'] += 1

    discoro.Scheduler()
    computation.status_coro = coro
    if (yield computation.schedule()):
        raise Exception('Failed to schedule computation')
    # job submitter assumes that a process can run at most one coroutine at a time,
    # although more than one coroutine (many thousands, if necessary) can be run
    while True:
        msg = yield coro.receive()
        if isinstance(msg, asyncoro.MonitorException):
            rcoro = msg.args[0]
            if msg.args[1][0] == StopIteration:
                print('Remote coroutine %s finished with %s' % (rcoro.location, msg.args[1][1]))
            else:
                asyncoro.logger.warning('Remote coroutine %s terminated with "%s"' %
                                        (rcoro.location, str(msg.args[1])))
            status['done'] += 1
            # because jobs are submitted with 'yield' with coroutines,
            # and 'submitted' is incremented after 'yield', it is
            # likely that more than 'njobs' are submitted
            if status['done'] >= njobs and status['done'] == status['submitted']:
                break
            if status['submitted'] < njobs:
                # schedule another job at this process
                asyncoro.Coro(submit_job, rcoro.location)
        elif isinstance(msg, discoro.StatusMessage):
            # a new process is ready (if special initialization is
            # required for preparing process, schedule it)
            if msg.status == discoro.Scheduler.ProcInitialized:
                asyncoro.Coro(submit_job, msg.location)
        else:
            asyncoro.logger.debug('Ignoring status message %s' % msg)
    yield computation.close()

if __name__ == '__main__':
    import logging, random
    asyncoro.logger.setLevel(logging.DEBUG)
    computation = discoro.Computation([rcoro_proc])
    # run 10 jobs
    asyncoro.Coro(client_proc, computation, 10)
To test, run the discoronode.py program on a computer in the local network and then run this client program.
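A note on the raise StopIteration(n) in rcoro_proc: that was the idiom for returning a value from a generator in the Python versions asyncoro targeted. Since PEP 479 (the default from Python 3.7), raising StopIteration inside a generator is converted to RuntimeError; the modern equivalent is a plain return, whose value is carried in the StopIteration raised when the generator is exhausted:

```python
def compute(n):
    yield n          # stand-in for 'yield coro.sleep(n)'
    return n         # modern replacement for 'raise StopIteration(n)'

g = compute(42)
next(g)              # run to the first yield
try:
    next(g)
    result = None
except StopIteration as e:
    result = e.value # the generator's return value
assert result == 42
```

This is why msg.args[1][1] in client_proc carries the remote coroutine's result: the result travels with the StopIteration that ends the coroutine.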
If the tasks and client don’t need to communicate (as in the example above), it is easier to use the dispy project. If the tasks and client do need to communicate, separating the scheduler and client makes it easier. See the ‘discoro_client*.py’ files in the ‘examples’ directory under asyncoro’s installation directory for additional use cases.