5. Distributed Programming (disasyncoro)

As described in Asynchronous Concurrent Programming (asyncoro), when a coroutine is created, an AsynCoro (singleton) instance is created if one does not already exist. AsynCoro starts a scheduler that manages the coroutines created in that program. AsynCoro in asyncoro does not start network services for distributed programming. However, when disasyncoro is used instead of asyncoro, network services are started so that coroutines can exchange messages with coroutines in remote asyncoro instances and use other network services there. See also Distributed Communicating Processes (discoro), which supports distributed and parallel computing: clients can send computations (Python functions, files, modules etc.) to remote servers and execute coroutines there.

Network services, such as message passing, work transparently so that once references are obtained, they can be used just as with concurrent programming.

5.1. Examples

See Distributed Programming in the tutorial for examples. There are many illustrative use cases in the ‘examples’ directory under the directory where the asyncoro module is installed.

Following is a brief description of the included examples that are relevant to this section:

  • examples/tut_client.py and examples/tut_server.py use one-to-one message passing to exchange messages between two remote coroutines. See also examples/remote_coro_client.py and examples/remote_coro_server.py where the client and server use the ‘secret’ feature for exclusivity (to prevent other users from using them in a shared network).
  • examples/remote_channel_client.py and examples/remote_channel_server.py use broadcasting channels to exchange messages among a sender and local/remote recipients.
  • examples/chat_chan_client.py and examples/chat_chan_server.py use broadcasting over a channel to send messages to all participants to implement a simple chat (message) service. To use this and other ‘chat’ examples below, run the server, and multiple clients (either on same machine or other machines in local network). Messages typed in one client show up at other clients.
  • examples/chat_sock_client.py and examples/chat_sock_server.py use asynchronous network programming, coroutines and message passing to implement a chat (message) server that is used by clients to broadcast messages.
  • examples/rci_monitor_client.py and examples/rci_monitor_server.py illustrate another approach to execute remote coroutines: The server registers a function and client requests to execute coroutine with that function. Compare this to discoro_client.py where the client sends the computation itself to the remote server, so the client can execute arbitrary functions, whereas with RCI (Remote Coroutine Invocation) only registered functions can be executed by clients. The client uses monitoring to get the exit status of remote coroutines (which fail if the argument is an odd number).

5.2. Location

Some entities in asyncoro, such as coroutines, channels, are capable of providing services for remote peers. These entities should register themselves with names so that remote peers can use those names to get references. These references can then be used to invoke methods described in this section.

Such entities have a location, which is the network IP address and TCP port of the asyncoro where they originate, so that when a method is invoked on a reference at a remote peer, that peer knows where to direct the call. When an entity is located (to get a reference to it) at a remote peer using only the name it is registered with, all known asyncoro instances are queried for that name. If the name is unique across all asyncoro instances, the reference obtained is for that specific entity. However, if the same name is used in more than one asyncoro, the reference obtained may not be for the desired entity. In that case, the location of the asyncoro where the entity is registered can be given. Locations can be obtained by looking up asyncoro names with scheduler.locate(). If the host and TCP port are already known, a location can be created with:

class disasyncoro.Location(host, tcp_port)

host is either host name or IP address and tcp_port is TCP port where asyncoro is running. This information is printed by the scheduler when it starts.

5.3. AsynCoro

As explained in Asynchronous Concurrent Programming (asyncoro), the AsynCoro scheduler is started automatically (when a coroutine is started, for example) with default initialization. If the scheduler should be initialized with different parameters, it should be initialized before creating any coroutines, channels etc. with:

class disasyncoro.AsynCoro(node=None, udp_port=None, tcp_port=0, ext_ip_addr=None, name=None, discover_peers=True, peers=[], secret='', certfile=None, keyfile=None, notifier=None, dest_path_prefix=None, max_file_size=None)

node must be either host name or IP address (in either IPv4 or IPv6 format) and udp_port must be port number where asyncoro starts network services. If udp_port is set to 0, then port 51350 is used.

By default, any available port is used for tcp_port where asyncoros exchange messages. However, this port can be set to a specific number, which can be useful when dealing with firewall/port forwarding.

name, if given, must be a string that is unique for each instance of asyncoro. If this option is not given, it is set to a string representing the location where network services are running. Using name, the location where a specific asyncoro instance is running can be found with asyncoro’s locate(name) method. This location can then be used to find and use services, such as sending messages to coroutines and channels running at that location.

If discover_peers is True (default), the asyncoro scheduler broadcasts a message to detect peers running on the local network. If this option is False, that message is not broadcast, so this asyncoro will not detect peers and peers will not detect this asyncoro. Message passing with remote coroutines, channels etc. can be established only with known peers.

secret is a string that must be the same for all asyncoros that communicate. It is used to compute a hash that is sent and checked in all communication, so that only asyncoros that use the same secret can be peers. Note that the messages are not encrypted; if security of messages is needed, SSL options certfile and/or keyfile can be used.

certfile and keyfile are as per SSL module. See SSL (Security / Encryption) for more information.

ext_ip_addr must be the IP address (in either IPv4 or IPv6 format) of the gateway/firewall when asyncoro is used behind that gateway/firewall and other peers are outside it. With this option, peers on a remote network can communicate with asyncoro running on the local network behind the gateway/firewall.

dest_path is path where files from remote peers are saved, when send_file() method is called (see below). If dest_path is not set, asyncoro will initialize it to tempfile.gettempdir(). If send_file sets dir to a relative path, the file will be saved at dest_path + dir.

max_file_size is the maximum number of bytes a transferred file can be. If it is set to None or 0, there is no limit; if it is set to a number, then any file bigger than that limit is rejected when send_file is called.

Note that Python programs can’t use multiple cores on an SMP node, other than by using multiprocessing. Instead of using multiprocessing in the case of SMP, multiple programs, each using asyncoro, can be started on a machine with multiple cores (as many times as needed), all on the same udp_port but with a different tcp_port per instance; see the ‘discoro’ module, in which multiple instances are started on a node with multiple processors. If tcp_port is set explicitly, each instance must use a unique tcp_port (but the same udp_port). If tcp_port is not set explicitly, asyncoro chooses a different tcp_port for each instance. Within each such program, coroutines can use Asynchronous Concurrent Programming (asyncoro) and the API below to exchange messages among coroutines in all programs - although all programs are running on the same machine in the case of SMP, coroutines running in one program are distributed/remote for coroutines running in another program.
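
The port rules above can be sketched with a small helper that builds the keyword arguments for each program started on one node. This is only an illustration: instance_options, first_tcp_port and the port number 51400 are hypothetical names and values chosen for this sketch, not part of asyncoro.

```python
def instance_options(count, udp_port=51350, first_tcp_port=None, secret=''):
    """Build AsynCoro keyword arguments for `count` programs on one node.

    Hypothetical helper: every instance shares `udp_port`. If
    `first_tcp_port` is given, instance i gets first_tcp_port + i;
    otherwise tcp_port stays 0 so any available port is used.
    """
    options = []
    for i in range(count):
        tcp_port = 0 if first_tcp_port is None else first_tcp_port + i
        options.append({'udp_port': udp_port, 'tcp_port': tcp_port,
                        'secret': secret})
    return options


if __name__ == '__main__':
    for opts in instance_options(3, first_tcp_port=51400):
        # Each program would pass its own dict as AsynCoro(**opts)
        # before creating any coroutines or channels.
        print(opts)
```

Each program would use exactly one of these dictionaries, since AsynCoro is a singleton within a program.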

When disasyncoro is used instead of asyncoro, following additional methods are available in AsynCoro:

scheduler.locate(name, timeout=None)

Note

This method must be used with yield. While locate methods of coroutines, channels etc., are static methods (to be invoked with the class) this method should be invoked on the scheduler instance (which can be obtained, for example, with scheduler = AsynCoro()).

Finds and returns location where peer AsynCoro with name is running. If no AsynCoro with name is currently running, then it will wait until a peer with name starts. This location can be used in other methods. The timeout is the maximum number (or fractions) of seconds for finding location. If timeout seconds elapse (if no peer with name is running or reachable), None is returned.
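
A minimal sketch of using locate() from a coroutine follows. The function name find_peer and the found dictionary are assumptions of this sketch (the dictionary is only a way to hand the result back to the caller); the scheduler instance is passed in as an argument.

```python
def find_peer(scheduler, name, found, timeout=5, coro=None):
    """Coroutine body: look up the location of the peer called `name`.

    `scheduler` is the AsynCoro instance. `found` is a plain dict used
    here only to record the result (not part of asyncoro).
    """
    # locate() must be used with yield; the value is None on timeout.
    location = yield scheduler.locate(name, timeout=timeout)
    found[name] = location
    if location is None:
        print('peer %r not found within %s seconds' % (name, timeout))
    else:
        print('peer %r is at %s' % (name, location))
```

Assuming the usual way of creating coroutines, this would be started with something like Coro(find_peer, scheduler, "server", {}).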

scheduler.peer(loc, udp_port=51350, stream_send=False, broadcast=False)

Note

This method must be used with yield.

Contacts asyncoro at given loc to establish communication. If all asyncoros are running on a local network, they can find each other automatically through UDP broadcasting. However, asyncoros running at remote network(s) need to be explicitly added with peer method. It is not needed to run this method on both sides - if one asyncoro successfully runs this method, they will find each other. By default all asyncoro instances running on a single node (can) use same UDP port, so peer method can be called once per node, even when more than one asyncoro instance is running on that node.

loc can be an instance of Location, a host name or an IP address. If it is a host name or IP address, it is treated as a Location with port set to 0.

If stream_send is False, asyncoro’s scheduler may send one message per connection; i.e., the process that transfers messages opens a connection to the peer, sends a message and closes the connection, unless there are more messages queued for the same peer (in which case the same connection is used to send those messages). As creating sockets and making connections is expensive, this may be inefficient. If stream_send is True, then the connection to the given peer is not closed after sending all pending messages - the connection is kept open and used to send messages when they are queued. This can significantly improve performance in some cases. However, keeping connections alive takes system resources, so this option should be used when delays in message transfer are to be minimized at the cost of keeping sockets open to peers. If stream_send is True and the port of loc is 0, then messages to all asyncoro instances running on that node (if more than one is running there) will be streamed; if port is not 0, then only messages to the asyncoro on that TCP port will be streamed.

If broadcast is True, then the information about source (asyncoro with which peer method is invoked) is broadcast on the network of loc. If source and target are on same network, as mentioned above, they discover each other. If they are on different networks, loc can be used to communicate with each peer by calling peer method for each peer. However, if there are many peers on remote network, broadcast can be set to True with one peer on the remote network and all available peers on that network will be discovered.

stream_send can be used to enable / disable streaming as and when necessary by calling this method (with stream_send=True to enable streaming or with stream_send=False to disable streaming), even if the peer is already discovered.
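
As a sketch of the peer method in use, the coroutine body below contacts asyncoro on a list of hosts outside the local network and then lists the known peers. The function name add_remote_peers and the hosts argument are assumptions of this sketch.

```python
def add_remote_peers(scheduler, hosts, stream_send=False, coro=None):
    """Coroutine body: contact asyncoro on each host in `hosts`.

    `hosts` holds host names or IP addresses supplied by the caller.
    A bare host is treated as a Location with port 0.
    """
    for host in hosts:
        # peer() must be used with yield.
        yield scheduler.peer(host, stream_send=stream_send)
    # peers() is a regular function, so no yield is needed here.
    for location in scheduler.peers():
        print('known peer: %s' % (location,))
```

Calling it again later with a different stream_send value would switch streaming on or off for those hosts.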

scheduler.peers()

This is a regular function (not a generator method used to create a coroutine) that returns the currently known peers as a list. Each peer is an instance of Location.

scheduler.peer_status(coro)

Registers coro to receive notifications of peer status. Whenever the asyncoro scheduler discovers a new peer or a peer disconnects, it sends a PeerStatus instance as a message to coro. PeerStatus has 3 attributes: location, which is an instance of Location; name, which is the name of the peer; and status, which is either PeerStatus.Online if the peer is discovered or PeerStatus.Offline if the peer disconnected.

If this method is called with None (instead of a coroutine), peer status messages are not sent any more (to any previously registered coroutine).
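
A coroutine registered with peer_status() might keep a table of online peers, as in the sketch below. The name track_peers, the online dictionary and the use of a None message as a stop signal are assumptions of this sketch; the PeerStatus class is passed in as peer_status_cls so the sketch does not depend on an import path.

```python
def track_peers(peer_status_cls, online, coro=None):
    """Coroutine body: keep `online` (a dict) in sync with peer status.

    `peer_status_cls` is expected to be the PeerStatus class. A None
    message ends the loop (a convention of this sketch only).
    """
    while True:
        msg = yield coro.receive()
        if msg is None:
            break
        if msg.status == peer_status_cls.Online:
            online[msg.name] = msg.location
        elif msg.status == peer_status_cls.Offline:
            online.pop(msg.name, None)
```

The coroutine created from this function would then be given to scheduler.peer_status() to start receiving notifications.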

scheduler.send_file(location, file, dir=None, overwrite=False, timeout=None)

Note

This method must be used with yield.

Sends (transfers) file file to peer at location. If dir is not None, it must be a relative path, in which case, the file is saved under peer’s dest_path + dir. This method returns 0 if the file was successfully saved at peer, 1 if the file exists at peer with same size, timestamp and permissions (so there is no need to transfer), os.stat structure if file exists at destination with different size/timestamp/permissions but overwrite is False, and -1 in case of any error. If the return value is 0, the sender may delete the file with del_file() later.

scheduler.del_file(location, file, dir=None, timeout=None)

Note

This method must be used with yield.

Deletes file file from peer at location. file and dir must be same as used to send the file (with send_file()). This method returns 0 if the file was successfully deleted and -1 in case of any error.
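
The return values of send_file() and del_file() can be handled as in the following sketch. The names send_result and push_then_remove, the log list and the 10 second timeout are assumptions chosen for illustration.

```python
def send_result(result):
    """Map the value yielded by send_file() to a short label."""
    if isinstance(result, int):
        if result == 0:
            return 'saved'
        if result == 1:
            return 'unchanged'
        return 'error'  # -1 (any other int is treated as an error here)
    # Otherwise an os.stat structure: file exists, overwrite was False.
    return 'exists'


def push_then_remove(scheduler, location, path, log, coro=None):
    """Coroutine body: send `path` to the peer, then delete it there."""
    result = yield scheduler.send_file(location, path, timeout=10)
    log.append(send_result(result))
    if isinstance(result, int) and result == 0:
        # Only a file this sender saved (status 0) is deleted again;
        # file and dir must match the ones used with send_file().
        status = yield scheduler.del_file(location, path, timeout=10)
        log.append('deleted' if status == 0 else 'delete failed')
```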

5.4. Distributed Coroutines

When disasyncoro module is used instead of asyncoro module, additional methods described below are added to Coroutine class to register them with names so they can be located by remote coroutines, used for message passing to/from remote coroutines, monitoring remote coroutines etc.

disasyncoro.register(name=None)

A coroutine that can be used for remote services must first register itself with register(name=None). If name is None, then name of the generator function of coroutine is used for registering. The name must be unique at that asyncoro; registering a different coroutine with the same name will fail. If same name is used for registering in different asyncoro’s, then locate() method can be given the location of specific asyncoro to get a reference to coroutine with that name at that location.

Coro.locate(name, location=None, timeout=None)

Note

This method must be used with yield.

This static method finds and returns a reference to the coroutine registered with name. If location is given (e.g., obtained by asyncoro’s locate), only the peer running at that location is queried for a coroutine with that name; otherwise, all known peers are queried. If the coroutine is successfully located, the return value is an instance of Coroutine. A remote coroutine reference, rcoro, can be used to pass messages to it with rcoro.send() and rcoro.deliver(), to receive exit status notifications when it finishes with the monitor() method, or to terminate it with rcoro.terminate().

If a coroutine running at location loc1 registers itself with coro.register("server_coro"), then coroutines running at other peer(s) can obtain references to it with rcoro = yield Coro.locate("server_coro", loc1). If “server_coro” is unique among all asyncoro’s, then the same reference can be obtained with rcoro = yield Coro.locate("server_coro"). Following methods can then be used with the remote coroutine reference rcoro:

rcoro.send(msg)

Sends message msg to the coroutine rcoro (running at “loc1”). That remote coroutine will receive the messages with msg = yield coro.receive(). Messages are sent asynchronously - they are queued to be sent and a daemon coroutine sends them in the order submitted. Thus, the caller of rcoro.send() cannot determine if/when a message has been sent - in case of network failures, the message may be dropped. rcoro.deliver() may be used instead to find out whether the message was delivered. Alternatively, the recipient can send an acknowledgement back.

rcoro.deliver(msg, timeout=None)

Note

This method must be used with yield as recvd = yield rcoro.deliver(msg).

Sends message msg to the recipient, waits for acknowledgement from the remote asyncoro that the message has been put in the recipient’s message queue and returns that status. If the status is 1, the message has been successfully delivered (when the recipient calls receive(), it gets the queued messages in the order they were received). Otherwise (e.g., timeout occurs before delivery, or the remote coroutine reference rcoro is no longer valid), the status returned is 0.

coro.monitor(rcoro)

Note

This method must be used with yield as status = yield coro.monitor(rcoro).

Sets coro (the coroutine that invoked this method) as a monitor of remote coroutine rcoro. If successful, the value yielded is 0. Then when rcoro terminates (either normally or abnormally), the exit status is sent to the monitor coro.

coro.notify(rcoro)

Sets rcoro as a monitor of coroutine coro (the coroutine that invoked this method). If successful, the value yielded is 0. Then when coro terminates (either normally or abnormally), the exit status is sent to the monitor rcoro.

rcoro.terminate()

Terminates the coroutine.
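
The register, locate and deliver calls described in this section can be combined as in the sketch below. The function names server and client, the log and failed lists, the 5 second timeouts and the use of None as a stop message are assumptions of this sketch. The Coro class is passed in as coro_cls to keep the sketch independent of an import path, and treating a None result from locate() as "not found" is also an assumption, since the text above specifies only the success case.

```python
def server(log, coro=None):
    """Coroutine body at the serving peer."""
    coro.register('server_coro')
    while True:
        msg = yield coro.receive()
        if msg is None:  # stop signal chosen for this sketch
            break
        log.append(msg)


def client(coro_cls, messages, failed, location=None, coro=None):
    """Coroutine body at another peer.

    `coro_cls` is expected to be the Coro class. Messages that were
    not acknowledged are collected in `failed`.
    """
    rcoro = yield coro_cls.locate('server_coro', location, timeout=5)
    if rcoro is None:  # assumed result when nothing is located
        failed.extend(messages)
        return
    for msg in messages:
        # deliver() yields 1 once the message is in the recipient's queue.
        status = yield rcoro.deliver(msg, timeout=5)
        if status != 1:
            failed.append(msg)
```

Using deliver() here instead of send() lets the client record which messages were not acknowledged.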

5.5. Distributed Channels

Similar to coroutines, channels can be registered and such channels can be used in remote coroutines for message passing. A channel, say, chan, can be registered with

chan.register(name=None)

so that coroutines at peers can obtain a reference to it. If name is None, the name the channel was created with is used for registering. The name must be unique, at least at the asyncoro where it is registered. If there are duplicate names at different asyncoros, Channel.locate() should be used with a specific location (peer) to find the channel.

chan.unregister()

A registered channel can be unregistered with chan.unregister(name=None); the name must be same as the one used to register.

Channel.locate(name, location=None, timeout=None)

Note

This method must be used with yield as chan = yield Channel.locate("achannel").

Finds and returns reference to channel registered with name. If location is given (e.g., obtained by asyncoro’s locate), only the peer running at that location is queried; otherwise, all peers are queried. If channel is successfully located, the return value is an instance of Channel. A remote channel reference can be used for message passing with send() and deliver() methods to it.

If a channel, chan, is created and registered at location loc1 with chan.register("server_channel"), then coroutines running at other peer(s) can obtain references to it with rchan = yield Channel.locate("server_channel", loc1). If “server_channel” is unique channel among all the peers, then same can be obtained without giving loc1: rchan = yield Channel.locate("server_channel"). Following methods can then be used with the remote channel reference rchan:

rchan.send(msg)

Sends message msg to the channel (“server_channel” at “loc1”), so that current subscribers to that channel will receive that message. send() sends messages asynchronously - messages are queued to be sent and a daemon coroutine sends them in the order submitted. Thus, the caller of send() cannot determine if/when a message has been sent - in case of network failures, the message may fail to reach the recipient. deliver() may be used instead to find out whether the message was delivered.

rchan.deliver(msg, timeout=None, n=0)

Note

This method must be used with yield as recvd = yield rchan.deliver(msg).

Similar to send(), except that this method must be used with yield and it returns status of delivering the message to the channel. The method will wait until there are at least n subscribers to the channel until timeout seconds. Then it delivers the message to each of the subscribers. The value returned is the number of total recipients of the message. This can be less than n in case of network failures, timeouts, or some subscribers leaving while message is being delivered, or more than n if there are more than n subscribers. If n is 0, then the message will be delivered to all current subscribers. In any case, timeout is maximum amount of time in seconds (or fraction of second) for delivering the message.

rchan.subscribe(subscriber, timeout=None)

Note

This method must be used with yield as n = yield rchan.subscribe(coro).

Subscribes subscriber to rchan so that messages sent to rchan will be sent to subscriber. If subscriber is a coroutine, then messages can be received with msg = yield coro.receive(). Note that messages sent directly to the coroutine from other coroutine will also be received with the same method, so if it is necessary to distinguish if a message is received directly to or through a channel, then messages can be encapsulated with appropriate information.

rchan.unsubscribe(subscriber, timeout=None)

Note

This method must be used with yield as yield rchan.unsubscribe(coro).

Removes subscriber from rchan’s subscribers so it will no longer receive messages sent to rchan.
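
The channel methods above fit together as in the following sketch of a publisher and a remote subscriber. The function names, the received and counts lists, the 5 second timeout and the None stop message are assumptions of this sketch; the Channel class is passed in as channel_cls so that no import path is assumed.

```python
def subscriber(channel_cls, received, location=None, coro=None):
    """Coroutine body: subscribe to 'server_channel' and collect messages.

    `channel_cls` is expected to be the Channel class. A None message
    ends the loop (a convention of this sketch only).
    """
    rchan = yield channel_cls.locate('server_channel', location)
    yield rchan.subscribe(coro)
    while True:
        msg = yield coro.receive()
        if msg is None:
            break
        received.append(msg)
    yield rchan.unsubscribe(coro)


def publisher(chan, messages, counts, coro=None):
    """Coroutine body at the peer that created `chan`."""
    chan.register('server_channel')
    for msg in messages:
        # Wait (up to 5 seconds) for at least one subscriber.
        n = yield chan.deliver(msg, timeout=5, n=1)
        counts.append(n)
```

The value stored in counts is the number of recipients reported by deliver(), which may be lower or higher than the n requested.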

5.6. RCI (Remote Coroutine Invocation)

As explained above, references to coroutines running at a location can be used to send them messages and to monitor them. References alone, however, cannot be used to start coroutines at remote peers. RCI, similar to Remote Procedure Call (RPC), allows peers to execute coroutines with registered generator methods. With RCI, an RCI instance should first be created with a generator method and registered. Coroutines running in remote peers can then obtain a reference to it with the locate() method (similar to distributed coroutines and channels). The reference can then be used to start a coroutine with arguments.

class disasyncoro.RCI(method, name=None)

Creates an RCI instance with given method. method should be a generator function, similar to coroutine methods; in fact, the given method is used to create coroutine when remote coroutine calls this method. If name is not given, generator method’s name is used for registering it. As in the case of coroutines and channels, the name must be unique, at least at the asyncoro where it is registered.

rci.register()

Registers rci with the scheduler so it can be located (i.e., a reference to it can be obtained) by remote coroutines.

Assuming an RCI is created with rci1 = RCI(method1), it can be registered with rci1.register() so remote coroutines can locate it (see below) with the name “method1”.

RCI.locate(name, location=None, timeout=None)

Note

This method must be used with yield as rci = yield RCI.locate("rci_1").

Gets a reference to the RCI registered with name. Similar to distributed coroutines, location can be used to query a specific asyncoro and timeout is the number (or fraction) of seconds to wait for locating it.

The reference returned from locate() can be used to create (remote) coroutines (where RCI has been registered). If, as above, “method1” is registered at a remote peer at location loc1, a reference to it can be obtained with drci1 = yield RCI.locate("method1", loc1). If the name “method1” is unique among RCIs at all peer asyncoros, the same can be obtained with drci1 = yield RCI.locate("method1").

If locate() is successful, the reference obtained can be used to start remote coroutines (where RCI has been registered). With the above example, drci1 can be used as many times as necessary to create coroutines at that remote peer with rcoro = yield drci1(*args, **kwargs). The return value rcoro is reference to remote coroutine. This can be used for message passing, monitoring etc.
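
Both sides of RCI can be sketched as follows. The helper names expose and invoke, the started list and the body of method1 are assumptions of this sketch; the RCI class is passed in as rci_cls so that no import path is assumed.

```python
def method1(greeting, coro=None):
    """Generator function that the serving peer exposes through RCI."""
    msg = yield coro.receive()
    print('%s: %s' % (greeting, msg))


def expose(rci_cls):
    """Run at the serving peer: create and register the RCI."""
    rci1 = rci_cls(method1)  # registered under the name "method1"
    rci1.register()
    return rci1


def invoke(rci_cls, location, started, coro=None):
    """Coroutine body at a remote peer: start method1 at `location`."""
    drci1 = yield rci_cls.locate('method1', location)
    # Arguments given here are passed to method1 at the serving peer.
    rcoro = yield drci1('hello')
    started.append(rcoro)
    rcoro.send('first message')
```

The reference drci1 could be called repeatedly to start further coroutines at the same peer.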

5.7. SSL (Security / Encryption)

If peers are on public / remote networks, SSL (Secure Sockets Layer) can be used to encrypt all communication, including data, so that the connection is private - only the sender and recipient see the data exchanged and other observers see encrypted data. disasyncoro provides a simple mechanism where digital certificates / symmetric keys are used to encrypt data on one side and decrypt data on the other side. Below is a command to generate the certificate using the openssl tool (this is just an example - there are other approaches / methods):

openssl req -x509 -newkey rsa:2048 -keyout sskeycert.pem -nodes -out sskeycert.pem -sha256 -days 1000

This command generates a self-signed key and certificate pair in the file sskeycert.pem, which should be used as the certfile parameter where SSL is used. It is also possible to generate the key and certificate in separate files, and use the keyfile and certfile parameters appropriately.

Once the key/certificate pair is generated (or obtained by other means), it should be copied to each of the peers (over a secure channel, such as ssh) and the peers created with asyncoro.AsynCoro(..., certfile='sskeycert.pem') (or, if key and certificate are stored in separate files, with asyncoro.AsynCoro(..., certfile=sscert, keyfile=sskey)).
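
Before handing the files to AsynCoro, it can be useful to check that they load at all. The sketch below uses only Python's standard ssl module; the function name keycert_loads is an assumption of this sketch, and the check says nothing about how asyncoro itself sets up its sockets.

```python
import ssl


def keycert_loads(certfile, keyfile=None):
    """Return True if the certificate/key file(s) load with Python's ssl."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        # With keyfile=None the private key is read from certfile too,
        # which matches the combined sskeycert.pem generated above.
        context.load_cert_chain(certfile, keyfile)
    except (OSError, ValueError):
        # Missing file, unreadable file, or key/certificate mismatch.
        return False
    return True
```

For the combined file generated above, the check would be keycert_loads("sskeycert.pem").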