AMQP-CPP

TCP CONNECTIONS

Although the AMQP-CPP library gives you extreme flexibility by letting you set up your own network connections, in practice most, if not all, AMQP connections use TCP. The library therefore also comes with a TCP module that takes care of setting up the network connection and of sending and receiving data.

If you want to use this TCP module, you should not use the AMQP::Connection and AMQP::Channel classes that you saw above, but the alternative AMQP::TcpConnection and AMQP::TcpChannel classes instead. You also do not have to create your own class that implements the "AMQP::ConnectionHandler" interface; you create a class that inherits from "AMQP::TcpHandler" instead. In particular, you need to implement the "monitor()" method in that class, because the AMQP-CPP library needs it to interact with the main event loop:

#include <amqpcpp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this method has been called, the connection is ready
     *  to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onConnected(AMQP::TcpConnection *connection)
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }

    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized.
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::TcpConnection *connection, const char *message)
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the
        //  connection object because it is no longer in a usable state
    }

    /**
     *  Method that is called when the connection was closed. This is the
     *  counterpart of a call to Connection::close() and it confirms that the
     *  connection was correctly closed.
     *
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onClosed(AMQP::TcpConnection *connection) {}

    /**
     *  Method that is called by the AMQP-CPP library when it wants to interact
     *  with the main event loop. The AMQP-CPP library is completely non-blocking,
     *  and only makes "write()" or "read()" system calls when it knows in advance
     *  that these calls will not block. To register a filedescriptor in the
     *  event loop, it calls this "monitor()" method with a filedescriptor and
     *  flags telling whether the filedescriptor should be checked for readability
     *  or writability.
     *
     *  @param  connection      The connection that wants to interact with the event loop
     *  @param  fd              The filedescriptor that should be checked
     *  @param  flags           Bitwise or of AMQP::readable and/or AMQP::writable
     */
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags)
    {
        // @todo
        //  add your own implementation, for example by adding the file
        //  descriptor to the main application event loop (like the select() or
        //  poll() loop). When the event loop reports that the descriptor becomes
        //  readable and/or writable, it is up to you to inform the AMQP-CPP
        //  library that the filedescriptor is active by calling the 
        //  connection->process(fd, flags) method.
    }
};

The "monitor()" method can be used to integrate the AMQP filedescriptors into your application's event loop. For some popular event loops (libev, libevent), we have already added example handler objects (see the next section for that).

Using the TCP module of the AMQP-CPP library is easier than using the raw AMQP::Connection and AMQP::Channel objects, because you do not have to create the sockets and connections yourself, and you also do not have to take care of buffering network data.

The example that we gave above looks slightly different if you make use of the TCP module:

// create an instance of your own tcp handler
MyTcpHandler myHandler;

// address of the server
AMQP::Address address("amqp://guest:guest@localhost/vhost");

// create an AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);

// and create a channel
AMQP::TcpChannel channel(&connection);

// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");

