The implementation presented here was inspired by rabbitmq-tutorials.
The fibonacci subroutine in the server code uses caching to speed up calculations for higher numbers.
The client script sends its requests in a parallelized manner.
The dependencies are:
Data::Printer (can be replaced by Data::Dumper)
The scripts are heavily documented to help understand all steps.
RPC Server
#!/usr/bin/perl

=head1 RPC server using RabbitMQ to provide fibonacci series calculation

=cut

use strict;
use warnings;
use v5.010;

$|++;

use AnyEvent;
use Net::RabbitFoot;
use Data::Printer;

# Declare subroutines
# -------------------
sub fib;
sub on_request;

# Cache of already computed fibonacci values, keyed by n
# ------------------------------------------------------
# (shared by all requests for the lifetime of the server, so that repeated or
#  larger requests do not recompute values that are already known)
my $cached = {};

# Connection to the RabbitMQ daemon
# ---------------------------------
# (in this case on the same server, the localhost)
my $conn = Net::RabbitFoot->new()->load_xml_spec()->connect(
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

# Create a RabbitMQ communication channel
# ---------------------------------------
my $channel = $conn->open_channel();

# Declare which queue we shall be offering the service on
# -------------------------------------------------------
# (each message on 'rpc_queue' carries, as its body, the number for which
#  the fibonacci value is requested)
$channel->declare_queue(queue => 'rpc_queue');

# Do the calculations, when a request comes
# -----------------------------------------
# (sent to the 'rpc_queue' queue)
$channel->consume(
    on_consume => \&on_request,
);

print " [x] Awaiting RPC requests\n";

# Wait forever
# ------------
AnyEvent->condvar->recv;

# ---------------------------------- SUBROUTINES ----------------------------------

# fib($n, $padding)
# -----------------
# Returns the n-th fibonacci number (fib(0) = 0, fib(1) = 1).
#
#   $n       - non-negative integer, given as a number or a string of digits
#   $padding - accepted for backward compatibility only; it was used to trace
#              the recursive calls during development and is now ignored
#
# Dies with a message if $n is not a non-negative integer. The original
# recursive implementation never reached its base case for such input.
#
# The values are computed iteratively and stored in $cached, so the call
# depth does not grow with $n.
sub fib {
    my ($n, $padding) = @_;

    die "fib: expected a non-negative integer, got '" . ($n // 'undef') . "'\n"
        unless defined $n && $n =~ /\A[0-9]+\z/;

    $n += 0;    # normalise e.g. "007" to 7, so cache keys are canonical

    return $n             if $n < 2;
    return $cached->{$n}  if defined $cached->{$n};

    $cached->{0} //= 0;
    $cached->{1} //= 1;
    for my $i (2 .. $n) {
        $cached->{$i} //= $cached->{ $i - 1 } + $cached->{ $i - 2 };
    }

    return $cached->{$n};
}

# on_request($var)
# ----------------
# Callback run for every message consumed from 'rpc_queue'.
#
#   $var - hashref with the Net::AMQP header and body info of the message;
#          the body payload is the number for which the fibonacci value is
#          requested, the header holds the correlation_id and the reply_to
#          queue of the client
#
# The result is published to the client's reply_to queue, together with the
# correlation_id of the request. If the payload is not a valid number, a
# string starting with "ERROR: " is sent instead, so that the client is not
# left waiting for an answer.
sub on_request {
    my $var   = shift;
    my $props = $var->{header};
    my $n     = $var->{body}->{payload} // '';

    # tolerate surrounding whitespace (e.g. a trailing newline) in the payload
    $n =~ s/\A\s+|\s+\z//g;

    print " [.] fib($n)\n";

    my $response;
    my $ok = eval { $response = fib($n); 1 };
    if (!$ok) {
        my $error = $@ || "unknown error\n";
        chomp $error;
        print " [!] $error\n";
        $response = "ERROR: $error";
    }

    # publish/send the calculation to the client's queue
    # (without a reply_to queue there is nowhere to send the answer)
    if (defined $props->{reply_to}) {
        $channel->publish(
            exchange    => '',
            routing_key => $props->{reply_to},
            header      => {
                correlation_id => $props->{correlation_id},
            },
            body => $response,
        );
    }
    else {
        print " [!] request without reply_to, response dropped\n";
    }

    # acknowledgement that the request was handled, so the message can be
    # deleted from memory
    # NOTE(review): consume() above does not pass no_ack; if the library
    # defaults to no_ack => 1, messages are auto-acknowledged and this call
    # is redundant - confirm, and pass no_ack => 0 plus the delivery tag
    # here if manual acknowledgement is intended.
    $channel->ack();
}
RPC Client
#!/usr/bin/perl

# RPC client using RabbitMQ: requests fibonacci values from the RPC server,
# one forked child process per requested number.

use strict;
use warnings;
use v5.010;

$|++;

use Getopt::Long;
use Parallel::ForkManager;
use AnyEvent;
use UUID::Tiny;
use Net::RabbitFoot;
use Data::Printer;

# Process the command input and setup the default
# for fibonacci processing, if no input provided
# -----------------------------------------------
# (the option may be repeated and/or hold a comma separated list)
my @nums;    # must be declared: the script runs under 'use strict'
GetOptions(
    "numbers|n=s" => \@nums,
    "help|h"      => sub { say "perl $0 [numbers|n ]"; exit 0 },
) or die "Error in command line arguments\n";

@nums = (scalar @nums) ? split(/,/, join(',', @nums)) : qw(50 30 20 10 5);

# Declare subroutines
# -------------------
sub fibonacci;

# ----------------------------------------------- #

# Send parallelized requests to the RPC server
# --------------------------------------------
my $pm = Parallel::ForkManager->new(10);

# 1) set up data structure retrieval and handling
# -----------------------------------------------
# (runs in the parent, each time a child process finishes)
$pm->run_on_finish(
    sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump,
            $all_child_data_ref) = @_;

        if (defined $all_child_data_ref) {
            p $all_child_data_ref;
        }
        else {
            print qq|No message received from child process $pid!\n|;
        }
    }
);

# 2) send the RPC requests
# ------------------------
FIB_LOOP:
foreach my $num (@nums) {

    # start() returns the child's pid in the parent, which then moves on to
    # the next number, and 0 in the child, which carries on below
    $pm->start and next FIB_LOOP;

    # ----- start child process
    print " [x] Requesting fib($num)\n";
    my $response = fibonacci($num);

    # ----- end of child process
    # (the hashref is handed to the run_on_finish callback of the parent)
    $pm->finish(0, { $num => $response });
}

$pm->wait_all_children;

say "[END] - all children finished";

# ---------------------------------- SUBROUTINES ----------------------------------

# The main subroutine for the RPC (asynchronous) request
# ------------------------------------------------------
#   $n - the number for which we want the fibonacci value calculated
#
# Returns the body of the server's response to this request. Blocks until
# that response has arrived.
sub fibonacci {
    my $n = shift;

    # set up condition variable to watch for an event (when we receive
    # a result we asked for)
    my $cv = AnyEvent->condvar;

    # create a unique id for our request to the RPC server
    my $corr_id
        = UUID::Tiny::create_UUID_as_string(UUID::Tiny::UUID_V4());

    # connect to the RabbitMQ daemon
    # (in this case on the same server, the localhost)
    my $conn = Net::RabbitFoot->new()->load_xml_spec()->connect(
        host  => 'localhost',
        port  => 5672,
        user  => 'guest',
        pass  => 'guest',
        vhost => '/',
    );

    my $channel = $conn->open_channel();

    # after we disconnect, queue will be deleted
    my $result         = $channel->declare_queue(exclusive => 1);
    my $callback_queue = $result->{method_frame}->{queue};

    # receives the server response; if it answers our request, hands the
    # body to the condition variable, which stops the event loop
    my $on_response = sub {
        my $var  = shift;
        my $body = $var->{body}->{payload};

        if ($corr_id eq ($var->{header}->{correlation_id} // '')) {
            $cv->send($body);
        }
    };

    # callback to execute after the response from the RPC server comes back
    $channel->consume(
        no_ack     => 1,               # turning off message acknowledgment:
                                       # we shall not notify the server
        on_consume => $on_response,
    );

    # send the request for the fibonacci calculation to the RPC server
    $channel->publish(
        exchange    => '',
        routing_key => 'rpc_queue',    # to which queue on the server we are
                                       # sending the request (ie the number
                                       # for which we want the fibonacci
                                       # value)
        header => {
            reply_to       => $callback_queue,    # our queue to which the
                                                  # server will send its
                                                  # response
            correlation_id => $corr_id,           # identifier of our request
        },
        body => $n,                    # the number, for which we want the
                                       # fibonacci value calculated
    );

    # recv blocks until send is called on the condition variable and
    # returns whatever data was passed to send
    return $cv->recv;
}
