The implementation presented here was inspired by rabbitmq-tutorials.
The fibonacci subroutine, in the server code, uses caching to speed up calculations for higher numbers.
The client script sends its requests in a parallelized manner.
The dependencies are:
Getopt::Long
Parallel::ForkManager
AnyEvent
UUID::Tiny
Net::RabbitFoot
Data::Printer (can be replaced by Data::Dumper)
The scripts are heavily documented to help understand all steps.
The fibonacci subroutine, in the server code, uses caching to speed up calculations for higher numbers.
The client script sends its requests in a parallelized manner.
The dependencies are:
Getopt::Long
Parallel::ForkManager
AnyEvent
UUID::Tiny
Net::RabbitFoot
Data::Printer (can be replaced by Data::Dumper)
The scripts are heavily documented to help understand all steps.
RPC Server
#!/usr/bin/perl

=head1 RPC server using RabbitMQ to provide fibonacci series calculation

=cut

use strict;
use warnings;
use v5.010;

$|++;

use AnyEvent;
use Net::RabbitFoot;
use Data::Printer;

# Declare subroutines
# -------------------
sub fib;
sub on_request;

# Cache of already computed fibonacci values, keyed by n
# ------------------------------------------------------
# (seeded with the two base cases; fib() extends it on demand so that
#  repeated or overlapping requests do not redo the same additions)
my $cached = { 0 => 0, 1 => 1 };

# Largest n a client may ask for
# ------------------------------
# (an arbitrary safety limit so that a single request cannot keep the
#  server busy or grow the cache without bound; adjust as needed)
my $MAX_N = 10_000;

# Connection to the RabbitMQ daemon
# ---------------------------------
# (in this case on the same server, the localhost)
my $conn = Net::RabbitFoot->new()->load_xml_spec()->connect(
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

# Create a RabbitMQ communication channel
# ---------------------------------------
my $channel = $conn->open_channel();

# Declare which queue we shall be offering the service on
# -------------------------------------------------------
# (clients publish the number they want the fibonacci value for to it)
$channel->declare_queue(queue => 'rpc_queue');

# Do the calculations, when a request comes
# -----------------------------------------
# (sent to the 'rpc_queue' queue)
$channel->consume(
    on_consume => \&on_request,
);

print " [x] Awaiting RPC requests\n";

# Wait forever
# ------------
AnyEvent->condvar->recv;

# ---------------------------------- SUBROUTINES ----------------------------------

# fib($n [, $padding])
#
# Returns the n-th fibonacci number (fib(0) = 0, fib(1) = 1).
#
#   $n       - non-negative integer
#   $padding - accepted for backward compatibility only; it was a debugging
#              aid for tracing the former recursive implementation and is
#              ignored
#
# Dies if $n is not a non-negative integer. The values are computed
# iteratively and stored in $cached, so there is no deep recursion no
# matter how large $n is. Results are ordinary Perl numbers and are
# therefore subject to the precision limits of native arithmetic.
sub fib {
    my ($n, $padding) = @_;

    die "fib: expected a non-negative integer\n"
        unless defined $n && $n =~ /\A\d+\z/;

    $n += 0;    # normalize e.g. "007" to 7 so the cache key is canonical

    return $cached->{$n} if defined $cached->{$n};

    # Fill in every value that is still missing, from the bottom up.
    for my $i (2 .. $n) {
        $cached->{$i} //= $cached->{ $i - 1 } + $cached->{ $i - 2 };
    }

    return $cached->{$n};
}

# on_request($var)
#
# Callback run for every message consumed from the request queue.
#
#   $var - hashref with Net::AMQP header and body info; the payload is the
#          number for which the fibonacci calculation is requested, the
#          header carries the correlation_id and the reply_to queue
#
# Publishes the result (or an error text, when the payload is not an
# integer in the range 0 .. $MAX_N) to the client's reply_to queue and
# then acknowledges the request.
sub on_request {
    my $var = shift;

    my $body  = $var->{body}->{payload};
    my $props = $var->{header};

    my $response;
    if (defined $body && $body =~ /\A\d+\z/ && $body <= $MAX_N) {
        my $n = $body;
        print " [.] fib($n)\n";
        $response = fib($n);
    }
    else {
        # Never hand unchecked input to fib(): reply with an explanation
        # so that the client is not left waiting for an answer.
        my $shown = $body // '(undef)';
        print " [!] rejected request '$shown'\n";
        $response = "error: expected an integer between 0 and $MAX_N";
    }

    if (defined $props->{reply_to}) {
        # publish/send the calculation to the client's queue
        $channel->publish(
            exchange    => '',
            routing_key => $props->{reply_to},
            header      => {
                correlation_id => $props->{correlation_id},
            },
            body => $response,
        );
    }
    else {
        print " [!] request carries no reply_to queue, nothing sent\n";
    }

    # acknowledgement that the request was handled, so the message
    # can be deleted from memory
    $channel->ack();

    return;
}
RPC Client
#!/usr/bin/perl

use strict;
use warnings;
use v5.010;

$|++;

use Getopt::Long;
use Parallel::ForkManager;
use AnyEvent;
use UUID::Tiny;
use Net::RabbitFoot;
use Data::Printer;

# Process the command input and setup the default
# for fibonacci processing, if no input provided
# -----------------------------------------------
# (@nums has to be declared before GetOptions can take a reference to it;
#  without the declaration the script does not compile under "use strict")
my @nums;

GetOptions(
    "numbers|n=s" => \@nums,
    "help|h"      => sub {
        say "Usage: perl $0 [--numbers|-n <n1,n2,...>]";
        exit 0;    # only show the usage, do not send any request
    },
);

# The option may be repeated and each occurrence may hold a comma separated
# list, so everything is joined and split again into single numbers.
@nums = (scalar @nums) ? split(/,/, join(',', @nums)) : qw(50 30 20 10 5);

# Declare subroutines
# -------------------
sub fibonacci;

# ----------------------------------------------- #

# Send parallelized requests to the RPC server
# --------------------------------------------
my $pm = Parallel::ForkManager->new(10);

# 1) set up data structure retrieval and handling
# -----------------------------------------------
$pm->run_on_finish(
    sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump,
            $all_child_data_ref) = @_;

        if (defined $all_child_data_ref) {
            p $all_child_data_ref;
        }
        else {
            print qq|No message received from child process $pid!\n|;
        }
    }
);

# 2) send the RPC requests
# ------------------------
FIB_LOOP:
foreach my $num (@nums) {
    # the parent gets the child's pid back and moves on to the next number
    my $pid = $pm->start and next FIB_LOOP;

    # ----- start child process
    print " [x] Requesting fib($num)\n";
    my $response = fibonacci($num);

    # ----- end of child process
    $pm->finish(0, { $num => $response });
}

$pm->wait_all_children;

say "[END] - all children finished";

# ---------------------------------- SUBROUTINES ----------------------------------

# fibonacci($n)
#
# The main subroutine for the RPC (asynchronous) request.
#
#   $n - the number for which we want the fibonacci value
#
# Opens its own connection and an exclusive reply queue, publishes $n to the
# server's 'rpc_queue' and blocks until a reply carrying our correlation id
# arrives. Returns the payload of that reply.
sub fibonacci {
    my ($n) = @_;

    # set up condition variable to watch for an event
    # (when we receive a result we asked for)
    my $cv = AnyEvent->condvar;

    # create a unique id for our request to the RPC server
    my $corr_id = UUID::Tiny::create_UUID_as_string(UUID::Tiny::UUID_V4());

    # connect to the RabbitMQ daemon
    # (in this case on the same server, the localhost)
    my $conn = Net::RabbitFoot->new()->load_xml_spec()->connect(
        host  => 'localhost',
        port  => 5672,
        user  => 'guest',
        pass  => 'guest',
        vhost => '/',
    );

    my $channel = $conn->open_channel();

    # after we disconnect, queue will be deleted
    my $result         = $channel->declare_queue(exclusive => 1);
    my $callback_queue = $result->{method_frame}->{queue};

    # callback to execute after the response from the RPC server comes back;
    # replies that do not carry our correlation id are ignored
    my $on_response = sub {
        my $var  = shift;
        my $body = $var->{body}->{payload};

        if ($corr_id eq $var->{header}->{correlation_id}) {
            $cv->send($body);
        }
    };

    $channel->consume(
        no_ack     => 1,               # turning off message acknowledgment:
                                       # we shall not notify the server
        on_consume => $on_response,    # receives the server response, then
                                       # makes the condition variable true,
                                       # which stops the event loop
    );

    # send the request for the fibonacci calculation to the RPC server
    $channel->publish(
        exchange    => '',
        routing_key => 'rpc_queue',    # to which queue on the server we are
                                       # sending the request (ie the number
                                       # for which we want the fibonacci
                                       # value)
        header => {
            reply_to       => $callback_queue,    # our queue to which the
                                                  # server will send its
                                                  # response
            correlation_id => $corr_id,           # identifier of our request
        },
        body => $n,    # the number, for which we want the fibonacci
                       # value calculated
    );

    # $cv->recv blocks until $cv->send is used and
    # returns whatever data $cv->send supplies
    return $cv->recv;
}
No comments:
Post a Comment
Note: only a member of this blog may post a comment.