AMQP-CPP

CONSUMING MESSAGES

Fetching messages from RabbitMQ is called consuming, and can be started by calling the method Channel::consume(). After you've called this method, RabbitMQ starts delivering messages to you.

Just like the publish() method that we just described, the consume() method also comes in many forms. The first parameter is always the name of the queue you like to consume from. The subsequent parameters are an optional consumer tag, flags and a table with custom arguments. The first additional parameter, the consumer tag, is nothing more than a string identifier that you can use when you want to stop consuming.

The full documentation from the C++ Channel.h headerfile looks like this:

/**
 *  Tell the RabbitMQ server that we're ready to consume messages
 *
 *  After this method is called, RabbitMQ starts delivering messages to the client
 *  application. The consume tag is a string identifier that will be passed to
 *  each received message, so that you can associate incoming messages with a
 *  consumer. If you do not specify a consumer tag, the server will assign one
 *  for you.
 *
 *  The following flags are supported:
 *
 *      -   nolocal             if set, messages published on this channel are
 *                              not also consumed
 *
 *      -   noack               if set, consumed messages do not have to be acked,
 *                              this happens automatically
 *
 *      -   exclusive           request exclusive access, only this consumer can
 *                              access the queue
 *
 *  The callback registered with DeferredConsumer::onSuccess() will be called when the
 *  consumer has started.
 *
 *  @param  queue               the queue from which you want to consume
 *  @param  tag                 a consumer tag that will be associated with this consume operation
 *  @param  flags               additional flags
 *  @param  arguments           additional arguments
 *  @return bool
 */
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0);
DeferredConsumer &consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, int flags, const AMQP::Table &arguments);
DeferredConsumer &consume(const std::string &queue, int flags = 0);
DeferredConsumer &consume(const std::string &queue, const AMQP::Table &arguments);

As you can see, the consume method returns a DeferredConsumer. This object is a regular Deferred, with additions. The onSuccess() method of a DeferredConsumer is slightly different than the onSuccess() method of a regular Deferred object: one extra parameter will be supplied to your callback function with the consumer tag.

The onSuccess() callback will be called when the consume operation has started, but not when messages are actually consumed. For this you will have to install a different callback, using the onReceived() method.

// callback function that is called when the consume operation starts
auto startCb = [](const std::string &consumertag) {

    std::cout << "consume operation started" << std::endl;
};

// callback function that is called when the consume operation failed
auto errorCb = [](const char *message) {

    std::cout << "consume operation failed" << std::endl;
}

// callback operation when a message was received
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {

    std::cout << "message received" << std::endl;

    // acknowledge the message
    channel.ack(deliveryTag);
}

// start consuming from the queue, and install the callbacks
channel.consume("my-queue")
    .onReceived(messageCb)
    .onSuccess(startCb)
    .onError(errorCb);

The Message object holds all information of the delivered message: the actual content, all meta information from the envelope (in fact, the Message class is derived from the Envelope class), and even the name of the exchange and the routing key that were used when the message was originally published. For a full list of all information in the Message class, you best have a look at the message.h, envelope.h and metadata.h header files.

Another important parameter to the onReceived() method is the deliveryTag parameter. This is a unique identifier that you need to acknowledge an incoming message. RabbitMQ only removes the message after it has been acknowledged, so that if your application crashes while it was busy processing the message, the message does not get lost but remains in the queue. But this means that after you've processed the message, you must inform RabbitMQ about it by calling the Channel:ack() method. This method is very simple and takes in its simplest form only one parameter: the deliveryTag of the message.

Consuming messages is a continuous process. RabbitMQ keeps sending messages, until you stop the consumer, which can be done by calling the Channel::cancel() method. If you close the channel, or the entire TCP connection, consuming also stops.

RabbitMQ throttles the number of messages that are delivered to you, to prevent that your application is flooded with messages from the queue, and to spread out the messages over multiple consumers. This is done with a setting called quality-of-service (QOS). The QOS setting is a numeric value which holds the number of unacknowledged messages that you are allowed to have. RabbitMQ stops sending additional messages when the number of unacknowledges messages has reached this limit, and only sends additional messages when an earlier message gets acknowledged. To change the QOS, you can simple call Channel::setQos().



Discussions

There are 0 discussions relevant for this page, and 1 discussions in the whole project.




Add Discussion

Log in to comment