Search This Blog

Saturday 29 November 2014

Fibonacci - using RPC RabbitMQ based server and client - Perl implementation

The implementation presented here was inspired by the rabbitmq-tutorials project.

The fibonacci subroutine, in the server code, uses caching to speed up calculations for higher numbers.
The client script sends its requests in a parallelized manner.

The dependencies are:

    Getopt::Long
    Parallel::ForkManager
    AnyEvent
    UUID::Tiny
    Net::RabbitFoot
    Data::Printer         (can be replaced by Data::Dumper)


The scripts are heavily documented to help understand all steps.

RPC Server

#!/usr/bin/perl

=head1 RPC server using RabbitMQ to provide fibonacci series calculation

=cut

use strict;
use warnings;
use v5.010;

$|++;
use AnyEvent;
use Net::RabbitFoot;

use Data::Printer;

# Forward declarations
# --------------------
# (both subroutines are defined at the end of the file, after the event loop)
sub fib;
sub on_request;

# Memoization store used by fib
# -----------------------------
# (maps n => fib(n); without it the recursive calculation takes an enormous
#  amount of time for higher numbers)

my $cached = {};

# Connect to the RabbitMQ daemon
# ------------------------------
# (the broker is expected on this very machine, the localhost)

my %broker = (
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

my $conn = Net::RabbitFoot->new()->load_xml_spec()->connect(%broker);

# Open the communication channel every later call goes through
# ------------------------------------------------------------

my $channel = $conn->open_channel();

# Declare the queue the service is offered on
# -------------------------------------------
# (clients publish the number they want the fibonacci value for to
#  'rpc_queue')

$channel->declare_queue(queue => 'rpc_queue');

# Register the request handler
# ----------------------------
# (no queue is named here -- presumably the broker then uses the queue
#  declared last on this channel, i.e. 'rpc_queue'; TODO confirm)

$channel->consume(on_consume => \&on_request);

print " [x] Awaiting RPC requests\n";

# Wait forever
# ------------
# (nothing ever sends on this condition variable, so recv never returns and
#  the event loop keeps serving requests)
my $forever = AnyEvent->condvar;
$forever->recv;

# ---------------------------------- SUBROUTINES ----------------------------------

# fib($n [, $padding])
# --------------------
# Returns the $n-th fibonacci number (fib(0) = 0, fib(1) = 1).
#
#   $n        non-negative integer
#   $padding  optional string, only handed down to the recursive calls; it
#             was used to show the intermediate recursive calculations
#             during development
#
# Intermediate results are stored in the file level $cached hashref, so every
# value is calculated only once during the life time of the process.
#
# Dies when $n is not a non-negative integer: such a value would never reach
# the 0/1 base case and the recursion would never end.
sub fib {
    my ($n, $padding) = @_;

    $padding //= "";

    die "fib: expected a non-negative integer\n"
        unless defined $n && $n =~ /\A\d+\z/;

    # base cases: fib(0) = 0 and fib(1) = 1
    return $n + 0 if $n < 2;

    my $n1 = $n - 1;
    my $n2 = $n - 2;

    # calculate each predecessor only if it is not in the cache yet
    $cached->{$n1} //= fib($n1, "$padding  (-1)");
    $cached->{$n2} //= fib($n2, "$padding  (-2)");

    return $cached->{$n1} + $cached->{$n2};
}

# on_request($var)
# ----------------
# Callback registered with $channel->consume; called for every incoming
# request.
#
#   $var  hashref with the Net::AMQP header and body info of the request
#
# Calculates fib() for the number found in the message body and publishes
# the result to the queue named in the request's reply_to header, tagged
# with the request's correlation_id.
#
# A body that is not a non-negative integer is NOT handed to fib() (the
# recursion would never end). The client gets an error text instead, so it
# does not wait forever for a reply.
sub on_request {
    my $var   = shift;                      # hashref with Net::AMQP header and
                                            # body info
    my $body  = $var->{body}->{payload};    # the number for which the fibonacci
                                            # calculation is requested
    my $props = $var->{header};             # has correlation_id and reply_to queue
                                            # details

    my $response;
    if (defined $body && $body =~ /\A\d+\z/) {
        my $n = $body;
        print " [.] fib($n)\n";
        $response = fib($n);
    }
    else {
        print " [!] ignoring invalid request payload\n";
        $response = 'ERROR: expected a non-negative integer';
    }

    # publish/send the calculation to the client's queue
    # (a request without reply_to gives us nowhere to send the answer)
    if (defined $props->{reply_to}) {
        $channel->publish(
            exchange    => '',
            routing_key => $props->{reply_to},
            header      => {
                correlation_id => $props->{correlation_id},
            },
            body => $response,
        );
    }
    else {
        print " [!] request has no reply_to queue, response dropped\n";
    }

    $channel->ack();                        # acknowledgement that the request
                                            # was handled, so the message can
                                            # be deleted from memory
}

RPC Client

#!/usr/bin/perl

use strict;
use warnings;
use v5.010;

$|++;

use Getopt::Long;
use Parallel::ForkManager;

use AnyEvent;
use UUID::Tiny;
use Net::RabbitFoot;

use Data::Printer;

# Process the command input and set up the default
# for fibonacci processing, if no input provided
# ------------------------------------------------
# (@nums has to be declared before GetOptions can fill it; without the
#  declaration the script does not compile under "use strict")
my @nums;

GetOptions(
    "numbers|n=s" => \@nums,
    "help|h"      => sub {
        say "perl $0 [--numbers|-n N[,N...]] [--help|-h]";
        exit 0;                 # usage only: do not send any request
    },
) or die "Invalid command line options (try --help)\n";

# -n can be given several times and/or hold a comma separated list:
# flatten both forms into one list, or fall back to the defaults
@nums = @nums ? split(/,/, join(',', @nums)) : qw(50 30 20 10 5);

# only non-negative integers are requested from the RPC server
for my $num (@nums) {
    die "Not a non-negative integer: '$num'\n" unless $num =~ /\A\d+\z/;
}

# Declare subroutines
# -------------------

sub fibonacci($);

# -----------------------------------------------
#
# Fan the requests out to the RPC server, one child process per number
# --------------------------------------------------------------------

# (the manager is created with a limit of 10)
my $fork_manager = Parallel::ForkManager->new(10);

# 1) what the parent does whenever a child has finished
# -----------------------------------------------------
# (the sixth callback argument is the data structure the child handed to
#  finish(); it is dumped with Data::Printer)

$fork_manager->run_on_finish(
    sub {
        my ($child_pid, $child_data) = @_[0, 5];

        unless (defined $child_data) {
            print qq|No message received from child process $child_pid!\n|;
            return;
        }

        p $child_data;
    }
);

# 2) send the RPC requests
# ------------------------

FIB_LOOP:
for my $num (@nums) {
    # start() is true in the parent, which moves on to the next number;
    # only the child falls through to the request below
    next FIB_LOOP if $fork_manager->start;

    # ----- child process only from here
    print " [x] Requesting fib($num)\n";
    my %answer_for = ($num => fibonacci($num));

    # ----- child ends here and hands its result to the parent
    $fork_manager->finish(0, \%answer_for);
}

$fork_manager->wait_all_children;

say "[END] - all children finished";

# ---------------------------------- SUBROUTINES ---------------------------------- 

# The main subroutine for the RPC (asynchronous) request
# ------------------------------------------------------

# fibonacci($n)
# -------------
# Asks the RPC server for the fibonacci value of $n and blocks until the
# answer has arrived.
#
#   $n       the number for which we want the fibonacci value calculated
#   returns  the body of the server's response
#
# Every call opens its own connection and channel, declares an exclusive
# reply queue, publishes the request to 'rpc_queue' and closes the
# connection again once the response is in.
sub fibonacci($) {
    my $n = shift;

    # set up condition variable to watch for an event (when we receive
    # a result we asked for)
    my $cv = AnyEvent->condvar;

    # create a unique id for our request to the RPC server
    my $corr_id = UUID::Tiny::create_UUID_as_string(UUID::Tiny::UUID_V4);

    # connect to the RabbitMQ daemon
    # (in this case on the same server, the localhost)
    my $conn = Net::RabbitFoot->new()->load_xml_spec()->connect(
        host  => 'localhost',
        port  =>  5672,
        user  => 'guest',
        pass  => 'guest',
        vhost => '/',
    );

    my $channel = $conn->open_channel();

    # after we disconnect, queue will be deleted
    my $result          = $channel->declare_queue(exclusive => 1);
    my $callback_queue  = $result->{method_frame}->{queue};

    # only a message carrying OUR correlation id is the answer we wait for
    # (a message without any correlation id is ignored, without a warning)
    my $on_response = sub {
        my $var     = shift;
        my $body    = $var->{body}->{payload};
        if ($corr_id eq ($var->{header}->{correlation_id} // '')) {
            $cv->send($body);
        }
    };

    # callback to execute after the response from the RPC server comes back
    $channel->consume(
        no_ack      => 1,               # turning off message acknowledgment:
                                        #     we shall not notify the server
        on_consume  => $on_response,    # receives the server response, then
                                        #     makes the condition variable
                                        #     true, which stops the event loop
    );

    # send the request for the fibonacci calculation to the RPC server
    $channel->publish(
        exchange    => '',
        routing_key => 'rpc_queue',     # to which queue on the server we are
                                        # sending the request
                                        # (ie the number for which we want the
                                        # fibonacci value)
        header      => {
            reply_to        => $callback_queue,     # our queue to which the server
                                                    #      will send its response
            correlation_id  => $corr_id,            # identifier of our request
        },
        body => $n,                     # the number, for which we want the fibonacci
                                        # value calculated
    );

    my $response = $cv->recv;           # recv blocks until $cv->send is used
                                        # and returns whatever data $cv->send
                                        # supplies

    $conn->close;                       # do not leave the connection open

    return $response;
}

No comments:

Post a Comment

Note: only a member of this blog may post a comment.