The pprocess module defines a simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm.
This document complements the tutorial by providing an overview of the different styles of parallel programming supported by the module. For an introduction and in order to get a clearer idea of the most suitable styles for your own programs, consult the tutorial.
To create new processes to run a function or any callable object, specify the "callable" and any arguments as follows:
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
This returns a channel which can then be used to communicate with the created process. Meanwhile, in the created process, the given callable will be invoked with another channel as its first argument followed by the specified arguments:
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
To create new processes in a similar way to that employed when using os.fork (ie. the fork system call on various operating systems), use the following method:
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
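As a standalone sketch of the underlying mechanism (using only the standard library, not pprocess itself), the fork-and-pipe pattern behind such process creation looks like this; the function name and message are illustrative:

```python
import os

def fork_with_pipe():
    # Sketch of the mechanism behind pprocess.create(): a pipe plus
    # os.fork, with the child exiting explicitly so that it never runs
    # code intended for the creating/calling process.
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # This code is run by the created process.
        os.close(r)
        os.write(w, b"result from child")
        os.close(w)
        os._exit(0)
    else:
        # This code is run by the creating/calling process.
        os.close(w)
        data = os.read(r, 1024)
        os.close(r)
        os.waitpid(pid, 0)
        return data
```

A pprocess channel wraps such descriptors with higher-level send and receive operations; here the raw bytes are read directly.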
When creating many processes, each providing results for consumption by the main process, collecting those results efficiently can be problematic: if some processes take longer than others, and we choose to read from processes which are not yet ready instead of those which are, the whole activity will take much longer than necessary.
One solution to the problem of knowing when to read from channels is to create an Exchange object, optionally initialising it with a list of channels through which data is expected to arrive:
exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels
We can add channels to the exchange using the add method:
exchange.add(channel)
To test whether an exchange is active - that is, whether it is actually monitoring any channels - we can use the active method, which returns all channels being monitored by the exchange:
channels = exchange.active()
We may then check the exchange to see whether any data is ready to be received; for example:
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
If we do not wish to wait indefinitely for a list of channels, we can set a timeout value as an argument to the ready method (as a floating point number specifying the timeout in seconds, where 0 means a non-blocking poll, as stated in the select module's select function documentation).
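The readiness behaviour can be illustrated using the select module directly on plain pipes; this is only a sketch of the underlying mechanism, since pprocess channels wrap richer objects:

```python
import os
import select

def poll_ready(fds, timeout=0):
    # A timeout of 0 performs a non-blocking poll, exactly as described
    # in the select module's documentation for select.select.
    readable, _, _ = select.select(fds, [], [], timeout)
    return readable

# Two pipes stand in for channels; only the first has data waiting.
r1, w1 = os.pipe()
r2, w2 = os.pipe()
os.write(w1, b"ready")
```

Here, `poll_ready([r1, r2])` reports only the first pipe as ready, so a loop over the result never blocks on a process which has produced nothing yet.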
A convenient form of message exchanges can be adopted by defining a subclass of the Exchange class and defining a particular method:
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
The exact operations performed on the received data might be as simple as storing it on an instance attribute. To make use of the exchange, we would instantiate it as usual:
exchange = MyExchange()          # populate the exchange later
exchange = MyExchange(limit=10)  # set a limit for later population
The exchange can now be used in a simpler fashion than that shown above. We can add channels as before using the add method, or we can choose to add channels only if the specified limit of channels is not exceeded:
exchange.add(channel)       # add a channel as normal
exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                            # exceeded
Or we can request that the exchange create a channel on our behalf:
channel = exchange.create()
We can even start processes and monitor channels without ever handling the channel ourselves:
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
We can explicitly wait for "free space" for channels by calling the wait method, although the start and add_wait methods make this less interesting:
exchange.wait()
Finally, when finishing the computation, we can choose to merely call the finish method and have the remaining data processed automatically:
exchange.finish()
Clearly, this approach is less flexible than the raw message exchange API described above, but it is more convenient and permits much simpler and clearer code.
Instead of having to subclass the pprocess.Exchange class and to define the store_data method, it might be more desirable to let the exchange manage the communications between created and creating processes and to let the creating process just consume received data as it arrives, without particular regard for the order of the received data - perhaps the creating process has its own way of managing such issues.
For such situations, the Queue class may be instantiated and channels added to the queue using the various methods provided:
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)
The results can then be consumed by treating the queue like an iterator:
for result in queue:
    # Capture each result.
    ...
This approach does not, of course, require the direct handling of channels. One could instead use the start method on the queue to create processes and to initiate computations (since a queue is merely an enhanced exchange with a specific implementation of the store_data method).
Where the above Queue class appears like an attractive solution for the management of the results of computations, but where the order of their consumption by the creating process remains important, the Map class may offer a suitable way of collecting and accessing results:
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
The results can then be consumed in an order corresponding to the order of the computations which produced them:
for result in results:
    # Process each result.
    ...
Internally, the Map object records a particular ordering of channels, ensuring that the received results can be mapped to this ordering, and that the results can be made available with this ordering preserved.
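The ordering idea can be sketched independently of pprocess: results may arrive in any order, but each is stored against the position of the computation which produced it, and the final sequence preserves the original ordering. The function below is purely illustrative:

```python
def reorder(arrivals, count):
    # arrivals: (position, result) pairs in arbitrary arrival order,
    # as results come back from processes which finish at different times.
    slots = [None] * count
    for position, result in arrivals:
        slots[position] = result
    return slots

# The second computation finished last, but its result still occupies
# the second slot.
ordered = reorder([(2, "c"), (0, "a"), (1, "b")], 3)
```
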
A further simplification of the above convenient use of message exchanges involves the creation of callables (eg. functions) which are automatically monitored by an exchange. We create such a callable by calling the manage method on an exchange:
myfn = exchange.manage(fn)
This callable can then be invoked instead of using the exchange's start method:
myfn(arg1, arg2, named1=value1, named2=value2)
The exchange's finish method can be used as usual to process incoming data.
In making a program parallel, existing functions which only return results can be manually modified to accept and use channels to communicate results back to the main process. However, a simple alternative is to use the MakeParallel class to provide a wrapper around unmodified functions which will return the results from those functions in the channels provided. For example:
fn = pprocess.MakeParallel(originalfn)
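A minimal sketch of what such a wrapper does is shown below; the class and channel stand-in are hypothetical illustrations, not the actual MakeParallel implementation:

```python
class ParallelWrapper:
    # Hypothetical sketch of a MakeParallel-style wrapper: it calls the
    # unmodified function and sends the return value over the channel
    # supplied as the first argument.
    def __init__(self, fn):
        self.fn = fn
    def __call__(self, channel, *args, **kw):
        channel.send(self.fn(*args, **kw))

class RecordingChannel:
    # Stand-in for a pprocess channel, for illustration only.
    def __init__(self):
        self.sent = []
    def send(self, value):
        self.sent.append(value)

wrapped = ParallelWrapper(lambda x, y: x + y)
channel = RecordingChannel()
wrapped(channel, 2, 3)
```

The original function remains unaware of channels: the wrapper supplies the channel-handling glue so that unmodified code can participate in the parallel scheme.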
In situations where a callable would normally be used in conjunction with the Python built-in map function, an alternative solution can be adopted by using the pmap function:
pprocess.pmap(fn, sequence)
Here, the sequence would have to contain elements that each contain the required parameters of the specified callable, fn. Note that the callable does not need to be a parallel-aware function with a channel argument: the pmap function automatically wraps the given callable internally.
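The effect is comparable to the standard library's multiprocessing.Pool.map, sketched here as an analogy rather than as pprocess itself; the function names are illustrative:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def parallel_squares(values):
    # The callable needs no channel argument; the pool (like pmap)
    # handles the distribution of work and the ordered collection of
    # results internally.
    with Pool(2) as pool:
        return pool.map(square, values)
```

As with the built-in map, the results come back in the same order as the inputs, regardless of which worker finished first.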
So far, all parallel computations have been done with newly-created processes. However, this can seem somewhat inefficient, especially if processes are being continually created and destroyed (although if this happens too often, the amount of work done by each process may be too little, anyway). One solution is to retain processes after they have done their work and request that they perform more work for each new parallel task or invocation. To enable the reuse of processes in this way, a special keyword argument may be specified when creating Exchange instances (and instances of subclasses such as Map and Queue). For example:
exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
Code invoked through such exchanges must be aware of channels and be constructed in such a way that it does not terminate after sending a result back to the creating process. Instead, it should repeatedly wait for subsequent sets of parameters (compatible with those either in the signature of a callable or with the original values read from the channel). Reusable code terminates when the special value None is sent from the creating process to the created process, indicating that no more parameters will be sent.
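This receive-until-None protocol can be sketched with a hypothetical channel stand-in (real pprocess channels provide receive and send across processes; the stub below merely replays scripted inputs):

```python
def reusable_worker(channel):
    # The reusable-code protocol: keep reading parameter sets until the
    # creating process sends the special value None, then terminate.
    while True:
        params = channel.receive()
        if params is None:
            break
        channel.send(sum(params))

class ScriptedChannel:
    # Stand-in for a pprocess channel, replaying queued inputs and
    # recording outputs, for illustration only.
    def __init__(self, inputs):
        self.inputs = list(inputs)
        self.sent = []
    def receive(self):
        return self.inputs.pop(0)
    def send(self, value):
        self.sent.append(value)

channel = ScriptedChannel([(1, 2), (3, 4), None])
reusable_worker(channel)
```

Each parameter set yields one result before the worker loops back to wait for more work; the trailing None releases it.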
An easier way of making reusable code sections for parallel use is to employ the MakeReusable class to wrap an existing callable:
fn = pprocess.MakeReusable(originalfn)
This wraps the callable in a similar fashion to MakeParallel, but provides the necessary mechanisms described above for reusable code.
Much of the usage of exchanges so far has concentrated on processes whose callables are invoked and, once they have returned, invoked again either in the same process (when reused) or in a new process (when not reused). However, the underlying mechanisms actually support processes whose callables not only receive input at the start of their execution and send output at the end, but may also provide output on a continuous basis (similar to iterator or generator objects).
To enable support for continuous communications between processes, a keyword argument must be specified when creating an Exchange instance (or an instance of a subclass of Exchange such as Map or Queue):
exchange = MyExchange(limit=10, continuous=1) # support up to 10 processes
Code invoked in this mode of communication must be aware of channels, since it will need to explicitly send data via a channel to the creating process, instead of terminating and sending data only once (as would be done automatically using convenience classes such as MakeParallel).
So far, all parallel computations have involved created processes which depend on the existence of the creating process, which collects results from and communicates with them, preventing the creating process from terminating, even if the created processes actually perform work and potentially produce output which need not concern the process which created them. In order to separate creating and created processes, the concept of a background process (also known as a daemon process) is introduced.
The BackgroundCallable class acts somewhat like the manage method on exchange-based objects, although no exchange is immediately involved. Instances of BackgroundCallable provide wrappers around existing parallel-aware callables which can then be invoked in order to initiate a background computation in a created process. For example:
backgroundfn = pprocess.BackgroundCallable(address, fn)
This wraps the supplied callable (which can itself be the result of using MakeParallel), with the resulting wrapper lending itself to invocation like any other function. One distinguishing feature is the address: in order to contact the background process after invocation to (amongst other things) receive any result, a specific address must be given to define the contact point between the created process and any processes seeking to connect to it. Since these "persistent" communications employ special files (specifically UNIX-domain sockets), the address must be a suitable filename.
Background processes employing persistent communications require adaptations of the facilities described in the sections above. For a single background process, the BackgroundQueue function is sufficient to create a queue-like object which can monitor the communications channel between the connecting process and a background process. For example:
queue = pprocess.BackgroundQueue(address)
This code will cause the process reachable via the given address to be contacted and any results made available via the created queue-like object.
Where many background processes have been created, a single PersistentQueue object can monitor their communications by being connected to them all, as in the following example:
queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)
Here, the queue monitors all previously created processes whose addresses reside in the addresses sequence. Upon iterating over the queue, results will be taken from whichever process happens to have data available, in no particular pre-defined order.
When created/child processes terminate, one would typically want to be informed of such conditions using a signal handler. Unfortunately, Python seems to have issues with restartable reads from file descriptors when interrupted by signals:
The exact combination of conditions indicating closed pipes remains relatively obscure. Here is a message/thread describing them (in the context of another topic):
It would seem, from using sockets and from studying the asyncore module, that sockets are more predictable than pipes.
Notes about poll implementations can be found here: