The pprocess module provides several mechanisms for running Python code concurrently in several processes. The most straightforward way of making a program parallel-aware - that is, where the program can take advantage of more than one processor to simultaneously process data - is to use the pmap function.
For a brief summary of each of the features of pprocess, see the reference document.
The way pprocess uses multiple processes to perform work in parallel involves the fork system call, which on modern operating systems provides what is known as "copy-on-write" semantics. In plain language, when pprocess creates a new child process to perform work in parallel with other work that needs to be done, this new process will be a near-identical copy of the original parent process, and the running code will be able to access data resident in that parent process.
However, when a child process modifies data, the parent process remains oblivious to the changes: as soon as the child process attempts to modify the data, it obtains its own separate copy, which is then modified independently of the original data. In other words, a copy of the data is made at the moment an attempt is made to write to it, and the parent's copy of that data is left untouched by the activities of the child.
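This behaviour can be illustrated with the fork system call directly, without involving pprocess at all; a minimal sketch (assuming a POSIX system) follows:

import os

data = [1, 2]

if os.fork() == 0:

    # Child process: this modification affects only the child's copy.

    data.append(3)
    os._exit(0)
else:

    # Parent process: wait for the child, then inspect the original data.

    os.wait()
    print data # prints [1, 2], not [1, 2, 3]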
It is therefore essential to note that any data distributed to other processes, and which will then be modified by those processes, will not appear to change in the parent process even if the objects employed are mutable. This is rather different to the behaviour of a normal Python program: passing a list to a function, for example, mutates that list in such a way that upon returning from that function the modifications will still be present. For example:
def mutator(l):
    l.append(3)

l = [1, 2]
mutator(l) # l is now [1, 2, 3]
In contrast, passing a list to a child process will cause the list to mutate in the child process, but the parent process will not see the list change. For example:
def mutator(l):
    l.append(3)

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l) # l is now [1, 2]
To communicate changes to data between processes, the modified objects must be explicitly returned from child processes using the mechanisms described in this documentation. For example:
def mutator(l):
    l.append(3)
    return l # the modified object is explicitly returned

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l)

all_l = results[:] # there are potentially many results, not just one
l = all_l[0] # l is now [1, 2, 3], taken from the first result
It is perhaps easiest to think of the communications mechanisms as providing a gateway between processes through which information can be passed, with the rest of a program's data being private and hidden from the other processes (even if that data initially resembles what the other processes also see within themselves).
Consider a program using the built-in map function and a sequence of inputs:
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_map.py file.)
The principal features of this program involve the preparation of an array for input purposes, and the use of the map function to iterate over the combinations of i and j in the array. Even if the calculate function could be invoked independently for each input value, we have to wait for each computation to complete before initiating a new one. The calculate function may be defined as follows:
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = pprocess.pmap(calculate, sequence, limit=limit)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_pmap.py file.)
By replacing usage of the map function with the pprocess.pmap function, and specifying the limit on the number of processes to be active at any given time (the value of the limit variable is defined elsewhere), several calculations can now be performed in parallel.
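For illustration, a minimal, self-contained use of pmap might look like the following sketch (the square function and the limit value of 4 are invented for this example):

import pprocess

def square(x):

    "A trivial stand-in for a time-consuming calculation."

    return x * x

# Allow at most four processes to be active at any given time.

results = pprocess.pmap(square, range(0, 10), limit=4)
print list(results)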
Although some programs make natural use of the map function, others may employ an invocation in a nested loop. This may also be converted to a parallel program. Consider the following Python code:
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple1.py file.)
Here, a computation in the calculate function is performed for each combination of i and j in the nested loop, returning a result value. However, we must wait for the completion of this function for each element before moving on to the next element, and this means that the computations are performed sequentially. Consequently, on a system with more than one processor, even if we could call calculate for more than one combination of i and j and have the computations executing at the same time, the above program will not take advantage of such capabilities.
We use a slightly modified version of calculate which employs two parameters instead of one:
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_managed_map.py file.)
The principal changes in the above code involve the use of a pprocess.Map object to collect the results, and a version of the calculate function which is managed by the Map object. What the Map object does is to arrange the results of computations such that iterating over the object or accessing the object using list operations provides the results in the same order as their corresponding inputs.
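To see this ordering guarantee in action, consider the following sketch (the identify function is invented here, and deliberately arranged so that later inputs finish sooner):

import time
import pprocess

def identify(x):

    "Return 'x' after a delay which is shorter for larger values of 'x'."

    time.sleep(1 - x * 0.1)
    return x

results = pprocess.Map(limit=4)
identify = results.manage(pprocess.MakeParallel(identify))

for x in range(0, 4):
    identify(x)

# Despite the reversed completion order, the results appear in input order.

print list(results) # prints [0, 1, 2, 3]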
In some programs, it is not important to receive the results of computations in any particular order, usually because either the order of these results is irrelevant, or because the results provide "positional" information which lets them be handled in an appropriate way. Consider the following Python code:
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple2.py file.)
Here, a result array is initialised first and each computation is performed sequentially. A significant difference to the previous examples is the return value of the calculate function: the position details corresponding to i and j are returned alongside the result. Obviously, this is of limited value in the above code because the order of the computations and the reception of results is fixed; the positional information only becomes significant once the computations run in parallel and their results can arrive in any order. As it stands, the above example gains no benefit from parallelisation.
We can bring the benefits of parallel processing to the above program with the following code:
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

queue = pprocess.Queue(limit=limit)

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

calc = queue.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Store the results as they arrive.

print "Finishing..."
for i, j, result in queue:
    results[i*N+j] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_managed_queue.py file.)
This revised code employs a pprocess.Queue object whose purpose is to collect the results of computations and to make them available in the order in which they were received. The code collecting results has been moved into a separate loop, independent of the original computation loop, which takes advantage of the more relevant "positional" information emerging from the queue.
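The contrast with Map can be seen in a sketch like the following (again using an invented identify function whose later inputs deliberately finish sooner):

import time
import pprocess

def identify(x):

    "Return 'x' after a delay which is shorter for larger values of 'x'."

    time.sleep(1 - x * 0.1)
    return x

queue = pprocess.Queue(limit=4)
identify = queue.manage(pprocess.MakeParallel(identify))

for x in range(0, 4):
    identify(x)

# Results emerge in arrival order, not input order.

for result in queue:
    print result, # most likely prints 3 2 1 0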
We can take this example further, illustrating some of the mechanisms employed by pprocess. Instead of collecting results in a queue, we can define a class containing a method which is called when new results arrive:
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
This code exposes the channel paradigm which is used throughout pprocess and is available to applications, if desired. The effect of the method is the storage of a result received through the channel in an attribute of the object. The following code shows how this class can be used, with differences to the previous program illustrated:
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Wrap the calculate function and manage it.

calc = exchange.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_managed.py file.)
The main visible differences between this and the previous program are the storage of the result array in the exchange, the removal of the queue consumption code from the main program, placing the act of storing values in the exchange's store_data method, and the need to call the finish method on the MyExchange object so that we do not try to access the results too soon. One underlying benefit not visible in the above code is that we no longer need to accumulate results in a queue or other structure so that they may be processed and assigned to the correct positions in the result array.
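Since store_data can do anything with an incoming result, an exchange need not store individual results at all. As a hypothetical variation (with limit, N and the triple-returning calculate function as defined in the preceding examples), an exchange could accumulate a running total as results arrive:

import pprocess

class SumExchange(pprocess.Exchange):

    "A hypothetical exchange which accumulates a running total of results."

    total = 0

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.total += result

exchange = SumExchange(limit=limit)
calc = exchange.manage(pprocess.MakeParallel(calculate))

for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

exchange.finish()
print "Total:", exchange.total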
For the curious, we may remove some of the remaining conveniences of the above program to expose other features of pprocess. First, we define a slightly modified version of the calculate function:
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch'
    to communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
This function accepts a channel, ch, through which results will be sent, and through which other values could potentially be received, although we choose not to do so here. The program using this function is as follows, with differences to the previous program illustrated:
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        exchange.start(calculate, i, j)

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_start.py file.)
Here, we have discarded two conveniences: the wrapping of callables using MakeParallel, which lets us use functions without providing any channel parameters, and the management of callables using the manage method on queues, exchanges, and so on. The start method still calls the provided callable, but using a different notation from that employed previously.
Although many programs employ functions and other useful abstractions which can be treated as parallelisable units, some programs perform computations "inline", meaning that the code responsible appears directly within a loop or related control-flow construct. Consider the following code:
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple.py file.)
To simulate "work", as in the different versions of the
calculate
function, we use the time.sleep
function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit)

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        ch = results.create()
        if ch:
            try:
                # Calculation work.

                time.sleep(delay)
                ch.send(i * N + j)
            finally:
                # Important finalisation.

                pprocess.exit(ch)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_create_map.py file.)
Although seemingly more complicated, the bulk of the changes in this modified program are focused on obtaining a channel object, ch, at the point where the computations are performed, and the wrapping of the computation code in a try...finally statement which ensures that the process associated with the channel exits when the computation is complete. In order for the results of these computations to be collected, a pprocess.Map object is used, since it will maintain the results in the same order as the initiation of the computations which produced them.
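As the NOTE comments in the code suggest, the try...finally pattern could be packaged using the with statement. A hypothetical helper (not part of pprocess itself, and requiring at least Python 2.5 with the appropriate __future__ import) might look like this, with results, N and delay defined as in the program above:

from __future__ import with_statement

from contextlib import contextmanager
import pprocess

@contextmanager
def computation(exchange):

    """
    A hypothetical context manager packaging the create/exit pattern:
    only the created process obtains a channel, and the 'finally' part
    ends that process once its work is done; the creating process, for
    which 'ch' is None, simply carries on with the loop.
    """

    ch = exchange.create()
    try:
        yield ch
    finally:
        if ch:
            pprocess.exit(ch)

# The calculation loop from the program above would then read:

for i in range(0, N):
    for j in range(0, N):
        with computation(results) as ch:
            if ch:
                # Calculation work.

                time.sleep(delay)
                ch.send(i * N + j)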
One notable aspect of the above programs when parallelised is that each invocation of a computation in parallel creates a new process in which the computation is to be performed, regardless of whether existing processes had just finished producing results and could theoretically have been asked to perform new computations. In other words, processes were created and destroyed instead of being reused.
However, we can request that processes be reused for computations by enabling the reuse feature of exchange-like objects and employing suitable reusable callables. Consider this modified version of the simple_managed_map program:
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit, reuse=1)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.MakeReusable(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_manage_map_reusable.py file.)
By indicating that processes and channels shall be reused, and by wrapping the calculate function with the necessary support, the computations may be performed in parallel using a pool of processes instead of creating a new process for each computation and then discarding it, only to create a new process for the next computation.
Although reusable processes offer the opportunity to invoke a callable over and over within the same created process, they do not fully support the potential of the underlying mechanisms in pprocess: created processes can communicate multiple values to the creating process and can theoretically run within the same callable forever.
Consider this modified form of the calculate function:
def calculate(ch, i):

    """
    A supposedly time-consuming calculation on 'i'.
    """

    for j in range(0, N):
        time.sleep(delay)
        ch.send((i, j, i * N + j))
This function accepts a channel ch together with an argument i corresponding to an entire row of the input array, as opposed to having two arguments (i and j) corresponding to a single cell in the input array. In this function, a series of calculations are performed and a number of values are returned through the channel, without the function terminating until all values have been returned for the row data.
To use this modified function, a modified version of the simple_managed_queue program is used:
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

queue = pprocess.Queue(limit=limit, continuous=1)

# Initialise an array.

results = [0] * N * N

# Manage the calculate function.

calc = queue.manage(calculate)

# Perform the work.

print "Calculating..."
for i in range(0, N):
    calc(i)

# Store the results as they arrive.

print "Finishing..."
for i, j, result in queue:
    results[i*N+j] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_continuous_queue.py file.)
Although the inner loop in the work section has been relocated to the calculate function, the queue still receives outputs from that function with positional information and a result for the result array. Thus, no change is needed for the retrieval of the results: they arrive in the queue as before.
Occasionally, it is desirable to initiate time-consuming computations and to not only leave such processes running in the background, but to be able to detach the creating process from them completely, potentially terminating the creating process altogether, and yet also be able to collect the results of the created processes at a later time, potentially in another completely different process. For such situations, we can make use of the BackgroundCallable class, which converts a parallel-aware callable into a callable which will run in a background process when invoked.
Consider this excerpt from a modified version of the simple_managed_queue program:
def task():

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    return results
Here, we have converted the main program into a function, and instead of printing out the results, we return the results list from the function.
Now, let us consider the new main program (with the relevant mechanisms highlighted):
t = time.time() if "--reconnect" not in sys.argv: # Wrap the computation and manage it. ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task)) # Perform the work. ptask() # Discard the callable. del ptask print "Discarded the callable." if "--start" not in sys.argv: # Open a queue and reconnect to the task. print "Opening a queue." queue = pprocess.BackgroundQueue("task.socket") # Wait for the results. print "Waiting for persistent results" for results in queue: pass # should only be one element # Show the results. for i in range(0, N): for result in results[i*N:i*N+N]: print result, print print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_background_queue.py file.)
This new main program has two parts: the part which initiates the computation, and the part which connects to the computation in order to collect the results. Both parts can be run in the same process, and this should result in similar behaviour to that of the original simple_managed_queue program.
In the above program, however, we are free to specify --start as an option when running the program, and the result of this is merely to initiate the computation in a background process, using BackgroundCallable to obtain a callable which, when invoked, creates the background process and runs the computation. After doing this, the program will then exit, but it will leave the computation running as a collection of background processes, and a special file called task.socket will exist in the current working directory.
When the above program is run using the --reconnect option, an attempt will be made to reconnect to the background processes already created, contacting them through the previously created task.socket special file (which is, in fact, a UNIX-domain socket). This is done using the BackgroundQueue function, which handles the incoming results in a fashion similar to that of a Queue object. Since only one result is returned by the computation (as defined by the return statement in the task function), we need only expect one element to be collected by the queue: a list containing all of the results produced in the computation.
In the above example, a single background process was used to manage a number of other processes, with all of them running in the background. However, it can be desirable to manage more than one background process.
Consider this excerpt from a modified version of the simple_managed_queue program:
def task(i):

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for j in range(0, N):
        calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[j] = result

    return i, results
Just as we saw in the previous example, a function called task has been defined to hold a background computation, and this function returns a portion of the results. However, unlike the previous example or the original example, the scope of the results collected in the function has been changed: here, only results for calculations involving a certain value of i are collected, with that particular value of i returned along with the appropriate portion of the results.
Now, let us consider the new main program (with the relevant mechanisms highlighted):
t = time.time() if "--reconnect" not in sys.argv: # Wrap the computation and manage it. ptask = pprocess.MakeParallel(task) for i in range(0, N): # Make a distinct callable for each part of the computation. ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask) # Perform the work. ptask_i(i) # Discard the callable. del ptask print "Discarded the callable." if "--start" not in sys.argv: # Open a queue and reconnect to the task. print "Opening a queue." queue = pprocess.PersistentQueue() for i in range(0, N): queue.connect("task-%d.socket" % i) # Initialise an array. results = [0] * N # Wait for the results. print "Waiting for persistent results" for i, result in queue: results[i] = result # Show the results. for i in range(0, N): for result in results[i]: print result, print print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the examples/simple_persistent_queue.py file.)
In the first section, the process of making a parallel-aware callable is as expected, but instead of then invoking a single background version of such a callable, as in the previous example, we create a version for each value of i (using BackgroundCallable) and then invoke each one. The result of this is a total of N background processes, each running an invocation of the task function with a distinct value of i (which in turn perform computations), and each employing a UNIX-domain socket for communication with a name of the form task-i.socket.
In the second section, since we now have more than one background process, we must find a way to monitor them after reconnecting to them; to achieve this, a PersistentQueue is created, which acts like a regular Queue object but is instead focused on handling persistent communications. Upon connecting the queue to each of the previously created UNIX-domain sockets, the queue acts like a regular Queue and exposes received results through an iterator. Here, the principal difference from previous examples is the structure of results: instead of collecting each individual value in a flat i by j array, a list is returned for each value of i and is stored directly in another list.
Background computations are useful because they provide flexibility in the way the results can be collected. One area in which they can be useful is Web programming, where a process handling an incoming HTTP request may need to initiate a computation but then immediately send output to the Web client - such as a page indicating that the computation is "in progress" - without having to wait for the computation or to allocate resources to monitor it. Moreover, in some Web architectures, notably those employing the Common Gateway Interface (CGI), it is necessary for a process handling an incoming request to terminate before its output will be sent to clients. By using a BackgroundCallable, a Web server process can initiate a computation, and then subsequent server processes can reconnect to the background computation and wait efficiently for results.
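As an illustration only, a hypothetical CGI script following this pattern might look like the following sketch (the socket path, the task body and the collect parameter are all invented for this example):

#!/usr/bin/env python
"A hypothetical CGI sketch: one request starts the work, another collects it."

import cgi
import pprocess

socket_path = "/tmp/task.socket" # hypothetical socket location

def task():

    "A stand-in for a genuinely time-consuming computation."

    return sum(range(0, 1000000))

form = cgi.FieldStorage()
print "Content-Type: text/plain"
print

if "collect" not in form:

    # First request: start the computation in the background and return
    # immediately so that this CGI process can terminate.

    ptask = pprocess.BackgroundCallable(socket_path, pprocess.MakeParallel(task))
    ptask()
    print "Computation started; request this page with ?collect=1 later."
else:

    # A later request: reconnect to the background process and fetch results.

    queue = pprocess.BackgroundQueue(socket_path)
    for result in queue:
        print "Result:", result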
The following table indicates the features used in converting each sequential example program to its parallel counterpart:
Sequential Example | Parallel Example | Features Used
---|---|---
simple_map | simple_pmap | pmap
simple1 | simple_managed_map | MakeParallel, Map, manage
simple2 | simple_managed_queue | MakeParallel, Queue, manage
simple2 | simple_continuous_queue | Queue, manage (continuous)
simple2 | simple_managed | MakeParallel, Exchange (subclass), manage, finish
simple2 | simple_start | Channel, Exchange (subclass), start, finish
simple2 | simple_background_queue | MakeParallel, BackgroundCallable, BackgroundQueue
simple2 | simple_persistent_queue | MakeParallel, BackgroundCallable, PersistentQueue
simple | simple_create_map | Channel, Map, create, exit