Coding With Fun

How to implement Python parallel processing with one line of code


Jun 01, 2021 Article blog




Python has a poor reputation when it comes to parallelizing programs. Setting aside technical issues such as the thread implementation and the GIL, I think the main problem is how it is taught. The classic Python multithreading and multiprocessing tutorials tend to be "heavy": they only scratch the surface and stop before reaching the parts that are most useful in everyday work.

Traditional examples

A quick search for Python multithreading tutorials shows that almost all of them present examples built on classes and queues:

import os
import PIL


from multiprocessing import Pool
from PIL import Image


SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'


def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)


def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)


if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))


    images = get_image_paths(folder)


    pool = Pool()
    pool.map(create_thumbnail, images)
    pool.close()
    pool.join()

Ha, it looks a little like Java, doesn't it?

I'm not saying it's wrong to use the producer/consumer model for multithreaded or multiprocess tasks (in fact, the model has plenty of good uses). It's just that a more efficient model is available for everyday scripting tasks.

The problem is...

First, you need a sample class;

Second, you need a queue to pass objects;

Also, you need to build methods at both ends of the channel to help you work (if you want to communicate in both directions or save the results, you need to introduce another queue).
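The three ingredients listed above can be sketched in a few dozen lines. This is only an illustrative sketch, written for Python 3 (unlike the Python 2 listings in this article); the names `Worker`, `square_all` and `SENTINEL` are hypothetical and squaring numbers stands in for real work.

```python
import queue
import threading

SENTINEL = None  # hypothetical "poison pill" that tells a worker to stop


class Worker(threading.Thread):
    """Consumer thread: takes numbers from `tasks`, puts their squares on `results`."""

    def __init__(self, tasks, results):
        super().__init__()
        self._tasks = tasks
        self._results = results

    def run(self):
        while True:
            item = self._tasks.get()  # blocks until an item is available
            if item is SENTINEL:
                break
            self._results.put(item * item)


def square_all(numbers, n_workers=2):
    numbers = list(numbers)
    # One queue for passing work in, a second queue for getting results out
    tasks, results = queue.Queue(), queue.Queue()
    workers = [Worker(tasks, results) for _ in range(n_workers)]
    for worker in workers:
        worker.start()
    for n in numbers:
        tasks.put(n)
    for _ in workers:
        tasks.put(SENTINEL)  # one pill per worker
    for worker in workers:
        worker.join()
    # Results arrive in completion order, so sort them for a stable answer
    return sorted(results.get() for _ in numbers)
```

Even for a task this small, the class, both queues and the shutdown logic all have to be written by hand.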


The more workers there are, the more problems there are

Along these lines, you now need a pool of worker threads. Here is an example from a classic IBM tutorial that uses multiple threads to speed up fetching web pages.

#Example2.py
'''
A more realistic thread pool example
'''


import time
import threading
import Queue
import urllib2


class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self)
        self._queue = queue


    def run(self):
        while True:
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
        print 'Bye byes!'


def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com',
        'http://www.scala.org', 'http://www.google.com',
        # etc..
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()


    # Add the urls to process
    for url in urls:
        queue.put(url)  
    # Add the poison pill (one per worker)
    for worker in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()


    print 'Done! Time taken: {}'.format(time.time() - start_time)


def build_worker_pool(queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start()
        workers.append(worker)
    return workers


if __name__ == '__main__':
    Producer()

This code works correctly, but take a closer look at what we had to do: write several methods, keep track of a set of threads, and perform a series of join operations to avoid annoying deadlocks. And this is just the beginning...

Having reviewed the classic multithreading tutorial, it feels a bit underwhelming, doesn't it? Boilerplate-heavy and error-prone, this labor-intensive style is clearly not suited to everyday use. Fortunately, there is a better way.

Why don't you try map

map is a small, elegant function, and it is the key to easily parallelizing Python programs. It comes from functional programming languages such as Lisp, and it applies a function to each element of a sequence.

    urls = ['http://www.yahoo.com', 'http://www.reddit.com']
    results = map(urllib2.urlopen, urls)

The above two lines of code pass each element in the urls sequence as an argument to the urlopen method and save all the results to the results list. The result is roughly equivalent to:

results = []
for url in urls:
    results.append(urllib2.urlopen(url))

The map function takes care of iterating over the sequence, passing the arguments, and collecting the results for us.
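The same equivalence can be checked without touching the network. The sketch below assumes Python 3, where map returns a lazy iterator and therefore needs to be wrapped in list(); the word list is made up for illustration.

```python
words = ['spam', 'eggs', 'ham']

# map applies the function to every element of the sequence
mapped = list(map(str.upper, words))

# The explicit loop that map replaces
looped = []
for word in words:
    looped.append(str.upper(word))
```

Both `mapped` and `looped` end up holding the same upper-cased words in the same order.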

Why is this important? This is because with the right libraries, map makes it easy to parallelize operations.


Two libraries in Python provide a parallel version of the map function: multiprocessing and its little-known submodule multiprocessing.dummy.

A quick aside: multiprocessing.dummy? A threaded clone of the multiprocessing library? What on earth is that? Even the official documentation for the multiprocessing library has only a single sentence about this submodule, and in plain language that sentence basically says, "Well, this thing exists, just so you know." Believe me, this library is grossly undervalued!

dummy is a complete clone of the multiprocessing module. The only difference is that multiprocessing works with processes, while the dummy module works with threads (and is therefore subject to all of Python's usual multithreading restrictions). This makes it easy to swap one library for the other, so you can choose a different one for IO-bound tasks and for CPU-bound tasks.
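Because the two Pool classes share the same interface, the choice between threads and processes can be reduced to a single argument. The following is a minimal Python 3 sketch; `word_length` and `parallel_lengths` are hypothetical names, not part of either library.

```python
from multiprocessing import Pool                       # process-based pool
from multiprocessing.dummy import Pool as ThreadPool   # thread-based pool


def word_length(word):
    return len(word)


def parallel_lengths(words, pool_factory=ThreadPool, size=2):
    """Run word_length over `words` using whichever pool class is passed in."""
    pool = pool_factory(size)
    try:
        return pool.map(word_length, words)
    finally:
        pool.close()
        pool.join()
```

Calling `parallel_lengths(words, pool_factory=Pool)` would run the same work in processes instead of threads. With the process pool, the call should sit under an `if __name__ == '__main__':` guard, as in the thumbnail listings.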

Try it out

Use the following two lines to import the libraries that contain the parallelized map function:

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

Instantiate a Pool object:

pool = ThreadPool()

This single statement replaces the seven lines of the build_worker_pool function in example2.py. It creates a set of workers, initializes them, and stores them in a variable for easy access.

The Pool object takes several parameters, but the only one that matters here is the first: processes. This parameter sets the number of threads in the thread pool. Its default value is the number of CPU cores on the current machine.

In general, for CPU-bound tasks, the more cores you use, the faster the work gets done. For network-bound tasks, things are less predictable, and it's wise to experiment to determine the right size for the thread pool.

pool = ThreadPool(4) # Sets the pool size to 4

When there are too many threads, the time spent switching between them can exceed the time spent doing actual work. For each kind of job, it's a good idea to experiment to find the optimal thread pool size.
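One way to run such an experiment is to time the same job under several pool sizes. The sketch below assumes Python 3 and simulates network latency with time.sleep instead of real requests; `fake_download` and `time_pool_sizes` are hypothetical helpers.

```python
import time
from multiprocessing.dummy import Pool as ThreadPool


def fake_download(url):
    # Stand-in for a network request: just wait a little
    time.sleep(0.05)
    return url


def time_pool_sizes(urls, sizes):
    """Return a dict mapping each pool size to the seconds taken by pool.map."""
    timings = {}
    for size in sizes:
        pool = ThreadPool(size)
        start = time.perf_counter()
        pool.map(fake_download, urls)
        timings[size] = time.perf_counter() - start
        pool.close()
        pool.join()
    return timings
```

For example, `time_pool_sizes(urls, [1, 4, 8])` gives one timing per size, which makes it easy to see where adding threads stops paying off for a particular workload.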

Once the Pool object is created, the parallel program is practically ready to go. Let's take a look at the rewritten example2.py:

import urllib2
from multiprocessing.dummy import Pool as ThreadPool


urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc..
    ]


# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()

Only four lines of code do the actual work, and only one of them is critical. The map call easily replaces the 40-plus lines of the example above. To make things more interesting, I timed the different approaches with different thread pool sizes.

# results = []
# for url in urls:
#   result = urllib2.urlopen(url)
#   results.append(result)


# # ------- VERSUS ------- #


# # ------- 4 Pool ------- #
# pool = ThreadPool(4)
# results = pool.map(urllib2.urlopen, urls)


# # ------- 8 Pool ------- #


# pool = ThreadPool(8)
# results = pool.map(urllib2.urlopen, urls)


# # ------- 13 Pool ------- #


# pool = ThreadPool(13)
# results = pool.map(urllib2.urlopen, urls)

Results:

#        Single thread:  14.4 Seconds
#               4 Pool:   3.1 Seconds
#               8 Pool:   1.4 Seconds
#              13 Pool:   1.3 Seconds

That's a great result, isn't it? It also illustrates why the size of the thread pool should be determined by experiment. On my machine, the benefit of a thread pool larger than 9 is limited.
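One detail worth knowing when using pool.map this way: the results come back in the order of the inputs, even when the tasks finish in a different order. A small Python 3 sketch, with a hypothetical `slow_double` function whose larger inputs deliberately finish sooner:

```python
import time
from multiprocessing.dummy import Pool as ThreadPool


def slow_double(n):
    # Larger inputs sleep less, so they complete before smaller ones
    time.sleep((5 - n) * 0.01)
    return n * 2


with ThreadPool(4) as pool:
    doubled = pool.map(slow_double, [1, 2, 3, 4])
```

Here `doubled` lines up with the input list position by position, so each result can be matched back to its URL (or, in this sketch, its number) by index.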

Another real example

Create thumbnails of thousands of pictures

This is a CPU-intensive task and is well suited for parallelization.

The basic single-process version

import os
import PIL


from multiprocessing import Pool
from PIL import Image


SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'


def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)


def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)


if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))


    images = get_image_paths(folder)


    for image in images:
        create_thumbnail(image)

The code above walks through the image files in the given folder, generates a thumbnail for each one, and saves the thumbnails to a specific folder.

On my machine, this program takes 27.9 seconds to process 6000 pictures.

If we use the map function instead of the for loop:

import os
import PIL


from multiprocessing import Pool
from PIL import Image


SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'


def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)


def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)


if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))


    images = get_image_paths(folder)


    pool = Pool()
    pool.map(create_thumbnail, images)
    pool.close()
    pool.join()

5.6 seconds!

Although only a few lines of code changed, the program runs significantly faster. In a production environment, we could speed things up further by choosing the multi-process library for CPU-bound tasks and the multithreaded library for IO-bound tasks, which is also a good way to avoid deadlocks. In addition, because map takes thread management out of your hands, debugging becomes much simpler.

At this point, we have achieved parallelization in (basically) one line of Python.
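One thing that helps with debugging is that an exception raised inside a worker is re-raised by pool.map in the calling code, where an ordinary try/except can handle it. A minimal Python 3 sketch, with hypothetical helpers `parse_port` and `try_parse_all`:

```python
from multiprocessing.dummy import Pool as ThreadPool


def parse_port(text):
    return int(text)  # raises ValueError for non-numeric input


def try_parse_all(texts):
    """Return (ports, None) on success or (None, error) if any worker failed."""
    try:
        with ThreadPool(2) as pool:
            return pool.map(parse_port, texts), None
    except ValueError as exc:
        # The worker's exception surfaces here, in the calling thread
        return None, exc
```

No extra queue or error channel is needed to learn that a worker failed.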

