Sunday, 8 March 2015

Parallelization examples in Perl

Background

If a process (a running instance of a script or application) has just one path of execution, i.e. one main thread, it may be possible to speed up its execution by performing some tasks in parallel (tasks processed at the same time on multicore systems) or concurrently (tasks making progress through CPU context switching on single-core machines). Parallelization (meaning here NOT processing tasks in sequence) within one program can be achieved through:

a) child processes (forking)
b) threads
c) asynchronous processing

While child processes are independent of the parent process, threads are parallel paths of execution within a single process. This affects the system resources used, data sharing and communication. Processes need to use Inter-process Communication (IPC). Threads share the memory address space and other resources (file handles, network sockets, locks etc.) within the process, so they can read from and write to the same variables and data structures and communicate directly. (In Perl's ithreads, however, each new thread gets its own copy of existing data; only variables explicitly marked as shared with threads::shared are visible to all threads.)

Asynchronous processing is used in single-threaded processes to achieve parallelization through a non-blocking approach. Task A is started, but the flow is not blocked waiting for it to finish. Instead, processing carries on with other tasks (i.e. these tasks run concurrently with task A), and the flow returns to task A when it has finished, possibly receiving data from it. Asynchronous processing is achieved through various implementations of an event loop.
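To make the event-loop idea concrete, here is a toy, single-threaded event loop written with core Perl only. It is a sketch for illustration, not production code: the timers stand in for I/O that completes later, and the helper names set_timeout and run_loop are hypothetical. Real programs would normally use an event-loop module from CPAN such as AnyEvent, IO::Async or Mojo::IOLoop.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use feature 'say';
use Time::HiRes qw(time sleep);

# Toy event loop (hypothetical helpers): a list of timers sorted by deadline
my @timers;    # each element: [ $deadline, $callback ]
my @log;       # records the order in which things happen

# schedule $cb to run after $delay seconds; returns immediately (non-blocking)
sub set_timeout {
    my ($delay, $cb) = @_;
    push @timers, [ time() + $delay, $cb ];
    @timers = sort { $a->[0] <=> $b->[0] } @timers;
    return;
}

# run callbacks in deadline order until no timers are left
sub run_loop {
    while (@timers) {
        my ($deadline, $cb) = @{ shift @timers };
        my $wait = $deadline - time();
        sleep($wait) if $wait > 0;    # idle until the next event is due
        $cb->();
    }
    return;
}

# task A is started first but takes longer (0.3 s) ...
set_timeout(0.3, sub { push @log, 'task A finished' });
# ... the flow is not blocked, so task B can be started meanwhile
set_timeout(0.1, sub { push @log, 'task B finished' });
push @log, 'tasks A and B started';

run_loop();
say for @log;
```

Task A does not block the flow: task B is started meanwhile and, being shorter, finishes first, so the log reads "tasks A and B started", "task B finished", "task A finished".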

Code Examples

Child processes


The example below contains two implementations of forking child processes. The first uses the CPAN module Parallel::ForkManager, the second uses the built-in fork. There are also two approaches to reaping dead children: one through a CHLD signal handler, one using waitpid. In long-running processes, finished children that are not reaped stay around as zombies, putting a strain on system resources. Therefore it is important to ensure they are reaped.

#!/usr/bin/perl 

=head2 parallel_worker.pl

two implementations of forking child processes

=cut

use strict;
use warnings;
use utf8;
use v5.018;

use Parallel::ForkManager;
use POSIX ":sys_wait_h";

use Time::HiRes qw(time);
use Data::Printer;

my $pm = Parallel::ForkManager->new(4);

=head3 Process all files in parallel

loops through all the files to be processed
creates/forks child processes
reaps dead child processes:
    Zombies are processes that have finished execution but remain
    in the process table so that the parent process can inquire
    about their exit status. If the zombies are not removed from
    the process table (reaped, by reading the child status through
    the wait system call), they can lead to resource leaks.
2 implementations:
    a) with Parallel::ForkManager
    b) with fork:
        1) uses a CHLD signal handler
        2) uses waitpid

=cut

my @files = qw(a b c d e f g);
my %child = ();

# creating child processes: implementation 1
# ==========================================

DATA_LOOP:
foreach my $data (@files) {
    
    # forks a new child process
    my $pid;
    $pid = $pm->start and say "... child $pid" and next DATA_LOOP;

    # what will be done in the child process
    # until ->finish is encountered
    sleep 3;

    # end the child process
    $pm->finish;
}

$pm->wait_all_children;
say ">>> DONE 1";

# creating child processes: implementation 2
# ==========================================

# child handler to reap dead children
# -----------------------------------
$SIG{CHLD} = sub {
    # reap every child that has already exited;
    # WNOHANG makes waitpid return 0 instead of
    # blocking while other children are still running
    while ( (my $pid = waitpid(-1, WNOHANG)) > 0 ) {
        if (exists $child{$pid}) {
            delete $child{$pid};
            say "!!! deleted $pid";
        }
    }
};

foreach my $data (@files) {

    # fork returns the child's PID in the parent,
    # 0 in the child and undef on failure
    my $pid = fork;

    # failure to fork
    if (!defined $pid) {
        say "* failed to fork a process: $!";
        next;
    }

    # child process --------------------
    if ($pid == 0) {
        sleep 3;    # the work done by the child

        # the child process needs to exit, otherwise
        # it would continue the foreach loop, fork
        # children of its own and also print 'DONE 2',
        # producing multiple 'DONE 2' statements
        exit 0;
    }
    # ----------------------------------

    # parent process: remember the child and move on
    # to fork the next one without waiting
    $child{$pid} = undef;
    say "* parent forked child process $pid";
}

# wait until the CHLD handler has reaped all children
# (sleep is interrupted whenever a signal arrives)
sleep 1 while keys %child;

### reaping dead child processes without the CHLD signal handler
###     to use: comment out the CHLD signal handler and the
###     'sleep 1 while keys %child;' line above,
###     and uncomment the lines below
## ---------------------------------------------------
##while (keys %child) {
##    for my $key (keys %child) {
##        my $pid = waitpid($key, WNOHANG);
##
##        if ($pid == -1) {
##            say "\t>>> child $key does not exist";
##            delete $child{$key};
##
##            say "\t\t deleted key $key";
##        }
##
##        if ($pid == $key) {
##            delete $child{$key};
##
##            say "\t\t *** child $key reaped";
##            say "\t\t *** deleted key $key";
##        }
##        say ">>>--------------------------";
##    }
##    # avoid busy-looping while children are still running
##    sleep 1 if keys %child;
##}
##

p %child;
say ">>> DONE 2";
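The children in the script above only sleep and send nothing back to the parent. Because child processes do not share variables with the parent, any results have to travel through some form of Inter-process Communication. The sketch below assumes one pipe per child is acceptable; the squaring task and the variable names are hypothetical. Each child is also reaped with a blocking waitpid once its result has been read.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use feature 'say';

# hypothetical task: each child squares one number and sends it back
my @inputs  = (1 .. 4);
my %reader;             # pid => read end of that child's pipe
my $counter = 0;        # incremented only inside the children

for my $n (@inputs) {
    pipe(my $r, my $w) or die "pipe failed: $!";
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {                 # child process
        close $r;
        $counter++;                  # changes the child's copy only
        print {$w} $n * $n, "\n";    # send the result to the parent
        close $w;
        exit 0;
    }

    close $w;                        # parent keeps only the read end
    $reader{$pid} = $r;
}

my $total = 0;
for my $pid (keys %reader) {
    my $fh   = $reader{$pid};
    my $line = <$fh>;                # blocks until the child has written
    close $fh;
    waitpid($pid, 0);                # reap the child: no zombie left behind
    next unless defined $line;
    chomp $line;
    $total += $line;
}

say "sum of squares from children: $total";
say "counter in the parent: $counter";
```

The parent reports a total of 30 (1 + 4 + 9 + 16), while $counter is still 0 in the parent: each child incremented only its own copy of the variable.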

Threads

The task is to process the files in a directory, each containing one number per line, and to output the total sum of all numbers in all files.

The following script contains two implementations for comparison. Both use a job queue, a Thread::Queue object holding jobs (the information needed to do a task). One job is to process one file, i.e. to calculate the sum of all numbers in that file. The implementations use:

a) a single thread
b) a pool of threads

In the second implementation there is a limit on the number of created threads. The threads work in parallel and take jobs off the job queue until there is no more work to be done.

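Taking a job off the job queue with ->dequeue_nb is non-blocking: when the queue is empty it returns undef instead of waiting, which allows the flow to continue when there are no more jobs. The blocking ->dequeue would wait indefinitely on an empty queue unless some mechanism tells the threads that no more work is coming. Thread::Queue provides ->end() for this: once it has been called, ->dequeue returns undef as soon as the queue is empty.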

When a thread finds no more jobs in the queue, it returns the sum of all the files it processed. After all threads are created, we join them (wait for them to finish), retrieve the partial sums and add them up.

#!/usr/bin/perl

use strict;
use warnings;
use utf8;
use v5.018;

use threads;
use Thread::Queue;

my ($t0_a, $t1_a, $t0_b, $t1_b, $td_a, $td_b);

use List::Util qw(sum);
use Data::Printer;
use Benchmark qw(timediff timestr);

$| = 1;    # autoflush STDOUT
my $MAX_THREADS  = 5;
my $data_dir     = './test';

my %work_queue   = ();
my @results      = ();
my $files_count  = 0;

opendir(my $dh, $data_dir) or die "can't opendir $data_dir: $!";

my @files        = grep { /\.txt\z/ } readdir $dh;    # all *.txt files
$files_count     = scalar @files;

closedir $dh;

say "\n*************************************************";
say "*** Jobs: one job == processed file ***";
say "*************************************************\n";
p @files;

# Job queue
my $q = Thread::Queue->new();

# Add jobs to the job queue
$q->enqueue(@files);

say "\n*************************************************";
say "*** One thread takes jobs off a job queue ***";
say "*************************************************\n";

say "Pending jobs:";
p $q->pending();

=head2 One thread

The thread takes work off the work queue
while work is available

=cut

$t0_a = Benchmark->new;

my $thr = threads->create(
    sub {
        my $sum = 0;

        # Thread will loop until no more work
        #   using ->dequeue will block the execution
        #   when there are no jobs to be done, unless 
        #   another mechanism takes care of that
        #   and handles the empty job queue
        while (defined (my $file = $q->dequeue_nb())) {
            my $incr_sum = _get_file_sum("$data_dir/$file");   
            $sum += $incr_sum; 
        }
        return $sum;
    }
);

{
    my @thr_results = map { $_->join() } threads->list();

    $t1_a = Benchmark->new;
    $td_a = timestr(timediff($t1_a, $t0_a));

    p @thr_results;
    say "Done: sum is " . sum @thr_results;
    say "Run time = $td_a";
}

=head2 Thread pool 


To avoid the cost of creating new threads, we shall create
a thread pool and reuse threads that are available to do
more work 


=cut

say "\n*************************************************";
say "*** A pool of threads: each thread takes jobs off\nthe job queue while jobs are available ***";
say "*************************************************\n";

say "Pending jobs after the previous processing:";
p $q->pending();

# Send work to the thread
$q->enqueue(@files);

# Signal that there is no more work to be sent
$q->end();

say "Pending jobs:";
p $q->pending();

$t0_b = Benchmark->new;

# Lower the number of created threads if the number of jobs is lower than the
# allowed thread limit
$MAX_THREADS = ($MAX_THREADS > $files_count) ? $files_count : $MAX_THREADS;

say "\nCreating a pool of $MAX_THREADS threads\n";

for (1 .. $MAX_THREADS) {
    my $thr = threads->create(
        sub {
            my $sum = 0;

            # Thread will loop until no more work
            #   using ->dequeue will block the execution
            #   when there are no jobs to be done, unless 
            #   another mechanism takes care of that
            #   and handles the empty job queue
            while (defined (my $file = $q->dequeue_nb())) {
                my $incr_sum = _get_file_sum("$data_dir/$file");   
                $sum += $incr_sum; 
            }
            return $sum;
        }
    );
}

=head3 Wait for all threads to finish and collect all results

=cut

{
    my @thr_results = map { $_->join() } threads->list();

    say "Pending jobs:";
    p $q->pending();

    say "Collected results:";
    p @thr_results;

    $t1_b = Benchmark->new;
    $td_b = timestr(timediff($t1_b, $t0_b));

    say "Done: sum is " . sum @thr_results;

    say "Run time with 1 thread => $td_a";
    say "Run time when $MAX_THREADS threads => $td_b";
}

exit(0);

=head2 PRIVATE METHODS

=head3 _get_file_sum

=cut

sub _get_file_sum {
    my ($file) = @_;

    open my $fh, '<', $file or die "can't open $file: $!";

    # For benchmarking purposes
    sleep 1;

    my $work = 0;
    while (my $line = <$fh>) {
        chomp $line;
        $work += $line;
    }
    close $fh;

    say "\t\tFile $file: sum = $work";

    return $work;
}
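The thread examples above use the non-blocking ->dequeue_nb and return a partial sum from each thread. As a variation, here is a sketch of the same job-queue pattern using the blocking ->dequeue together with ->end(), and a single :shared accumulator from threads::shared instead of per-thread return values. The jobs are plain numbers rather than files (an assumption to keep it self-contained), the worker sub is hypothetical, and it needs a perl built with ithreads.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use feature 'say';

use threads;
use threads::shared;
use Thread::Queue;

my $total :shared = 0;                 # one variable visible to all threads
my $q = Thread::Queue->new(1 .. 10);   # hypothetical jobs: numbers, not files
$q->end();                             # no more jobs will be added

# hypothetical worker: takes jobs until the ended queue is empty
sub worker {
    # blocking dequeue: waits for a job and returns undef once
    # ->end() has been called and the queue is empty
    while (defined(my $job = $q->dequeue())) {
        lock($total);                  # one thread at a time updates the sum
        $total += $job;
    }                                  # the lock is released here
    return;
}

my @workers;
push @workers, threads->create(\&worker) for 1 .. 3;

$_->join() for @workers;
say "total: $total";
```

lock() serialises the updates to $total; without it, two threads could read and write the shared sum at the same time and lose an update. Returning partial sums from each thread, as the script above does, avoids locking altogether.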

Code on github
