Background
If a process (a running instance of a script or application) has just one path of execution, the main thread, it may be possible to speed up its execution by performing some tasks in parallel (tasks processed at the same time on multicore systems) or concurrently (tasks making progress through CPU context switching on single-core machines). Parallelization (meaning here NOT processing tasks in sequence) within one program can be achieved through:
- multiple child processes running in parallel
- multiple threads
- asynchronous processing (Asynchronous Processing in Perl)
Code Examples
Child processes
The example below contains two implementations of forking child processes. The first one uses the CPAN module Parallel::ForkManager; the second one uses Perl's built-in fork. There are also two approaches to reaping dead children: one through a CHLD signal handler, the other by calling waitpid directly. In long-running processes, finished children that are never reaped stay around as zombies and strain system resources, so it is important to make sure they are removed (reaped).
#!/usr/bin/perl

=head2 parallel_worker.pl

two implementations of forking child processes

=cut

use strict;
use warnings;
use utf8;
use v5.018;

use Parallel::ForkManager;
use POSIX ":sys_wait_h";    # WNOHANG
use Data::Printer;

my $pm = Parallel::ForkManager->new(4);

=head3 Process all files in parallel

loops through all the files to be processed
creates/forks child processes
reaps dead child processes

Reaping of dead child processes/zombies: zombies are processes that have
finished execution but remain in the process table, so that the parent
process can inquire about the child's exit status. If the zombies are not
removed from the process table (reaped, by reading the child status through
the wait system call), they can lead to resource leaks.

2 implementations:

    a) with Parallel::ForkManager
    b) with fork:
        1) uses a CHLD signal handler
        2) uses waitpid

=cut

my @files = qw(a b c d e f g);
my %child = ();

# creating child processes: implementation 1
# ==========================================
DATA_LOOP:
foreach my $data (@files) {
    # forks a new child process: in the parent, start() returns the
    # child's pid (true), so the parent reports it and moves on;
    # in the child, start() returns 0 and execution continues below
    my $pid;
    $pid = $pm->start and say "... child $pid" and next DATA_LOOP;

    # what will be done in the child process
    # until ->finish is encountered
    sleep 3;

    # end the child process
    $pm->finish;
}
$pm->wait_all_children;
say ">>> DONE 1";

# creating child processes: implementation 2
# ==========================================

# child handler to reap dead children
# -----------------------------------
$SIG{CHLD} = sub {
    # reap every child that has already exited, without blocking
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        if (exists $child{$pid}) {
            delete $child{$pid};
            say "!!! deleted $pid";
        }
    }
};

foreach my $data (@files) {
    # create a child process; parent and child both continue from here
    my $pid = fork;

    # failure to fork: fork returns undef
    if (!defined $pid) {
        say "* failed to fork a process: $!";
        next;
    }

    # child process: fork returns 0
    if ($pid == 0) {
        # do the work, then exit; otherwise the child would continue
        # the foreach loop, fork children of its own and produce
        # multiple 'DONE 2' statements instead of just one
        sleep 3;
        exit 0;
    }

    # parent process: fork returns the child's pid
    $child{$pid} = undef;
    say "* forked child process $pid";
}

# wait until the CHLD handler above has reaped every child
# (sleep may return early when a signal arrives)
sleep 1 while keys %child;

### reaping dead child processes without a CHLD signal handler
### to use: comment out the CHLD signal handler and the
### 'sleep 1 while' line above, and uncomment the lines below
## ---------------------------------------------------
##while (keys %child) {
##    for my $key (keys %child) {
##        my $pid = waitpid($key, WNOHANG);
##
##        if ($pid == -1) {
##            say "\t>>> child $key does not exist";
##            delete $child{$key};
##            say "\t\t deleted key $key";
##        }
##
##        if ($pid == $key) {
##            delete $child{$key};
##            say "\t\t *** child $key reaped";
##            say "\t\t *** deleted key $key";
##        }
##        say ">>>--------------------------";
##    }
##    sleep 1;    # avoid busy-waiting while children are still running
##}
## p %child;

say ">>> DONE 2";
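Reaping matters because waitpid is also how the parent learns how each child finished. The following is a minimal sketch using only core Perl (no Parallel::ForkManager, no signal handler), with hypothetical jobs numbered 1 to 3: each child exits with its job number as exit status, and the parent reaps every child with a blocking waitpid and decodes the status from $?. It is an illustration under those assumptions, not part of the script above.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use v5.018;

# hypothetical jobs: each child simply exits with its job number as status
my @jobs = (1 .. 3);
my %job_of;       # pid => job number, recorded by the parent
my %status_of;    # pid => exit status read back by the parent

for my $job (@jobs) {
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        # child: pretend to work, then report the job number as exit status
        exit $job;
    }
    $job_of{$pid} = $job;    # parent: remember which child runs which job
}

# parent: a blocking waitpid on each known child removes it from the
# process table and leaves its wait status in $?
for my $pid (keys %job_of) {
    waitpid($pid, 0);
    $status_of{$pid} = $? >> 8;    # the high byte of $? is the exit code
}

for my $pid (sort { $job_of{$a} <=> $job_of{$b} } keys %job_of) {
    say "child $pid (job $job_of{$pid}) exited with status $status_of{$pid}";
}
```

A blocking waitpid keeps the parent simple when it has nothing else to do; the WNOHANG variants in the script above are the choice when the parent must keep working while children run.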
Threads
The task is to process the files in a directory, each of which contains one number per line, and to output the total sum of all numbers in all files.
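For reference, here is a plain sequential version of the task, without threads, that is handy for checking the threaded results. It is a minimal sketch: the temporary directory, the file names a.txt to c.txt and their contents are made up for illustration, and file_sum is a hypothetical helper playing the role of _get_file_sum in the script below.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use v5.018;
use File::Temp qw(tempdir);
use File::Spec;

# made-up test data: one number per line in each *.txt file
my $dir = tempdir(CLEANUP => 1);
my %content = ('a.txt' => "1\n2\n3\n", 'b.txt' => "10\n20\n", 'c.txt' => "100\n");
for my $name (keys %content) {
    my $path = File::Spec->catfile($dir, $name);
    open my $out, '>', $path or die "can't write $path: $!";
    print {$out} $content{$name};
    close $out or die "can't close $path: $!";
}

# one "job": the sum of all numbers in one file
sub file_sum {
    my ($path) = @_;
    open my $fh, '<', $path or die "can't open $path: $!";
    my $sum = 0;
    while (my $line = <$fh>) {
        chomp $line;
        $sum += $line if length $line;    # skip blank lines
    }
    close $fh;
    return $sum;
}

opendir my $dh, $dir or die "can't opendir $dir: $!";
my @files = grep { /\.txt\z/ } readdir $dh;
closedir $dh;

# process the jobs one after another
my $total = 0;
$total += file_sum(File::Spec->catfile($dir, $_)) for @files;
say "total = $total";
```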
The script below contains two implementations for comparison. Both use a job queue, a Thread::Queue object holding jobs (the information needed to perform a task). One job is to process one file, i.e. to calculate the sum of all numbers in that file. The implementations use:
a) a single thread
b) a pool of threads
In the thread pool, the number of created threads is limited (to 5 in the script, and never more than the number of jobs). The threads work in parallel and take jobs off the job queue until there is no more work to be done.
Jobs are taken off the job queue with ->dequeue_nb, which is non-blocking: when the queue is empty it returns undef instead of waiting, so each thread's loop ends and the flow continues. Blocking dequeuing (->dequeue) would wait on an empty queue, so it would require a mechanism that tells the threads no more jobs will arrive, such as calling ->end on the queue, before the program could continue.
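The difference between the two dequeue methods can be seen in isolation with a minimal sketch that drives a Thread::Queue from the main program only, with no worker threads, to separate the queue semantics from the threading. The file names are placeholders; nothing is read from disk.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use v5.018;
use Thread::Queue;

# queue semantics only: no worker threads are created here
my $q = Thread::Queue->new();
$q->enqueue(qw(a.txt b.txt c.txt));

# dequeue_nb never waits: it returns undef as soon as the queue is empty,
# which is what lets a worker's while loop terminate
my @taken;
while (defined(my $job = $q->dequeue_nb())) {
    push @taken, $job;
}
say "taken without blocking: @taken";

# blocking dequeue would wait on an empty queue; after ->end it returns
# the remaining items and then undef instead of waiting
$q->enqueue(qw(d.txt e.txt));
$q->end();
my @drained;
while (defined(my $job = $q->dequeue())) {
    push @drained, $job;
}
say "drained after end: @drained";

# pending returns undef once the queue is ended and empty
say 'pending after end: ', $q->pending() // 'undef';
```

After ->end, any further ->enqueue on the queue dies, which is why the script below enqueues all jobs before ending the queue.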
When a thread has processed all the files it took off the queue, it returns their sum. After all threads are created, we join them (wait for them to finish) and retrieve the partial sums, which we then add up.
#!/usr/bin/perl
use strict;
use warnings;
use utf8;
use v5.018;

use threads;
use Thread::Queue;
use List::Util qw(sum);
use Data::Printer;
use Benchmark qw(timediff timestr);

$| = 1;    # autoflush STDOUT so output appears as soon as it is printed

my ($t0_a, $t1_a, $t0_b, $t1_b, $td_a, $td_b);

my $MAX_THREADS = 5;
my $data_dir    = './test';
my $files_count = 0;

# 'or' (not '||') so that die is reached when opendir fails
opendir my $dh, $data_dir or die "can't opendir $data_dir: $!";
my @files = grep { /\.txt\z/ } readdir $dh;    # every *.txt file
$files_count = scalar @files;
closedir $dh;

say "\n*************************************************";
say "*** Jobs: one job == processed file ***";
say "*************************************************\n";
p @files;

# Job queue
my $q = Thread::Queue->new();

# Add jobs to the job queue
$q->enqueue(@files);

say "\n*************************************************";
say "*** One thread takes jobs off a job queue ***";
say "*************************************************\n";
say "Pending jobs: ", $q->pending() // 0;

=head2 One thread

The thread takes work off the work queue while work is available

=cut

$t0_a = Benchmark->new;

# assigning to a scalar creates the thread in scalar context,
# so ->join returns the scalar sum
my $thr = threads->create(
    sub {
        my $sum = 0;
        # The thread loops until there is no more work.
        # ->dequeue would block on an empty job queue unless
        # another mechanism handles that case, so the non-blocking
        # ->dequeue_nb is used: it returns undef once the queue is empty
        while (defined(my $file = $q->dequeue_nb())) {
            $sum += _get_file_sum("$data_dir/$file");
        }
        return $sum;
    }
);

{
    my @thr_results = map { $_->join() } threads->list();
    $t1_a = Benchmark->new;
    $td_a = timestr(timediff($t1_a, $t0_a));
    p @thr_results;
    say "Done: sum is " . sum(@thr_results);
    say "Run time = $td_a";
}

=head2 Thread pool

To avoid the cost of creating a new thread for every job, we create
a pool of threads and reuse the available threads to do more work

=cut

say "\n*************************************************";
say "*** A pool of threads: each thread takes jobs off\nthe job queue while jobs are available ***";
say "*************************************************\n";
say "Pending jobs after the previous processing: ", $q->pending() // 0;

# Send work to the threads
$q->enqueue(@files);

# Signal that there is no more work to be sent
$q->end();
say "Pending jobs: ", $q->pending() // 0;

$t0_b = Benchmark->new;

# Lower the number of created threads if the number of jobs is lower than the
# allowed thread limit
$MAX_THREADS = ($MAX_THREADS > $files_count) ? $files_count : $MAX_THREADS;
say "\nCreating a pool of $MAX_THREADS threads\n";

for (1 .. $MAX_THREADS) {
    # scalar context again, so that ->join returns each thread's sum
    my $thr = threads->create(
        sub {
            my $sum = 0;
            # each thread takes jobs off the shared queue until it is empty
            while (defined(my $file = $q->dequeue_nb())) {
                $sum += _get_file_sum("$data_dir/$file");
            }
            return $sum;
        }
    );
}

=head3 Wait for all threads to finish and collect all results

=cut

{
    my @thr_results = map { $_->join() } threads->list();
    say "Pending jobs: ", $q->pending() // 0;
    say "Collected results:";
    p @thr_results;
    $t1_b = Benchmark->new;
    $td_b = timestr(timediff($t1_b, $t0_b));
    say "Done: sum is " . sum(@thr_results);
    say "Run time with 1 thread => $td_a";
    say "Run time with $MAX_THREADS threads => $td_b";
}

exit(0);

=head2 PRIVATE METHODS

=head3 _get_file_sum

=cut

sub _get_file_sum {
    my ($file) = @_;
    open my $fh, '<', $file or die "can't open $file: $!";

    # For benchmarking purposes
    sleep 1;

    my $work = 0;
    while (my $line = <$fh>) {
        chomp $line;
        $work += $line;
    }
    close $fh;

    say "\t\tFile $file: sum = $work";
    return $work;
}
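As a variation on the thread pool in the script above, the pool can also use blocking ->dequeue: because ->end is called after all jobs are enqueued, ->dequeue returns undef once the queue is empty instead of waiting. This is a minimal sketch under stated assumptions: the jobs are hypothetical in-memory lists of numbers standing in for files, the pool limit of 3 is arbitrary, and, like the script above, it needs a Perl built with thread support.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use v5.018;
use threads;
use Thread::Queue;
use List::Util qw(min sum0);

# hypothetical jobs: array refs of numbers standing in for files
my @jobs = ([1, 2, 3], [10, 20], [100], [1000, 2000]);
my $max_threads = 3;

my $q = Thread::Queue->new();
$q->enqueue(@jobs);    # the queue stores shared copies of the array refs
$q->end();             # no more jobs: blocking dequeue returns undef when empty

# never start more threads than there are jobs
my $pool_size = min($max_threads, scalar @jobs);

my @workers = map {
    threads->create(sub {
        my $partial = 0;
        # blocks only while jobs remain; undef once the ended queue is empty
        while (defined(my $job = $q->dequeue())) {
            $partial += sum0(@$job);
        }
        return $partial;
    });
} 1 .. $pool_size;

# join waits for each thread and returns its partial sum
my @partials = map { $_->join() } @workers;
my $total = sum0(@partials);
say "partials: @partials, total: $total";
```

Compared with ->dequeue_nb, this form also keeps working if jobs are still being added while the threads run, as long as ->end is called only after the last job has been enqueued.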
Code on github