AMQP-CPP

CHANNELS

In the above example we created a channel object. A channel is a sort of virtual connection, and it is possible to create many channels that all use the same connection.

AMQP instructions are always sent over a channel, so before you can send the first command to the RabbitMQ server, you first need a channel object. The channel object has many methods to send instructions to the RabbitMQ server. For example, it has methods to declare queues and exchanges, to bind and unbind them, and to publish and consume messages. The channel.h C++ header file lists all available methods, and every method in it is documented.

All operations that you can perform on a channel are non-blocking. This means that it is not possible for a method (like Channel::declareExchange()) to immediately return 'true' or 'false'. Instead, almost every method of the Channel class returns an instance of the 'Deferred' class. This 'Deferred' object can be used to install handlers that will be called in case of success or failure.

For example, if you call the channel.declareExchange() method, the AMQP-CPP library will send a message to the RabbitMQ message broker to ask it to declare the exchange. However, because all operations in the library are asynchronous, the declareExchange() method cannot return 'true' or 'false' to inform you whether the operation was successful or not. Only after the instruction has reached the RabbitMQ server, and the confirmation from the server has been sent back to the client, can the library report the result of the declareExchange() call.

To prevent any blocking calls, the channel.declareExchange() method returns a 'Deferred' result object, on which you can set callback functions that will be called when the operation succeeds or fails.

// create a channel (or use TcpChannel if you're using the Tcp module)
Channel myChannel(&connection);

// declare an exchange, and install callbacks for success and failure
myChannel.declareExchange("my-exchange")

    .onSuccess([]() {
        // by now the exchange is created
    })

    .onError([](const char *message) {
        // something went wrong creating the exchange
    });

As you can see in the above example, we call the declareExchange() method and treat its return value as an object, on which we immediately install two lambda callbacks: one to handle success and one to handle failure.

Installing the callbacks is optional. If you're not interested in the result of an operation, you do not have to install a callback for it. Next to the onSuccess() and onError() callbacks, you can also install an onFinalize() callback. It gets called directly after the onSuccess() or onError() callback, so it can be used for code that should run in either case: when the operation succeeds or when it fails.

The signature for the onError() method is always the same: it gets one parameter with a human readable error message. The onSuccess() function has a different signature depending on the method that you call. Most onSuccess() functions (like the one we showed for the declareExchange() method) do not get any parameters at all. Some specific onSuccess callbacks receive extra parameters with additional information.
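The chaining of onSuccess(), onError() and onFinalize() can be illustrated with a small toy model. Note that ToyDeferred, reportSuccess(), reportError() and runToyOperation() are hypothetical names invented for this sketch; it is not the AMQP-CPP Deferred class, it only mimics the behavior described above (each installer returns the object itself, and the finalize callback runs after the success or error callback).

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a deferred result (hypothetical, NOT AMQP::Deferred)
class ToyDeferred
{
public:
    // install handlers; each returns *this so that calls can be chained
    ToyDeferred &onSuccess(std::function<void()> cb) { _success = std::move(cb); return *this; }
    ToyDeferred &onError(std::function<void(const char *)> cb) { _error = std::move(cb); return *this; }
    ToyDeferred &onFinalize(std::function<void()> cb) { _finalize = std::move(cb); return *this; }

    // simulate the server answer arriving later; handlers that were
    // never installed are simply skipped
    void reportSuccess() { if (_success) _success(); if (_finalize) _finalize(); }
    void reportError(const char *message) { if (_error) _error(message); if (_finalize) _finalize(); }

private:
    std::function<void()> _success;
    std::function<void(const char *)> _error;
    std::function<void()> _finalize;
};

// run one simulated operation and record which handlers fired, in order
std::vector<std::string> runToyOperation(bool succeed)
{
    std::vector<std::string> log;
    ToyDeferred deferred;

    deferred
        .onSuccess([&log]() { log.push_back("success"); })
        .onError([&log](const char *message) { log.push_back(std::string("error: ") + message); })
        .onFinalize([&log]() { log.push_back("finalize"); });

    if (succeed) deferred.reportSuccess();
    else deferred.reportError("simulated failure");

    return log;
}
```

In this sketch, runToyOperation(true) records the success handler followed by the finalize handler, and runToyOperation(false) records the error handler followed by the finalize handler.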

CHANNEL CALLBACKS

As explained, most channel methods return a 'Deferred' object on which you can install callbacks using the Deferred::onError() and Deferred::onSuccess() methods.

The callbacks that you install on a Deferred object only apply to one specific operation. If you want to install a generic error callback for the entire channel, you can do so by using the Channel::onError() method. Next to the Channel::onError() method, there is also a Channel::onReady() method to install a callback that is called when the channel is ready for sending the first instruction to RabbitMQ.

// create a channel (or use TcpChannel if you use the Tcp module)
Channel myChannel(&connection);

// install a generic channel-error handler that will be called for every
// error that occurs on the channel
myChannel.onError([](const char *message) {

    // report error
    std::cout << "channel error: " << message << std::endl;
});

// install a generic callback that will be called when the channel is ready
// for sending the first instruction
myChannel.onReady([]() {

    // send the first instructions (like publishing messages)
});

In theory, you should wait for the onReady() callback to be called before you send any other instructions over the channel. In practice, however, the AMQP-CPP library caches all instructions that are sent too early, so you can use the channel object right after it is constructed.
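One way to picture this caching is a buffer that holds instructions until the channel is ready and then flushes them in order. The sketch below is a toy model under that assumption; ToyChannel, send(), markReady() and demoEarlySend() are hypothetical names, and the real library may implement this differently.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model of buffering instructions that are sent too early
// (hypothetical, NOT the AMQP-CPP implementation)
class ToyChannel
{
public:
    // the sink receives every instruction that is actually put on the wire
    explicit ToyChannel(std::function<void(const std::string &)> sink) : _sink(std::move(sink)) {}

    // forward the instruction when the channel is ready, buffer it otherwise
    void send(const std::string &instruction)
    {
        if (_ready) _sink(instruction);
        else _pending.push_back(instruction);
    }

    // simulate the moment the server confirms that the channel is open:
    // flush everything that was buffered, in the original order
    void markReady()
    {
        _ready = true;
        for (const auto &instruction : _pending) _sink(instruction);
        _pending.clear();
    }

private:
    std::function<void(const std::string &)> _sink;
    std::vector<std::string> _pending;
    bool _ready = false;
};

// use the channel right after construction and return what reached the wire
std::vector<std::string> demoEarlySend()
{
    std::vector<std::string> wire;
    ToyChannel channel([&wire](const std::string &instruction) { wire.push_back(instruction); });

    channel.send("declare my-queue");     // too early: buffered
    channel.send("declare my-exchange");  // too early: buffered
    channel.markReady();                  // buffered instructions are flushed
    channel.send("publish hello");        // channel is ready: sent right away

    return wire;
}
```

The caller does not have to care whether the channel was ready at the time of each send() call: the instructions reach the wire in the order in which they were issued.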

CHANNEL ERRORS

It is important to realize that any error that occurs on a channel invalidates the entire channel, including all subsequent instructions that were already sent over it. This means that if you call multiple methods in a row, and the first method fails, none of the subsequent methods will be executed either:

Channel myChannel(&connection);
myChannel.declareQueue("my-queue");
myChannel.declareExchange("my-exchange");

If the declareQueue() call fails in the example above, the subsequent myChannel.declareExchange() call will not be executed, even if this second instruction was already sent to the server. The second instruction will be ignored by the RabbitMQ server because the channel was already in an invalid state after the first failure.

You can overcome this by using multiple channels:

Channel channel1(&connection);
Channel channel2(&connection);
channel1.declareQueue("my-queue");
channel2.declareExchange("my-exchange");

Now, if an error occurs with declaring the queue, it will not have consequences for the other call. But this comes at a small price: setting up the extra channel requires an extra instruction to be sent to the RabbitMQ server, so some extra bytes are sent over the network, and some additional resources are used in both the client application and the RabbitMQ server (although the overhead is very limited).

If possible, it is best to make use of this feature. For example, if you have an important AMQP connection that you use for consuming messages, and at the same time you want to send another instruction to RabbitMQ (like declaring a temporary queue), it is best to set up a new channel for this 'declare' instruction. If the declare fails, it will not stop the consumer, because it was sent over a different channel.
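The difference between sharing one channel and using separate channels can be illustrated with a toy model of the server-side behavior described in this section. ToyBrokerChannel, its execute() method and the two helper functions are hypothetical names for this sketch; the `fails` flag merely simulates an instruction that the server rejects.

```cpp
#include <string>

// Toy model of how an error invalidates a channel
// (hypothetical, NOT part of AMQP-CPP or RabbitMQ)
class ToyBrokerChannel
{
public:
    // try to execute an instruction; `fails` simulates a server-side error.
    // returns true only if the instruction was really executed
    bool execute(const std::string &instruction, bool fails = false)
    {
        (void)instruction;                 // the name is not inspected in this toy model
        if (_invalid) return false;        // channel already broken: instruction is ignored
        if (fails)
        {
            _invalid = true;               // any error invalidates the entire channel
            return false;
        }
        return true;
    }

private:
    bool _invalid = false;
};

// both instructions on one channel: the failure takes the second one down too
bool exchangeDeclaredOnSharedChannel()
{
    ToyBrokerChannel channel;
    channel.execute("declare my-queue", true);      // simulated failure
    return channel.execute("declare my-exchange");
}

// one channel per instruction: the failure stays isolated
bool exchangeDeclaredOnSeparateChannels()
{
    ToyBrokerChannel channel1;
    ToyBrokerChannel channel2;
    channel1.execute("declare my-queue", true);     // simulated failure
    return channel2.execute("declare my-exchange");
}
```

In this model the exchange declaration is dropped when it shares a channel with the failed queue declaration, and it goes through when it has a channel of its own.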

The AMQP-CPP library allows you to create channels on the stack. It is not a problem if a channel object gets destructed before the instruction was received by the RabbitMQ server:

void myDeclareMethod(AMQP::Connection *connection)
{
    // create temporary channel to declare a queue
    AMQP::Channel channel(connection);

    // declare the queue (the channel object is destructed before the
    // instruction reaches the server, but the AMQP-CPP library can deal
    // with this)
    channel.declareQueue("my-new-queue");
}

