Learning to use multiprocessing in Python with the Pebble package

A popular process and thread pool management package in Python that I generally rely on is pebble. Pebble simplifies parallel task execution, making it easier to leverage the full computing power of multicore systems and improve the performance of concurrent applications. In this post I give a small demo of how to work the package into a regular piece of code.


Let's consider this piece of code that iterates over a collection of work items, each holding a task name and a duration in seconds. We pass each item as the argument to a function that performs a basic task (here, sleeping for the given duration). There is no concurrency involved: each iteration waits until the previous one has finished.

import time

# Each work item holds a task name and a duration in seconds.
work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


# Run the tasks one after another.
for work_data in work:
    work_log(work_data)

The output of this code is as follows:

 Process A waiting 5 seconds
 Process A Finished.
 Process B waiting 2 seconds
 Process B Finished.
 Process C waiting 1 seconds
 Process C Finished.
 Process D waiting 3 seconds
 Process D Finished.


The runtime of this code is at least 5 + 2 + 1 + 3 = 11 seconds. This is not the most efficient way of handling tasks that are similar in nature, so we use Pebble to improve the runtime.
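To sanity-check that figure, one can wrap the loop with a timer. The sketch below is only an illustration and reuses the work and work_log definitions from above; on an otherwise idle machine the elapsed time should come out slightly above 11 seconds.

import time

start = time.perf_counter()
for work_data in work:
    work_log(work_data)
elapsed = time.perf_counter() - start

# Expected to print a value a little greater than 11 seconds.
print("Sequential runtime: %.2f seconds" % elapsed)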


We will start by importing the packages we need.

from pebble import ProcessPool, ProcessExpired
# TimeoutError from concurrent.futures is what the result iterator raises
# when a task exceeds its timeout (a distinct class before Python 3.11).
from concurrent.futures import TimeoutError
import time


Now we modify the main block to initialize the ProcessPool that we imported from the pebble package.
We set max_workers to 16 and max_tasks to 1. max_workers is generally set to the number of CPUs the system has; max_tasks limits how many tasks a single worker process runs before it is restarted.
Next, we call the pool's map method, passing the work_log function and the work collection. Each item of the collection is handed to work_log as its argument; since pebble's map mirrors the built-in map, a function that needs several arguments can also be given one iterable per argument (a short sketch of this appears after the code below).
We obtain an iterator by calling the result() method of the future returned by map().
Finally, we call next() on this iterator to pull each result as it completes, wrapping the call with error handling.

def start_run():
    # max_workers is usually set to the machine's CPU count; 16 workers
    # are used here, and max_tasks=1 restarts each worker process after
    # it has completed a single task.
    with ProcessPool(max_workers=16, max_tasks=1) as pool:
        # Schedule work_log for every item in work; each task gets at
        # most 10 seconds before it is cancelled.
        future = pool.map(work_log, work, timeout=10)

        iterator = future.result()

        # Pull results as they complete, handling failures per task.
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                print("function took longer than %d seconds" % error.args[1])
            except ProcessExpired as error:
                print("%s. Exit code: %d" % (error, error.exitcode))
            except Exception as error:
                print("function raised %s" % error)
                print(error)
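As an aside, here is the sketch of passing more than one argument per call that was mentioned above. Pebble's map mirrors the built-in map, so a function taking several positional arguments can be fed one iterable per argument. This is only an illustration: work_log2, names and durations are hypothetical names, not part of the code above.

from pebble import ProcessPool
import time

def work_log2(name, duration):
    print(" Process %s waiting %s seconds" % (name, duration))
    time.sleep(duration)
    print(" Process %s Finished." % name)

names = ["A", "B", "C", "D"]
durations = [5, 2, 1, 3]

def start_run_two_args():
    with ProcessPool(max_workers=4) as pool:
        # One iterable per positional argument of work_log2.
        future = pool.map(work_log2, names, durations, timeout=10)
        for _ in future.result():
            pass

Packing the arguments into a single list per item, as the original work collection does, achieves the same thing with a single iterable.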


Finally, we call start_run() from the main guard. As seen in the output below, the concurrent code takes roughly as long as the longest task, which in our case is the 5 seconds of Task A.

if __name__ == '__main__':
    start_run()


Output:

 Process A waiting 5 seconds
 Process B waiting 2 seconds
 Process C waiting 1 seconds
 Process D waiting 3 seconds
 Process C Finished.
 Process B Finished.
 Process D Finished.
 Process A Finished.




Thus:

without concurrency = 11 seconds
with concurrency = 5 seconds

Execution time is reduced by 11 - 5 = 6 seconds.

Speedup = 11 / 5 ≈ 2.2
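If you want to see the TimeoutError branch fire, a rough way is to add a task that sleeps longer than the map timeout. This is a hedged sketch rather than part of the original run: the 15-second entry and the slow_work name are made up, and it reuses the work_log function and the imports from above.

slow_work = (["A", 5], ["E", 15], ["B", 2])

def start_run_with_timeout():
    with ProcessPool(max_workers=16, max_tasks=1) as pool:
        future = pool.map(work_log, slow_work, timeout=10)
        iterator = future.result()
        while True:
            try:
                next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                # error.args[1] carries the timeout value, 10 in this case.
                print("function took longer than %d seconds" % error.args[1])

Calling start_run_with_timeout() from the same main guard as before should print the timeout message for task E, while the results of the other tasks still come through.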
