6. Distributed Communicating Processes (discoro)

Note

asyncoro has been renamed pycos to better reflect its functionality and to avoid confusion with asyncore module.

discoro module provides an API for sending computation fragments (code and data) to remote server processes for executing distributed communicating processes. Whereas RCI (Remote Coroutine Invocation) provides an API for creating remote coroutines with pre-defined generator functions, discoro’s API provides a generic framework that clients can use to send different computations to create coroutines at remote servers. There are three components in discoro:

  • Node / Servers (discoronode) program, which should be running on each of the nodes that run the servers to execute coroutines for clients,
  • Scheduler (discoro), which schedules client computations, manages nodes, remote coroutines etc.,
  • Computation API, which clients use to create computations, schedule them with the scheduler and run remote coroutines.

6.1. Examples

There are many illustrative use cases in ‘discoro_*.py’ files in the ‘examples’ directory under the directory where the asyncoro module is installed. To use these examples, run the ‘discoronode.py’ program on one or more nodes (most examples use one or two servers) and then run an example file. The examples are written to illustrate various features, not necessarily in the simplest way, and their error checking is not comprehensive. The comments in the programs explain usage and notes.

Compared to the dispy project, there are a few additional steps involved in distributing computations and getting results with asyncoro/discoro; however, asyncoro/discoro offer many features, such as communicating with computations (even computations communicating among themselves), data streaming, live analytics etc.

Following is a brief description of the included examples relevant to this section:

  • discoro_client1.py illustrates how to use discoro to distribute computations to remote servers to run them as coroutines on those servers and get results back to client.
  • discoro_client2.py is a variation of discoro_client1.py. In this example, http server is used to monitor cluster, nodes, remote coroutines.
  • discoro_client3.py shows how to exchange messages with objects (instances of class) between client and remote coroutines.
  • discoro_client4.py sends files at the client to remote process to execute computations that process those files and the remote process in turn sends the results in files back to the client.
  • discoro_client5.py runs an external program (discoro_client5_proc.py) at remote servers. The program reads from standard input and writes to standard output. Asynchronous pipes and message passing are used to send input from client to this program executing on remote servers, and get the output back to client.
  • discoro_client6.py uses streaming of data to remote coroutines for efficient communication. The example also shows how to implement live/real-time analytics and send them to client.
  • discoro_client6_channel.py is same as discoro_client6.py, except it uses channel to broadcast data to remote coroutines.
  • discoro_client7.py is an alternate implementation of discoro_client1.py; it uses messages from discoro scheduler to schedule remote coroutines and get results.
  • discoro_client8.py demonstrates that long-running computations without yield often can be executed. In this case time.sleep is used to simulate computation. Note that time.sleep blocks the entire asyncoro framework, so no other coroutines can execute until the next yield. With version 4.1 (and above), I/O processing, message passing, sending heartbeat messages to the scheduler etc. are handled by a separate (called “reactive”) asyncoro scheduler that is not affected by the user’s coroutines. So messages sent by the client are received and queued by the reactive scheduler.
  • discoro_client9_node.py uses status messages from discoro scheduler to distribute data files to nodes and run node specific setup coroutine to load the data in memory. This data is then processed in computations to illustrate in-memory processing. This example doesn’t work with Windows (due to lack of ‘fork’ in Windows), so nodes running Windows are filtered out using DiscoroNodeAllocate.
  • discoro_client9_server.py is similar to discoro_client9_node.py above, except that instead of initializing (memory in) nodes, each server in each node is initialized by distributing one file per server (note that one node may run as many servers as there are processors on that node), which is then read in memory on that server for in-memory processing at server level.
  • discoro_httpd1.py shows how to use HTTP Server module to provide HTTP interface to monitor discoro cluster.
  • discoro_httpd2.py is a variant of discoro_httpd1.py that uses status_coro to process messages from discoro scheduler (in this case just to print when a remote coroutine is finished) while also using HTTP Server (which chains messages from discoro scheduler to the client’s status_coro).
  • discoro_ssh_ec2.py is a variation of discoro_client8.py that uses Amazon EC2 for cloud computing; see Cloud Computing for more details.

6.2. Node / Servers

discoronode program (discoronode.py) is used to start server processes at a node. These server processes are used by discoro scheduler to run computations submitted by clients. By default, the program starts one server for each available processor so that CPU-intensive computations can utilize all the processors efficiently. Each server runs as a separate process, so computations running in one server process don’t interfere with computations in another server process on the same node. However, multiple coroutines can be scheduled on one server, so that if computations perform I/O (such as communicating with other computations / client, or reading/writing data), asyncoro can run another coroutine that is ready to execute on that server process. All coroutines running in one server process share the address space and run on one CPU; however, as asyncoro doesn’t pre-empt a running coroutine until it uses yield, there is no need to lock critical sections, as is done with threads. If all computations are the same and do not need to communicate with each other, then the dispy project can be used.
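
The no-locking argument can be illustrated with a toy round-robin scheduler over plain Python generators. This is a minimal sketch of cooperative scheduling in general, not asyncoro's scheduler; `worker` and `run_round_robin` are hypothetical names. Because a generator is switched out only at `yield`, the read-modify-write on the shared counter cannot be interleaved with another coroutine.

```python
from collections import deque

# Shared state: all coroutines in one server process share an address space.
counter = {"value": 0}

def worker(n):
    """Hypothetical coroutine: increments the shared counter n times."""
    for _ in range(n):
        # Read-modify-write with no yield in between: no other coroutine
        # can run here, so no lock is needed.
        current = counter["value"]
        counter["value"] = current + 1
        yield  # voluntary switch point, like 'yield' in an asyncoro coroutine

def run_round_robin(coros):
    """Run generators cooperatively until all of them are exhausted."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            next(coro)
        except StopIteration:
            continue  # finished; drop it
        ready.append(coro)

run_round_robin([worker(1000) for _ in range(5)])
print(counter["value"])  # 5000
```

Moving the `yield` between the read and the write would reintroduce lost updates, which is exactly the situation where locking would be needed.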

The program takes following options to customize how the servers are started.

  • -c option specifies the number of discoro server processes to run. Each server process uses one processor. If -c is given a positive number, then that many processors can be used in parallel; if it is a negative number, then that many of the available processors are left unused; and if it is 0 (default value), then all available processors are used.

  • -i or --ip_addr is same as node option to AsynCoro

  • --ext_ip_addr is same as ext_ip_addr option to AsynCoro

  • -u or --udp_port is same as udp_port option to AsynCoro

  • -n or --name is same as name option to AsynCoro

  • --dest_path is same as dest_path option to AsynCoro

  • --max_file_size is same as max_file_size option to AsynCoro

  • -s or --secret is same as secret option to AsynCoro

  • --certfile is same as certfile option to AsynCoro

  • --keyfile is same as keyfile option to AsynCoro

  • --min_pulse_interval n specifies the minimum pulse interval that client computations can give as pulse_interval. The default value is MinPulseInterval, defined (as 10 seconds) in discoro module. As nodes send availability status (CPU, memory and disk availability), clients may want this information more frequently than at MinPulseInterval, in which case a smaller value can be specified with this option.

  • --max_pulse_interval n specifies maximum pulse interval that can be given as pulse_interval by client computations. See min_pulse_interval above.

  • --zombie_period n is the maximum number of seconds that discoronode remains idle (i.e., doesn’t run any computations for the current client) before the node closes the current computation so another computation may use the node.

  • -d or --debug option enables debug log messages.

  • --tcp_ports n1-n2 or --tcp_ports n option can be used to specify a list of TCP ports to be used by servers. Without this option a server uses any available (dynamic) port, which can be a problem with remote servers or cloud computing that require a firewall to be configured to forward ports. With tcp_ports, a specific range of ports can be used and those ports can be configured for port forwarding. The range can be given either as n1-n2, in which case ports from n1 to n2 (both inclusive) will be used, or as a single number n. tcp_ports can be used as many times as necessary to add different ranges or different ports. If the number of ports listed is less than the number of servers (based on -c option, or number of available CPUs), discoronode will use ports beyond the highest listed port; thus, if the number of servers to start is 8, and --tcp_ports 2345-2347 --tcp_ports 3799 is given, then servers will use ports 2345, 2346, 2347, 3799, 3800, 3801, 3802, 3803, or if, say, just --tcp_ports 51347 is used, then ports from 51347 to 51354 are used.

  • --serve n option indicates the number of computations/clients to serve before exiting. The default value of -1 implies no limit, and any positive number causes discoronode to quit after running that many computations. This option can be used with Docker containers to run each computation in a new container, so each computation starts with the same environment the Docker image was built with.

  • --daemon, if given, indicates that discoronode shouldn’t read standard input. Without this option and started with command-line, discoronode offers a choice of commands that can be input to get status or quit. Currently supported commands are:

    • “status” shows status of each of the processes, such as number of coroutines being executed for a computation
    • “close” closes currently executing computation (if any). This is equivalent to computation calling “close” method. If any coroutines are being executed for that computation, they will be killed. A new computation can then use the server.
    • “quit” or “exit” command causes discoronode to stop accepting any more coroutines and when all coroutines are done, closes the computation and quits.
    • “terminate” kills currently executing coroutines, closes computation and quits.
  • --service_start HH:MM, --service_stop HH:MM and --service_end HH:MM set the time of day when discoro service can be used by clients. The time is given as hours of day in 24-hour format, a colon and minutes of hour. If start time is not given, current time is assumed as start time. Either stop time or end time must also be given. After stop time new jobs will not be accepted, but currently running jobs will continue to execute. At end time any running jobs will be killed. For example, if service_start is set to 17:00, service_stop set to 07:00 and service_end is set to 08:00, the node will execute jobs from 5PM, stop accepting new jobs at 7AM (next day), and kill any running jobs at 8AM. Then it will not accept any jobs until 5PM.

  • --clean indicates that discoronode should kill any previous server processes left running. When discoronode server processes are started, they store their process IDs in files on the file system. When the server processes quit, they remove the files. If the processes are terminated, for example, by killing them explicitly or due to an unforeseen exception, then the files are left behind, and the next time the server processes start, they notice the files from the previous run and refuse to start. The clean option can be used to request the server processes to kill previous servers. This should be done only if the previous processes are not running, or if it is acceptable to kill them if they are running.

  • --peer location requests discoronode to contact the given peer to establish communication with it. location should be given in the form node:port where node is either host name or IP address and port is the TCP port where the peer is running. This option can be used multiple times to contact multiple peers.

    A typical use case where the client’s computations don’t communicate with computations on other discoronodes (i.e., each coroutine executing on a discoronode communicates only with coroutines at the client or with other local coroutines executing on that discoronode) can be implemented by running each discoronode without discover_peers and peer options, and then starting the scheduler / client. (The scheduler and client use discover_peers, so they detect all available discoronode servers, even the ones started without discover_peers option.)

    If the scheduler is already running, peer option can be used with the location where the scheduler is running so that the scheduler and the new discoronode server can detect each other; alternatively, discover_peers can be used, but in that case other discoronode servers will also detect the new server.

  • --save_config <file> saves configuration (various options to start discoronode) in given file and exits. See --config below.

  • --config <file> loads configuration (various options to start discoronode) from given file (saved with --save_config option).

    --save_config can be used to save most of the options (certain options, such as ip_addr are unique to a node) and --config can be used to run discoronode with those options on many nodes.
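
The arithmetic behind the -c, --tcp_ports and --service_* options can be modeled with a few standalone functions. This is a sketch of the behavior described above, not discoronode's code; `server_count`, `assign_ports` and `service_phase` are hypothetical helpers, and the floor of one server for large negative -c values is an assumption.

```python
import datetime

def server_count(c_option, cpus):
    """Servers to start for a given -c value on a node with `cpus` processors."""
    if c_option > 0:
        return c_option                 # use exactly that many
    if c_option < 0:
        return max(cpus + c_option, 1)  # leave |c| processors unused (floor of 1 assumed)
    return cpus                         # 0 (default): use all processors

def assign_ports(port_specs, n_servers):
    """Expand --tcp_ports values ('n1-n2' or 'n') into one port per server."""
    ports = []
    for spec in port_specs:
        if "-" in spec:
            first, last = (int(p) for p in spec.split("-", 1))
            ports.extend(range(first, last + 1))  # both ends inclusive
        else:
            ports.append(int(spec))
    if not ports:
        return []  # no --tcp_ports: servers pick dynamic ports
    next_port = max(ports) + 1
    while len(ports) < n_servers:  # too few listed: continue past the highest
        ports.append(next_port)
        next_port += 1
    return ports[:n_servers]

def service_phase(now, start, stop=None, end=None):
    """Classify datetime.time `now` against --service_start/stop/end."""
    if stop is None and end is None:
        raise ValueError("either stop or end time must be given")
    day = 24 * 60

    def offset(t):
        # Minutes elapsed since the start time, wrapping past midnight.
        return (t.hour * 60 + t.minute - start.hour * 60 - start.minute) % day

    stop_off = offset(stop if stop is not None else end)
    end_off = offset(end) if end is not None else day
    now_off = offset(now)
    if now_off < stop_off:
        return "accepting"  # new jobs are accepted
    if now_off < end_off:
        return "draining"   # no new jobs; running jobs continue
    return "closed"         # running jobs are killed until next start
```

With start 17:00, stop 07:00 and end 08:00, `service_phase` returns "accepting" at 18:00, "draining" at 07:30 and "closed" at 12:00, and `assign_ports(["2345-2347", "3799"], 8)` yields the eight ports from the tcp_ports example.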

6.3. Scheduler

discoro scheduler schedules computations, keeps track of available nodes, server processes etc. Client computations, constructed with Computation, are sent to the scheduler. The scheduler queues client computations and processes one at a time, until the computation closes. Even if the currently scheduled computation is not utilizing all the servers, the scheduler doesn’t schedule any other computations, so that one computation’s state doesn’t interfere with another computation’s. That is, the currently scheduled computation reserves all the available servers.

Scheduler can be started either as a separate program, or from within the client program. If multiple client programs can schedule computations simultaneously, scheduler should be started as a program on a node; then the scheduler will schedule them in the order submitted.

6.3.1. Private/Local Scheduler

If only one client uses the nodes, then it is easier to start the scheduler in discoro module with:

Scheduler()

in the client program before computation is scheduled with compute.schedule() (see below). There is no need to hold reference to the object created, as the methods of scheduler are not to be used directly by the client; instead, computation created with Computation should be used to run remote coroutines. If necessary, options used in AsynCoro can be passed, such as node to specify host name or IP address to use for distributed computing. When the scheduler starts, it detects discoro nodes and servers on the configured network. See ‘discoro_client*.py’ files in examples directory for use cases.

6.3.2. Remote/Batch Scheduler

If more than one client may use the scheduler simultaneously, scheduler should be started by running the discoro program (discoro.py). The program takes following options (same as discoronode, except for starting servers):

  • -i or --ip_addr is same as node option to AsynCoro
  • --ext_ip_addr is same as ext_ip_addr option to AsynCoro
  • -u or --udp_port is same as udp_port option to AsynCoro
  • -t or --tcp_port is same as tcp_port option to AsynCoro
  • -n or --name is same as name option to AsynCoro
  • --node <host name or IP> can be used as many times as necessary to list names or IP addresses of nodes in remote networks. Nodes in the local network are found automatically, so there is no need to list them with this option. Moreover, listing only one node per remote network should be enough, as asyncoro finds all nodes in a network by broadcasting.
  • --dest_path is same as dest_path option to AsynCoro
  • --max_file_size is same as max_file_size option to AsynCoro
  • -s or --secret is same as secret option to AsynCoro
  • --certfile is same as certfile option to AsynCoro
  • --keyfile is same as keyfile option to AsynCoro
  • --zombie_period=sec specifies maximum number of seconds a remote server process can stay idle before it closes computation. The default value is 10*MaxPulseInterval, which is 1000 seconds. Once all servers used by a computation close, the computation is discarded so other pending (queued) computations can be run. If zombie_period is set to 0, then idle check is not done, so computations are not automatically closed.
  • -d or --debug option enables debug log messages.
  • --daemon, if given, indicates that discoro scheduler shouldn’t read standard input. Starting the scheduler as background process (i.e., with & in Unix, for example) implies daemon. If not a daemon, the scheduler can be terminated with “quit” or “exit” commands.
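
The zombie_period default above follows from the pulse-interval constants. Taking MinPulseInterval as 10 seconds (as stated for discoronode's --min_pulse_interval) and 10*MaxPulseInterval as 1000 seconds gives MaxPulseInterval = 100 seconds. The sketch below encodes those derived values together with the pulse_interval bounds and the 5 * pulse_interval fault timeout described for Computation; the constant and function names are hypothetical, and node-specific --min_pulse_interval / --max_pulse_interval overrides are ignored.

```python
MIN_PULSE_INTERVAL = 10   # MinPulseInterval, seconds
MAX_PULSE_INTERVAL = 100  # derived: 10 * MaxPulseInterval == 1000 seconds
DEFAULT_ZOMBIE_PERIOD = 10 * MAX_PULSE_INTERVAL

def fault_timeout(pulse_interval):
    """Validate pulse_interval and return how long silence is tolerated."""
    if not MIN_PULSE_INTERVAL <= pulse_interval <= MAX_PULSE_INTERVAL:
        raise ValueError(
            f"pulse_interval must be in [{MIN_PULSE_INTERVAL}, {MAX_PULSE_INTERVAL}]")
    # No message within 5 * pulse_interval => remote end assumed faulty.
    return 5 * pulse_interval
```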

When a remote scheduler is running on a computer in the local network, Computation.schedule() will locate the scheduler automatically. If the scheduler is in a remote network, the peer method of AsynCoro should be used so asyncoro can locate the scheduler.

Note that discoro scheduler runs jobs for at most one computation at any time. Other computations are queued and wait for their turn in the order submitted; when currently running computation finishes, next computation in queue is made active so its jobs can be run.
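
The one-computation-at-a-time policy amounts to a FIFO queue with a single active slot. `ComputationQueue` is a hypothetical class for illustration only; the real scheduler also tracks nodes, servers and zombie timeouts, which this sketch omits.

```python
from collections import deque

class ComputationQueue:
    """Toy model: at most one active computation; others wait in FIFO order."""

    def __init__(self):
        self.active = None
        self.waiting = deque()

    def submit(self, name):
        # Like schedule(): active at once if the cluster is free, else queued.
        if self.active is None:
            self.active = name
        else:
            self.waiting.append(name)

    def close(self, name):
        # Like close(): the next queued computation becomes active.
        if name != self.active:
            raise ValueError(f"{name} is not the active computation")
        self.active = self.waiting.popleft() if self.waiting else None
```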

Examples discoro_* use a local scheduler with Scheduler(), assuming that no other clients may be using the nodes. If multiple clients share nodes, that statement should be removed (or commented out) and the discoro.py program must be running on a machine in the local network. It is also possible to use a scheduler running in a remote network, in which case the peer method of AsynCoro should be used to establish communication first.

6.4. Computation

discoro module’s Computation provides an API for client programs to package computation fragments, send them to the scheduler, and submit coroutines to be executed at remote server processes to execute distributed communicating processes. A computation’s jobs (remote coroutines) at a remote server run with only the components packaged. A remote coroutine can expect to have only the asyncoro module available; any other modules, global variables etc. need to be initialized appropriately. If necessary, initialization can be done by scheduling a job on remote servers, e.g., to read data in a file, before running other jobs, which can then rely on the side effects of the setup job. See discoro_*.py files in the examples directory (under where the asyncoro module is installed) for different use cases. All coroutines at a server process (if more than one job is scheduled concurrently) run in one thread / processor, share the address space, and run concurrently.

Computation(components, pulse_interval=(5*MinPulseInterval), node_allocations=[], status_coro=None, node_setup=None, server_setup=None, disable_nodes=False, disable_servers=False, peers_communicate=False):
  • components must be a list, each element of which can be either a Python function, class, module, or path to a file. These computation fragments are sent to discoro servers for execution. Once the computation is scheduled (i.e., the schedule method has finished), generator functions can be run on the nodes / servers. These jobs can use the packaged components.

    If a component is a file, it is stored in the (remote) node’s directory $TMPDIR/asyncoro/discoro, where $TMPDIR is as returned by Python’s tempfile.gettempdir(), whereas jobs scheduled later on servers are executed in the server’s directory discoroproc-$n under the node’s directory, where $n is a number from 1 to the number of servers executing on that node. Thus, a computation can access a file transferred with components (and by node_available) in the parent directory of the current working directory; see discoro_client9_node.py for an example where a file is transferred to nodes. Python’s sys.path is set to include both the server process’s directory and the node directory, so loading modules works without additional steps.

    If a component is a Python code fragment (function, class), then it is sent once to the node and all the servers are initialized with these definitions before any jobs are executed. It is not necessary to list in components every function that is later used to submit jobs; however, the scheduler will then send the definitions (code) for such functions with each submission, which is less efficient. So whenever possible, listing all definitions in components is advised.

  • pulse_interval is the interval in seconds at which pulse messages are exchanged among servers and client to check for faults. If no messages are received within (5 * pulse_interval), the remote end is assumed to be faulty and it is closed. This value must be at least MinPulseInterval and at most MaxPulseInterval; both of these values are defined in discoro module. The default value of pulse_interval is 5*MinPulseInterval.

    If nodes have psutil module installed, they send node availability status (CPU available in percent, memory in bytes and disk space in bytes) as an instance of DiscoroNodeAvailInfo at pulse_interval frequency. This information is useful for monitoring application performance, filtering nodes for required resources etc. This information is shown in web browser if HTTP Server is used.

  • node_allocations should be a list of DiscoroNodeAllocate instances. When a node is discovered, the scheduler executes allocate method of each allocation with ip_addr, name, platform, cpus, memory, disk arguments. If the method returns a positive number, that many cpus are used on that node. If return value is 0, then that node is not used and if the return value is negative number, then next allocation in node_allocations is applied. See, for example, discoro_client9_node.py that filters out nodes running Windows as this example doesn’t work on such nodes.

    If necessary, DiscoroNodeAllocate can be sub-classed to override the allocate method, and objects of that sub-class can be passed to node_allocations. However, this works only with a private scheduler (i.e., a scheduler created in the client program), not with a shared scheduler.

  • status_coro, if given, must be a coroutine. discoro scheduler sends the status of remote servers, jobs executed etc. to this coroutine as messages. These messages can be used to schedule jobs.

  • node_setup, if given, must be a generator function. This function is executed at the remote discoro node to prepare the node before server processes are created. If the coroutine finishes with value 0, the setup is assumed to be successful and the node process creates server processes for that computation. Otherwise, the node is not used for that computation.

    If node_setup needs to be executed with node-specific arguments, then disable_nodes=True and status_coro can be used to call enable_node when a NodeDiscovered message is received. node_setup is executed in the node’s directory $TMPDIR/asyncoro/discoro (where $TMPDIR is as returned by Python’s tempfile.gettempdir()). See discoro_client9_node.py for an example.

    node_setup runs in a process that doesn’t have networking enabled, so this function can’t communicate with client / other peers. node_setup is not executed on nodes running Windows.

  • server_setup, if given, must be a generator function. This function is executed by each server process on each node (note that a node may run as many server processes as there are processors available on that node). Similar to node_setup, this function is meant to initialize the server; disable_servers=True and status_coro can be used to call enable_server to pass arguments specific to each server if necessary. See discoro_client9_server.py for an example.

  • disable_nodes is by default False, in which case the scheduler will initialize a node when it becomes available (the node will execute node_setup if it is set, but without any parameters; i.e., node_setup should not have any formal parameters, except for coro=None keyword argument). If disable_nodes is True, the scheduler will not initialize the node. If Computation has set status_coro, then the scheduler will send NodeDiscovered message to it. The status_coro can then call enable_node method as appropriate (with any additional parameters required to call node_setup). See discoro_client9_node.py where disable_nodes is used and status_coro is used to call enable_node with parameter required to call node_setup.

  • disable_servers is by default False, in which case each (remote) server process will initialize itself as soon as it is started by the node. If disable_servers is True, the server will wait until Computation enables it with enable_server (with any additional parameters required to call server_setup). See discoro_client9_server.py where disable_servers is used and status_coro is used to call enable_server with the parameter required to call server_setup.

  • peers_communicate requests discoro scheduler to inform each remote server (peer) about other servers so computations executing on these servers can communicate. Without this option (default), a computation executing on a remote server can communicate with the client only - servers are not aware of each other.

    Using this option causes each server to establish communication with each other server; this can be expensive, especially if there are many servers in the cluster.
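
The allocation rules for node_allocations (a positive return value uses that many CPUs, 0 excludes the node, a negative value falls through to the next allocation) can be sketched as below. `NodeAllocation`, `SkipWindows`, `AtMostFour` and `cpus_for_node` are hypothetical stand-ins that mirror, but are not, DiscoroNodeAllocate and the scheduler's logic; the platform-string format and the treatment of a node that no allocation claims are assumptions.

```python
class NodeAllocation:
    """Hypothetical stand-in for DiscoroNodeAllocate (not the discoro class)."""

    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        return cpus  # default policy in this sketch: use every CPU

class SkipWindows(NodeAllocation):
    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        # Assumes `platform` is a string such as "Windows-10-..." or "Linux-...".
        return 0 if platform.lower().startswith("windows") else -1

class AtMostFour(NodeAllocation):
    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        return min(cpus, 4)

def cpus_for_node(allocations, ip_addr, name, platform, cpus, memory, disk):
    """Apply allocations in order: >0 use that many CPUs, 0 skip node, <0 try next."""
    for allocation in allocations:
        result = allocation.allocate(ip_addr, name, platform, cpus, memory, disk)
        if result >= 0:
            return result
    return 0  # assumption: a node that no allocation claims is not used
```

Putting `SkipWindows()` first and `AtMostFour()` second excludes Windows nodes and caps every other node at four CPUs.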

    The computation created as, for example, compute = Computation(...), has following methods:

    compute.schedule(location=None)

    Note

    This method must be used with yield as result = yield compute.schedule().

    Schedule computation for execution. If scheduler is remote and executing other computations, this method will block until those computations close. If successful, result will be 0.

    location, if not None, should be a Location instance referring to where the scheduler is running. If it is None, the scheduler must be running at a node in the local network; asyncoro will use name resolution to find the scheduler (a coroutine) with the registered name “discoro_scheduler”.

    Note

    All the run* methods below take a generator function and arguments used to create coroutines (jobs) at a remote server. If the generator function used in these methods is given as one of the components used to create the computation, the code for the function is transferred to the servers once during initialization (which is slightly more efficient); otherwise, the code is transferred to the servers each time a run* method is called.

    compute.run_at(where, gen, *args, **kwargs)

    Note

    This method must be used with yield as rcoro = yield compute.run_at(...).

    Run given generator function gen with arguments args and kwargs at where; i.e., create a coroutine at a server with the given generator function and arguments. If the request is successful, rcoro will be a (remote) coroutine; check result with isinstance(rcoro, asyncoro.Coro). The generator is expected to be (mostly) CPU bound and until this is finished, another CPU bound coroutine will not be submitted at the same server.

    If where is a string, it is assumed to be IP address of a node, in which case the coroutine is scheduled at a server at that node. If where is a Location instance, it is assumed to be server location in which case the coroutine is scheduled at that server.

    gen must be generator function, as it is used to run coroutine at remote location.

    args and kwargs must be serializable.

    compute.run(gen, *args, **kwargs)

    Note

    This method must be used with yield as rcoro = yield compute.run(...).

    Similar to run_at, except that the coroutine is executed at a server on a node with least load.

    compute.run_result_at(where, gen, *args, **kwargs)

    Note

    This method must be used with yield as result = yield compute.run_result_at(where, gen, ...).

    Similar to run_at method, except that instead of returning a reference to the remote coroutine, the call blocks until the remote coroutine is finished and its value (either the last value ‘yield’ed in the coroutine or the value of raise StopIteration) is returned.

    compute.run_result(gen, *args, **kwargs)

    Note

    This method must be used with yield as result = yield compute.run_result(gen, ...).

    Similar to run_result_at method, except the coroutine may run on any available server.

    compute.run_results(gen, iter)

    Note

    This method must be used with yield as results = yield compute.run_results(gen, ...).

    Runs run_result method for each item in given iterable iter. The return value is list of results that correspond to executing gen with items in iterable in the same order. If gen takes multiple arguments, each item can be given as tuple (i.e., iter would be list of tuples).
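
How run_result determines a coroutine's value, and how run_results keeps results in input order, can be modeled in-process with plain generators. This is not discoro's implementation; `coroutine_value`, `run_results_locally`, `scale` and `count_up` are hypothetical. The sketch uses `return value` in place of `raise StopIteration(value)`, because since Python 3.7 a StopIteration raised inside a generator is converted to RuntimeError (PEP 479).

```python
def coroutine_value(gen):
    """Run a generator to completion and return its value: the value given to
    `return` if it is not None, otherwise the last value it yielded."""
    last = None
    while True:
        try:
            last = next(gen)
        except StopIteration as stop:
            return stop.value if stop.value is not None else last

def run_results_locally(gen_func, iterable):
    """In-process model of run_results: one result per item, in input order.
    A tuple item is unpacked into multiple positional arguments."""
    results = []
    for item in iterable:
        args = item if isinstance(item, tuple) else (item,)
        results.append(coroutine_value(gen_func(*args)))
    return results

def scale(x, factor=2):
    """Hypothetical job: yields an intermediate value, then returns the result."""
    yield x
    return x * factor

def count_up(n):
    """Hypothetical job whose value is simply its last yield."""
    yield n
    yield n + 1
```

For example, `run_results_locally(scale, [1, 2, (3, 10)])` gives `[2, 4, 30]`: the third item is a tuple, so it supplies both `x` and `factor`.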

    compute.run_async_at(where, gen, *args, **kwargs)

    Note

    This method must be used with yield as rcoro = yield compute.run_async_at(where, gen, ...).

    Similar to run_at method, except that the coroutine will run on some server, even if it is currently running other coroutines. gen is expected not to be CPU bound (i.e., the coroutine should be mostly waiting for asynchronous operations such as I/O events or messages), so running it will not impede other coroutines. discoro scheduler does not “track” these asynchronous coroutines; when a computation is closed, the scheduler waits for other remote coroutines scheduled with run_at, run_result_at etc. before closing a server, but not for asynchronous remote coroutines.

    compute.run_async(gen, *args, **kwargs)

    Note

    This method must be used with yield as rcoro = yield compute.run_async(gen, ...).

    Similar to run_async_at method, except the coroutine may run on any server.

    compute.nodes()

    Note

    This method must be used with yield as nodes = yield compute.nodes().

    Returns a list of addresses of nodes used / available for the computation.

    compute.servers()

    Note

    This method must be used with yield as servers = yield compute.servers().

    Returns a list of locations of servers used / available for the computation.

    compute.close()

    Note

    This method must be used with yield as yield compute.close().

    Requests the scheduler to close the computation. If any remote coroutines are still pending (except ones created with run_async or run_async_at), this method will block until those coroutines are finished. Closing a computation causes each server process to remove any files saved or created at that server, remove global variables created by the jobs etc. If the scheduler is shared (i.e., discoro.py is run as an external program), the next computation waiting to be scheduled will be allowed to use the cluster to run remote coroutines.

The remote coroutine rcoro obtained with run methods above can be used for message passing or terminated; see Distributed Coroutines. Although it can also be monitored with yield rcoro.monitor(coro), discoro scheduler monitors all coroutines created with run methods and sends MonitorException message to status_coro about remote coroutines.

6.5. DiscoroStatus

When discoro scheduler changes the state of nodes, servers or coroutines, it sends the changes as messages to the computation’s status_coro coroutine, if it is initialized to a coroutine before the computation is scheduled. This lets status_coro schedule coroutines as and when servers become available, for example. Each message is an instance of DiscoroStatus, with attributes status and info:

  • status is either NodeDiscovered, NodeInitialized, NodeClosed, ServerDiscovered, ServerInitialized, ServerClosed, CoroCreated, or ComputationClosed.

  • info depends on status: if status is for a node (i.e., one of NodeDiscovered, NodeInitialized, NodeClosed), then info is an instance of DiscoroNodeAvailInfo for the node; if status is for a server (i.e., one of ServerDiscovered, ServerInitialized, ServerClosed), then info is an instance of Location; if status is CoroCreated (indicating a coroutine has been created at a remote server), then info is an instance of CoroInfo with following attributes:

    • coro is (remote) coroutine (instance of Coro),
    • *args is arguments used to create coroutine,
    • **kwargs is keyword arguments used to create coroutine,
    • start_time is time when coroutine was created.
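As an illustration of how a status_coro might use these messages, the following minimal sketch keeps track of which servers are currently usable. It is plain Python so it runs without a cluster: DiscoroStatus and Location here are hypothetical namedtuple stand-ins and statuses are written as strings, whereas the real scheduler sends its own status values and the messages arrive via yield coro.receive() inside status_coro.

```python
from collections import namedtuple

# Hypothetical stand-ins for the scheduler's message and location types.
# Real status values come from discoro, not plain strings.
DiscoroStatus = namedtuple('DiscoroStatus', ['status', 'info'])
Location = namedtuple('Location', ['addr', 'port'])


def track_servers(messages):
    """Return the set of server Locations usable after processing messages."""
    available = set()
    for msg in messages:
        if msg.status == 'ServerInitialized':
            # For server statuses, info is the server's Location.
            available.add(msg.info)
        elif msg.status == 'ServerClosed':
            available.discard(msg.info)
        # Node and coroutine statuses are ignored in this sketch.
    return available


msgs = [
    DiscoroStatus('ServerInitialized', Location('192.168.1.10', 51348)),
    DiscoroStatus('ServerInitialized', Location('192.168.1.11', 51348)),
    DiscoroStatus('ServerClosed', Location('192.168.1.10', 51348)),
]
print(track_servers(msgs))  # {Location(addr='192.168.1.11', port=51348)}
```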

6.6. DiscoroNodeAvailInfo

A node's availability information is sent to the Computation's status_coro at pulse_interval intervals as an instance of DiscoroNodeAvailInfo, which has 5 read-only attributes:

  • location is an instance of Location where the node's coroutine is running. This location is of interest only in node_available (see Computation above), to send files to the node. Elsewhere, location's addr part (which is the IP address of the node) can be used for allocating CPUs / filtering nodes (with DiscoroNodeAllocate), maintaining status information etc.
  • cpu is available CPU as percent. If it is close to 100, the node is not busy at all, and if it is close to 0, the node is rather busy (running compute-intensive tasks on all CPUs). This field is set only if psutil module is available on the node; otherwise it is set to None.
  • memory is available memory in bytes. This is not total memory, but usable memory, as interpreted by psutil module. This field is set only if psutil module is available on the node; otherwise it is set to None.
  • disk is available disk space in bytes for the partition that discoronode uses as given by dest_path option (where client’s files are saved by discoronode and jobs are run). This field is set only if psutil module is available on the node; otherwise it is set to None.
  • swap is available swap space in bytes on the node. This field is set only if psutil module is available on the node; otherwise it is set to None.
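Because these reports arrive periodically, a client would typically keep only the latest report per node. The sketch below does that and picks the node with the most available CPU, skipping nodes whose cpu is None (no psutil). NodeAvailInfo and Location are hypothetical namedtuple stand-ins with the attributes listed above, not asyncoro's own classes.

```python
from collections import namedtuple

# Hypothetical stand-ins mirroring the attributes described above.
Location = namedtuple('Location', ['addr', 'port'])
NodeAvailInfo = namedtuple('NodeAvailInfo', ['location', 'cpu', 'memory', 'disk', 'swap'])


def record(latest, info):
    """Keep only the most recent report per node, keyed on the node's IP address."""
    latest[info.location.addr] = info


def least_busy(latest):
    """Return the report with the most available CPU, or None if no node reports CPU."""
    known = [info for info in latest.values() if info.cpu is not None]
    if not known:
        return None
    return max(known, key=lambda info: info.cpu)


latest = {}
record(latest, NodeAvailInfo(Location('10.0.0.1', 51347), 20.0, 2**30, 2**34, 0))
record(latest, NodeAvailInfo(Location('10.0.0.2', 51347), 85.5, 2**31, 2**35, 0))
record(latest, NodeAvailInfo(Location('10.0.0.3', 51347), None, None, None, None))  # no psutil
record(latest, NodeAvailInfo(Location('10.0.0.1', 51347), 90.0, 2**30, 2**34, 0))  # newer report
print(least_busy(latest).location.addr)  # 10.0.0.1
```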

6.7. DiscoroNodeAllocate

If a cluster has nodes with different resources and computation requires specific resources, then the computation can use node_allocations list with Computation to control allocation of nodes / servers. Each element in the list must be an instance of DiscoroNodeAllocate with following attributes:

  • node must be either node’s name or an IP address or a regular expression of IP address to match. If it is name, it is resolved to IP address.

    The default allocate method will choose an allocation only if a node’s IP address matches this field.

  • platform must be a Python regular expression. For a computation to use a node, given expression must occur in its platform string, obtained by platform.platform() on the node, ignoring case. Default value is '', which matches any platform. For example, linux.*x86_64 accepts only nodes that run 64-bit Linux.

  • cpus must be an integer. If it is a positive number, then a node must have at least that many servers enabled to be used for the computation. If it is 0 (default), a node with any number of servers is accepted (i.e., no constraints on number of servers).

  • memory must be an integer. If it is a positive number, then a node must have at least that many bytes of memory to be used for the computation. If it is 0 (default), a node with any number of bytes is accepted (i.e., no constraints on amount of memory).

  • disk must be an integer. If it is a positive number, then a node must have at least that many bytes of disk space on the partition used by discoro to be used for the computation. If it is 0 (default), there are no constraints on amount of available disk space on discoronode’s partition. This field may be None if node doesn’t have psutil module.

This class provides an allocate method that is called by the scheduler when a node is available. The method is called with arguments ip_addr, name, platform, cpus, memory, disk, where ip_addr is the IP address of the node, name is the node's name (either given with --name option to discoronode or the node's host name as obtained by socket.gethostname()), platform is as obtained by platform.platform() on the node, cpus is the number of CPUs available, memory is available memory in bytes and disk is available disk space (see above); memory and disk are set if psutil module is available on the node, or None otherwise.

This method should return the number of CPUs to allocate. If the return value is

  • a positive number, that many CPUs are allocated (this number should be less than or equal to the cpus argument the method is called with),
  • 0, the node is not used for this computation,
  • a negative number, this allocation is ignored and the next item in node_allocations list is applied.
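To make the constraints and return convention concrete, the sketch below models them in plain Python. NodeAllocateSketch is a hypothetical class, not asyncoro's implementation. It assumes node is already an IP address or IP regular expression (name resolution is omitted), and returning -1 for a non-matching node, 0 for an unmet constraint, and rejecting a node whose memory or disk is None when a minimum is requested are all assumptions about sensible behavior.

```python
import re


class NodeAllocateSketch(object):
    """Hypothetical model of the constraints described for DiscoroNodeAllocate."""

    def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
        self.node = node          # IP address or regular expression of IP address
        self.platform = platform  # regular expression searched in platform string
        self.cpus = cpus          # minimum number of servers; 0 = no constraint
        self.memory = memory      # minimum bytes of memory; 0 = no constraint
        self.disk = disk          # minimum bytes of disk; 0 = no constraint

    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        # Node does not match: defer to the next item in node_allocations (assumption).
        if not re.match('(?:%s)$' % self.node, ip_addr):
            return -1
        # Expression must occur somewhere in the platform string, ignoring case.
        if not re.search(self.platform, platform, re.IGNORECASE):
            return 0
        if self.cpus > 0 and cpus < self.cpus:
            return 0
        # memory / disk are None when psutil is not available on the node.
        if self.memory > 0 and (memory is None or memory < self.memory):
            return 0
        if self.disk > 0 and (disk is None or disk < self.disk):
            return 0
        return cpus  # use all CPUs the node offers


# Accept only 64-bit Linux nodes on 192.168.1.x with at least 2 servers.
rule = NodeAllocateSketch(r'192\.168\.1\.\d+', platform=r'linux.*x86_64', cpus=2)
print(rule.allocate('192.168.1.5', 'node1', 'Linux-4.4.0-x86_64-with-Ubuntu-16.04', 4, None, None))  # 4
print(rule.allocate('10.0.0.1', 'node2', 'Linux-4.4.0-x86_64', 4, None, None))  # -1
```

With asyncoro itself, customizing the choice would presumably mean subclassing DiscoroNodeAllocate and overriding allocate, for example to return only half of cpus.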

6.8. HTTP Server

asyncoro.httpd module provides an HTTP interface to monitor and manage discoro servers (nodes and servers) with a web browser; it works with common web browsers, including those on iOS and Android devices. It doesn't require or use Apache or other web servers. An HTTP server can be created with:

class HTTPServer(computation, host='', port=8181, poll_sec=10, DocumentRoot=None, keyfile=None, certfile=None, show_coro_args=True)

Creates an instance of HTTP server which will listen for connections at given host and port.

  • computation is an instance of Computation whose status will be sent to HTTP client (browser).
  • host should be a string, either IP address or name of the host. The default value of ‘’ will bind the server to all configured network interfaces.
  • port should be a number HTTP server binds to. Web browsers can monitor and manage cluster by connecting to http://<host>:<port> if SSL (https) is not used and https://<host>:<port> if SSL is used.
  • poll_sec is the number of seconds (interval) the browser waits between successive requests for updates from the server. Smaller values of poll_sec cause more frequent updates, so the information shown is more accurate, but cause more network traffic / load. Bigger values of poll_sec are more efficient, but the status in the browser may not reflect the most recent information.

    This value can be changed in the browser as well.

  • DocumentRoot is directory where monitor.html, asyncoro.css, etc. files needed for the service are stored. If this is not set, the directory is assumed to be data directory under the directory where asyncoro.httpd module is stored.

  • keyfile and certfile, if given, will be used to configure SSL so https can be used between browser and HTTP server. If both key and certificate are in the same file, then it should be given as certfile.

  • show_coro_args boolean parameter controls whether coroutine arguments are shown in the web browser. The default value True sends coroutine arguments to the browser. If coroutines are created with large data as arguments, though, it may be quite inefficient to exchange that data between the scheduler and the browser. In such cases, it is strongly recommended to set this parameter to False so viewing coroutines on a server doesn't cause performance issues. Note that httpd converts coroutine arguments to strings (if arguments are not primitive types, the classes must provide serialization methods) before sending them to the browser. If serialization is not possible, httpd may fail. Setting show_coro_args to False will prevent this.

    This parameter can also be updated dynamically in ‘Cluster’ page with web browser.

The HTTP server has following methods:

shutdown(wait=True)

Shuts down HTTP server. If wait is True, the server waits for current poll_sec period before shutting down so the web browser gets all the updates.

Note

When cluster status is being monitored, the HTTP server sends only changes to cluster status between updates to browser (for efficiency); i.e., each time browser gets the status update at current poll_sec interval, the server sends the changes since last time browser requested data. The browser computes full current status with these updates. Consequently, the status can be viewed in only one browser; if more than one browser is used, they will not get full information.
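The single-browser limitation follows from how delta updates work. The sketch below is a simplified model (DeltaStatus and BrowserView are hypothetical names, not httpd classes): the server hands out only the changes accumulated since the previous poll, so when two browsers poll alternately, each one sees only part of the state.

```python
class DeltaStatus(object):
    """Hypothetical server side: accumulates changes between polls."""

    def __init__(self):
        self._changes = {}

    def update(self, key, value):
        # Record the latest value of a changed item (e.g., a node's status).
        self._changes[key] = value

    def poll(self):
        # Hand out changes since the previous poll, then start afresh.
        changes, self._changes = self._changes, {}
        return changes


class BrowserView(object):
    """Hypothetical browser side: rebuilds full state from deltas."""

    def __init__(self):
        self.state = {}

    def apply(self, changes):
        self.state.update(changes)


server = DeltaStatus()
first, second = BrowserView(), BrowserView()

server.update('192.168.1.10', 'running 4 coroutines')
first.apply(server.poll())    # first browser polls
server.update('192.168.1.11', 'running 2 coroutines')
second.apply(server.poll())   # second browser polls

print(first.state)   # {'192.168.1.10': 'running 4 coroutines'}
print(second.state)  # {'192.168.1.11': 'running 2 coroutines'}
```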

status_coro

This is a coroutine that should receive all status messages sent by the scheduler. The client program should set the status_coro attribute to this coroutine if the client needs to process status messages itself, or if messages need to be chained to other recipients.

6.9. Example

See discoro_httpd1.py for an example.

6.10. Client (Browser) Interface

Once the HTTP server is created, the discoro servers can be monitored and managed in a web browser at http://<host>:8181 (with the default port), where <host> is the name or IP address of the computer running the program. If SSL certificates are used to set up the HTTP server, the https protocol should be used in the URL above. There are currently 3 sections (menu items):

6.10.1. Cluster Status

The Cluster menu shows summary of nodes and coroutines:

_images/cluster.png

The information shows a summary of nodes and coroutines. The coroutine summary shows the total number of coroutines submitted so far, done (finished or cancelled) and currently running. The node summary shows the IP address, the number of servers running on that node, the number of coroutines submitted to all servers on that node, the number of coroutines done by all servers on that node, and the number of coroutines currently running on all servers on that node. Each node's IP address is shown as a hyperlink; when the link is activated, the page changes to show the status of that node, as explained in Node Status.

The nodes are sorted by default on the IP address in descending order. The field to sort on can be changed; however, as other fields are not necessarily unique, sorting on them is inefficient, so if there are many nodes, especially with frequent updates, choose IP address as the field to sort on. Sorting can be changed even after the cluster is closed.

‘Show Coroutine Arguments’ checkbox controls whether coroutine arguments are shown in the ‘Server’ page. If coroutines are created with large data as arguments, it is recommended to disable this, as otherwise exchanging that data between the scheduler and the web browser can be quite inefficient.

6.10.2. Node Status

Each node in the Cluster Status section is a hyperlink which, when followed (to the Node menu), shows details about that node, including available servers and processed coroutines:

_images/node.png

The node summary includes the number of server processes running on the node and the number of coroutines running on each server. Each server is shown with its location (an instance of Location) as a hyperlink. This link can be used to get details of coroutines running on that server, as explained in Server Status.

6.10.3. Server Status

As noted above, a discoronode program starts a discoro server process for each CPU available (or as specified with -c option) on that node. Each server process runs coroutines submitted by discoro scheduler. The Server menu shows number of coroutines submitted, number of coroutines done, and details of each coroutine currently running. The details show the name of coroutine (function), the arguments used to run it and the time when it was started.

_images/server.png

The arguments are converted to strings by HTTP server before sending to the browser client. If any of these are instances of user provided classes, it may be useful to provide __str__ method. Otherwise, Python’s default __str__ method may show it as simply an instance of a class, which may not be very useful.
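For example, a user class passed as a coroutine argument can define a short __str__ so the Server page shows something informative instead of the default form such as <__main__.Matrix object at 0x...>. Matrix is a hypothetical class used only for illustration; keeping the string short also limits what has to be sent to the browser.

```python
class Matrix(object):
    """Hypothetical user class passed as a coroutine argument."""

    def __init__(self, rows, cols):
        self.rows = rows
        self.cols = cols
        self.data = [[0.0] * cols for _ in range(rows)]

    def __str__(self):
        # Summarize instead of dumping (possibly large) data.
        return 'Matrix(%dx%d)' % (self.rows, self.cols)


print(str(Matrix(3, 4)))  # Matrix(3x4)
```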

If necessary, coroutines can be selected and terminated (killed).

6.11. Docker Container

discoronode isolates the computation environment so that jobs from one computation don't interfere with jobs from another computation, even if a node is shared and jobs from different computations are running simultaneously. Usually, any files transferred and saved by jobs are also removed when the computation is closed (the exception is when dest_path is given or if cleanup is False, when files may be left behind). However, the jobs have access to the server's file system, so they can be a security risk. It is possible to avoid (some) issues by creating a special user with access only to a specific path (e.g., with a chroot environment).

If complete isolation of computations is needed, Docker containers can be used. Each container runs a copy of a small Linux distribution with its own file system; the container has no access to the host file system (unless it is configured to). asyncoro now includes a Dockerfile in the data directory where the asyncoro module is installed; its path can be obtained with the program:

import os, asyncoro
print(os.path.join(os.path.dirname(asyncoro.__file__), 'data', 'Dockerfile'))

Note that Docker runs on Linux hosts only; with other operating systems, a guest VM can be used to run Linux, under which Docker can be run. See Docker Machine and Docker Docs for more details.

To build an image with the latest Ubuntu Linux and asyncoro, install docker if not already installed, create a temporary directory, say, /tmp/asyncoro-docker, change to that directory and copy the Dockerfile from above to that directory. (The Dockerfile can be customized to add any additional tools or setup needed.) Then execute docker build -t asyncoro . (note the dot at the end). The full list of instructions for building the image for Python 2.7 (for Python 3, use the appropriate path to where the Dockerfile is installed) is:

mkdir /tmp/asyncoro-docker
cd /tmp/asyncoro-docker
cp /usr/local/lib/python2.7/dist-packages/asyncoro/data/Dockerfile .
docker build -t asyncoro .

Once the image is built, a new container can be run with:

docker run --net=host -it asyncoro

to start discoronode.py (which is the default command for the image built above) with default options. --net=host runs the container in host network mode, i.e., the container uses the host's network configuration. See the --save_config and --config options of discoronode to use the same options across many runs. If these or any other options are needed, the Dockerfile can be customized before building the image in the instructions above.

If each computation should be started in a new container (so that every computation starts in the same environment, using the image built above), then the --serve option can be used as:

while :; do
    docker run --net=host -it asyncoro discoronode.py --serve 1
done

This causes discoronode to quit when the client closes currently running computation, which terminates container and because of while loop, a new container is started from the image.

The dispy project also has similar instructions for building docker images. Since dispy depends on asyncoro, asyncoro modules, including discoronode, are also installed when installing dispy. So it is possible to build the dispy image and run discoronode from it (e.g., with docker run --net=host -it dispy discoronode.py) instead of dispynode, when discoronode is more appropriate than dispynode.

6.12. Cloud Computing

The ext_ip_addr option of Node / Servers can be used to work with cloud computing services, such as Amazon EC2. Other cloud computing services can be used similarly.

It may be necessary to set up the configuration to allow the TCP ports used by discoronode. Here we assume ports 51347 and above are used by discoronode. For example, with EC2, a “Security Group” should be created and assigned to the instance so inbound TCP ports 51347 (and/or other ports used) are allowed.

With EC2 service, a node has a private IP address (called ‘Private DNS Address’) that uses a private network of the form 10.x.x.x and a public address (called ‘Public DNS Address’) of the form ec2-x-x-x-x.y.amazonaws.com. After launching instance(s), log in to the server(s), install asyncoro (e.g., with pip install asyncoro) and run discoronode on each node with:

discoronode.py --ext_ip_addr ec2-x-x-x-x.y.amazonaws.com --tcp_ports 51347

(this address can't be used with the -i/--ip_addr option, as the network interface is configured with the private IP address only). This node can then be used by a discoro client from outside the EC2 network by specifying ec2-x-x-x-x.y.amazonaws.com as a peer (see below). With ext_ip_addr, discoronode acts similarly to NAT: it announces ext_ip_addr to other services instead of the configured ip_addr, so that external services send requests to ext_ip_addr.

If the EC2 node can connect back to the client with the IP address and port used by the client, the node can be paired with:

...
yield asyncoro.AsynCoro().peer(asyncoro.Location('ec2-x-x-x-x.y.amazonaws.com', 51347))
if (yield computation.schedule()):
    raise Exception('Schedule failed')
...

By default, asyncoro uses a random TCP port. Within a local network, or if the client can be reached at any port, this works fine. If the client is behind a router, the router's firewall can be configured to forward a specific port, say, 4567 (or 51347 at the client as well; here, a different port is used to avoid confusion), to the client's IP address, and asyncoro can be configured in the client to use TCP port 4567 with:

asyncoro.AsynCoro(tcp_port=4567)

before any coroutines or channels are created (creating a coroutine or channel automatically starts asyncoro with default parameters, which uses random TCP port).

If the client is behind a router and its firewall can't be set up to forward port 4567, then ssh can be used to forward the port. To use this, first log in to the EC2 node with:

ssh -i ec2-key.pem -R 4567:127.0.0.1:4567 userid@ec2-x-x-x-x.y.amazonaws.com

Then start discoronode as mentioned above, and start asyncoro at client with:

asyncoro.AsynCoro(node='127.0.0.1', tcp_port=4567)

See discoro_ssh_ec2.py for an example where ssh port forwarding is used for cloud computing with Amazon EC2.

In case of problems, enable debugging on the nodes (with -d option) and client (with asyncoro.logger.setLevel(logging.DEBUG) statement, as done in example above). If that still doesn’t work, check that the node is reachable with telnet ec2-x-x-x-x.y.amazonaws.com 51347 from client (after starting discoronode); the output should contain Connected message.
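When telnet is not available, the same reachability check can be done from Python. This is a minimal sketch using only the standard socket module; port_reachable is a hypothetical helper name, and the host in the usage comment is the placeholder from the text, to be replaced with the real public address.

```python
import socket


def port_reachable(host, port, timeout=5.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        # Refused, unreachable, timed out or name not resolvable.
        return False
    conn.close()
    return True


# Example, run at the client after starting discoronode on the EC2 node
# (host name is the placeholder used above):
#   print(port_reachable('ec2-x-x-x-x.y.amazonaws.com', 51347))
```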