AMQP-CPP

PUBLISHING MESSAGES

Publishing messages is easy, and the Channel class has a number of methods for it. The simplest one takes three arguments: the name of the exchange to publish to, the routing key to use, and the actual message that you're publishing. All three parameters are standard C++ strings.

More extended versions of the publish() method exist that accept additional arguments, and that enable you to publish entire Envelope objects. An envelope is an object that contains the message plus a list of optional meta properties like the content-type, content-encoding, priority, expire time and more. None of these meta fields are interpreted by this library, and RabbitMQ also ignores most of them, but the AMQP protocol defines them, and they are free for you to use. For an extensive list of the supported fields, take a look at the MetaData.h header file (MetaData is the base class for Envelope). You should also check the RabbitMQ documentation to find out whether an envelope header is interpreted by the RabbitMQ server (at the time of this writing, only the expire time is being used).

The following snippet is copied from the Channel.h header file and lists all available publish() methods. As you can see, you can call the publish() method in almost any form:

/**
 *  Publish a message to an exchange
 *
 *  The following flags can be used
 *
 *      -   mandatory   if set, an unroutable message will be sent back to
 *                      the client (currently not supported)
 *
 *      -   immediate   if set, a message that could not immediately be consumed
 *                      is returned to the client (currently not supported)
 *
 *  If either of the two flags is set, and the message could not immediately
 *  be published, the message is returned by the server to the client. However,
 *  at this moment in time, the AMQP-CPP library does not support catching
 *  such returned messages.
 *
 *  @param  exchange    the exchange to publish to
 *  @param  routingkey  the routing key
 *  @param  flags       optional flags (see above)
 *  @param  envelope    the full envelope to send
 *  @param  message     the message to send
 *  @param  size        size of the message
 */
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const AMQP::Envelope &envelope);
bool publish(const std::string &exchange, const std::string &routingKey, const AMQP::Envelope &envelope);
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message);
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message);
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size);
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size);
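The choice between the string overload and the pointer-and-size overload matters for binary payloads. The sketch below illustrates this with a hypothetical RecordingChannel, a stand-in that only mirrors two of the parameter lists above and records what it receives; it is not the library's Channel class, and publishedSizes() is a helper invented for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for AMQP::Channel. It only records the payloads it
// is given, so the example runs without a broker or the real library.
struct RecordingChannel
{
    std::vector<std::string> sent;

    // same parameter list as publish(exchange, routingKey, message)
    bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message)
    {
        (void)exchange; (void)routingKey;   // unused by the stand-in
        sent.push_back(message);
        return true;
    }

    // same parameter list as publish(exchange, routingKey, message, size)
    bool publish(const std::string &exchange, const std::string &routingKey, const char *message, std::size_t size)
    {
        (void)exchange; (void)routingKey;   // unused by the stand-in
        sent.emplace_back(message, size);
        return true;
    }
};

// Publish the same three bytes twice and report how many bytes each call kept.
std::vector<std::size_t> publishedSizes()
{
    // binary payload with a zero byte in the middle
    const char payload[] = { 'a', '\0', 'b' };

    RecordingChannel channel;

    // string overload: the pointer is converted to a std::string,
    // and that conversion stops at the first zero byte
    channel.publish("my-exchange", "my-key", payload);

    // pointer-and-size overload: all three bytes are passed on
    channel.publish("my-exchange", "my-key", payload, sizeof(payload));

    return { channel.sent[0].size(), channel.sent[1].size() };
}
```

The truncation comes from the C++ conversion of a char pointer to std::string, not from anything the stand-in does, so the same care is likely needed when passing raw buffers to the real publish() methods.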

Published messages are normally not confirmed by the server, and RabbitMQ will not send a report back to inform you whether the message was successfully published or not. Therefore the publish method does not return a Deferred object.

As long as no error is reported via the Channel::onError() method, you can safely assume that your messages were delivered.

This can of course be a problem when you are publishing many messages. If you get an error halfway through, there is no way to know for sure how many messages made it to the broker and how many should be republished. If this is important, you can wrap the publish commands inside a transaction. In this case, if an error occurs, the transaction is automatically rolled back by RabbitMQ and none of the messages are actually published.

// start a transaction
channel.startTransaction();

// publish a number of messages
channel.publish("my-exchange", "my-key", "my first message");
channel.publish("my-exchange", "my-key", "another message");

// commit the transaction, and set up callbacks that are called when
// the transaction was successful or not
channel.commitTransaction()
    .onSuccess([]() {
        // all messages were successfully published
    })
    .onError([](const char *message) {
        // none of the messages were published
        // now we have to do it all over again
    });

Note that AMQP transactions are not as powerful as the transactions known from the database world. It is not possible to wrap all sorts of operations in a transaction; they are only meaningful for publishing and consuming.
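The "do it all over again" step in the error callback can be sketched as a bounded retry of the whole batch. Everything below is a hypothetical stand-in written for illustration: FakeDeferred, FakeBroker, FakeChannel and publishBatch() are not part of AMQP-CPP, and the fake callbacks fire immediately, whereas the real library invokes them later from the event loop. The sketch also opens a new channel for each attempt, on the assumption that a channel that has reported an error should not be reused.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for the deferred result of commitTransaction().
// It fires its callbacks immediately instead of from an event loop.
struct FakeDeferred
{
    bool ok;

    FakeDeferred &onSuccess(const std::function<void()> &callback)
    {
        if (ok) callback();
        return *this;
    }

    FakeDeferred &onError(const std::function<void(const char *)> &callback)
    {
        if (!ok) callback("simulated commit failure");
        return *this;
    }
};

// Hypothetical broker state shared by all fake channels.
struct FakeBroker
{
    int failuresLeft = 0;                 // commits that will still fail
    std::vector<std::string> delivered;   // messages from committed transactions
};

// Hypothetical stand-in for a channel, offering the three calls used above.
struct FakeChannel
{
    FakeBroker &broker;
    std::vector<std::string> pending;     // published, not yet committed

    explicit FakeChannel(FakeBroker &b) : broker(b) {}

    bool startTransaction() { pending.clear(); return true; }

    bool publish(const std::string &, const std::string &, const std::string &message)
    {
        pending.push_back(message);
        return true;
    }

    FakeDeferred commitTransaction()
    {
        if (broker.failuresLeft > 0)
        {
            // rollback: nothing from this transaction is delivered
            --broker.failuresLeft;
            pending.clear();
            return FakeDeferred{false};
        }
        broker.delivered.insert(broker.delivered.end(), pending.begin(), pending.end());
        pending.clear();
        return FakeDeferred{true};
    }
};

// Publish the whole batch in one transaction. If the commit fails, retry
// the whole batch on a fresh channel until the attempts run out.
void publishBatch(FakeBroker &broker, const std::vector<std::string> &messages, int attemptsLeft)
{
    if (attemptsLeft <= 0) return;

    FakeChannel channel(broker);
    channel.startTransaction();
    for (const auto &message : messages) channel.publish("my-exchange", "my-key", message);

    channel.commitTransaction()
        .onSuccess([]() {
            // the whole batch was committed, nothing left to do
        })
        .onError([&broker, &messages, attemptsLeft](const char *) {
            // the transaction was rolled back, so the whole batch is resent
            publishBatch(broker, messages, attemptsLeft - 1);
        });
}
```

Because a rolled-back transaction delivers nothing, resending the complete batch does not produce duplicates in this model. With asynchronous callbacks, the messages and the channel would also have to stay alive until the callback runs; capturing them by reference, as done here, is only safe because the stand-in calls back immediately.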


