The Apache Qpid XML Exchange

High speed, reliable enterprise messaging using open standards and open source

Jonathan Robie

Emerging Technologies

Red Hat

Copyright © 2008 by the author. Used with permission.

expand Abstract

expand Jonathan Robie

Balisage logo

Proceedings

expand How to cite this paper

The Apache Qpid XML Exchange

High speed, reliable enterprise messaging using open standards and open source

Balisage: The Markup Conference 2008
August 12 - 15, 2008

Modern software systems are often composed of many separate programs working together. In simple cases, there may be a few programs running on one computer; in more ambitious systems, many programs may be running on various hardware and operating systems on machines scattered across the world. These programs may be developed in a variety of languages, including both traditional programming languages and scripting languages, and they may include legacy programs written long before there was a need to communicate with other programs. Programs may be distributed across an organization, or span the boundaries of an organization to include key trading partners, suppliers, and consumers. These programs need to share data, and many of these programs represent that data as XML. From the very beginning, XML was designed to make it easier for programs to exchange data, independent of the languages, platforms, and underlying systems Bosak, and it has been widely used as a way to exchange data with existing proprietary systems. But XML defines only the data format, not the mechanism for exchanging documents among programs.

In many systems, the requirement is to support message transfer, allowing one program to send a message that can be received by one or more other programs. These messages may be simple notifications, or part of a request/response, publish/subscribe, or similar pattern. Message-oriented middleware (MOM) provides direct support for sending and receiving messages, hiding the details of network communication, and providing guaranteed delivery, high performance, transactions, and a variety of other features. Most MOM systems support asynchronous messaging between programs, typically sending messages to message queues that store a message until it is retrieved, even if the receiving program is busy or not connected. These systems are often described using an email analogy - a program can address a message and send it, or receive the messages addressed to it. Other MOM systems use multicast or broadcast messaging instead of or in addition to asynchronous messaging.

Unfortunately, message-oriented Middleware has been plagued by a lack of standards. Each vendor's system uses its own proprietary protocols, so clients from one system generally can not communicate with servers from another system. For Java programs, most systems support the Java JMS API, but this is only an API, and does not define a standard protocol that would allow communication across systems. The Advanced Message Queuing Protocol (AMQP) AMQP was developed by a consortium of companies [1] to be an open standard for enterprise level messaging. It is licensed to allow implementation on a perpetual, worldwide, royalty-free basis, and will be submitted to a recognized standards body.[2] AMQP supports most common messaging idioms, including request/response, point-to-point, publish/subscribe, and broadcast. It has security, reliability, transactions, and other basic enterprise messaging features built in. AMQP defines a network protocol, which specifies what client applications and message servers must send over the wire to interoperate with each other. It also defines a protocol model, which specifies the semantics an AMQP implementation must obey to be interoperable with other implementations. AMQP is already being used in production systems, where it is serving very high message volumes; for example, one bank has a worldwide deployment that delivers over 100 million messages per day in a 7 hour trading window in its Tokyo hub.

Red Hat Enterprise MRG MRG includes a multi-language, multi-platform implementation of AMQP that provides enterprise level messaging with guaranteed delivery, transaction management, queuing, distribution, security, management and heterogeneous multi-platform support. Client APIs include C++, Java, Python and C# for .NET. A JMS API is also provided for JMS environments. We develop the AMQP implementation as part of the upstream Apache Qpid project Qpid.

The Apache Qpid XML Exchange is a messaging exchange specifically designed for reliable, high performance XML messaging. In the XML Exchange, message routing is specified in the broker (not in the application), using XQuery, based on the content of XML messages, message properties, or both. Queries can be as simple or as complex as needed. Clients can determine which documents they are interested in without changing the applications that publish XML documents. The Qpid XML Exchange was developed by Red Hat and contributed to the Apache Qpid project.

This article discusses the AMQP protocol, shows how to write a basic messaging application in Apache Qpid, then shows how to write messaging applications using the Qpid XML Exchange, using XQuery to specify which messages should be routed to a queue. Finally, we make a brief comparison between the technologies presented in this paper and alternative technologies like REST, SOAP and Web Services, Java JMS, ESB, and ebXML.

Advanced Message Queueing Protocol

The AMQP Model

In the AMQP model, message producers write to exchanges, exchanges route messages to queues, and message consumers read from queues. AMQP allows modular architectures that cleanly separate the concerns of each module; for instance, a message producer can write to an exchange without knowing which queues its messages will be routed to, and a message consumer can read from its queues without knowing anything about the message producer or the exchange to which the message was written.

Here are the basic components of the AMQP model:

  • A message producer is a program that writes messages to an exchange. To do this, it creates a message, fills it with content, gives the message a routing key[3], and sends it to an exchange. The routing key is simply a string that the exchange can use to determine to which queues the message should be delivered[4].

  • An exchange accepts messages from message producers and routes them to message queues if the message meets the criteria expressed in a binding.

  • A binding defines the relationship between an exchange and a message queue, specifying which messages should be routed to a given queue. For instance, a binding might state that all messages with a given routing key should be sent to a particular queue. If a queue is not bound to an exchange, it does not receive any messages from that exchange.

  • A message queue holds messages and delivers them to the message consumers that subscribe to the queue. A message queue may be durable, which means that the queue is never lost; even if the messaging broker were to suffer a hardware failure, the queue would be restored when the broker is restarted. A message queue may be exclusive, which means only one client can use it. A message queue may also be auto-delete, which means that the queue will disappear from the server when the last client unsubscribes from the queue.

  • A message consumer is a program that reads messages from a message queue. A message consumer can create, subscribe to, share, use, or destroy message queues and their bindings (as long as it has have permission to do so).

Before delivering a message, the message producer can influence message handling characteristics by setting various message properties in the message; for instance, one property determines whether the message is durable. If a broker supports persistence, it guarantees durable messages are never lost; even if the messaging broker were to suffer a hardware failure, all durable messages would be delivered when the broker is restarted. Another property can be used to specify message priority; the broker gives higher priority messages precedence over lower priority messages. A message producer can use transactions to ensure that a group of messages are all received. In a transaction, messages and acknowledgements acknowledgements are batched together, and all messages in the transaction succeed or fail as a unit.

The exact way that a message is routed depends on the exchange type. Here are the main exchange types in AMQP:

  • A fanout exchange routes messages to every queue bound to the exchange, ignoring the routing key.

  • A direct exchange routes a message only if a queue's binding key is the same as the message's routing key.

  • A topic exchange is similar to a Direct exchange, but it supports multipart keys that contain multiple words separated by the “.” delimiter; for instance, a message producer can create messages with routing keys like usa.news, usa.weather, europe.news, and europe.weather. Binding keys can include wildcard characters: a “#” matches one or more words, a “*” matches a single word. Typical bindings use binding keys like #.news (all news items), usa.# (all items in the USA), or usa.weather (all usa weather items).

Any of the standard exchange types we have discussed in this paragraph can be used for exchanging any kind of data, including XML. Routing decisions are based on the exchange type and the routing key only; none of these exhange types allow routing based on the content of a message, and none has any direct support for XML. The XML Exchange, which is the subject of this paper, is a custom exchange type that lets programs use XQuery to specify routing for XML messages. Before we discuss the XML Exchange, we will look at the code needed for a simple and conventional AMQP program written in Python.

Programming AMQP Applications with Apache Qpid

This section shows how to write a basic AMQP application using Apache Qpid. The examples in this section are written in Python and use the direct exchange. Like the other native AMQP exchanges, the direct exchange works well for sending XML messages as long as routing decisions are not made based on the content of the messages. Applications that need to do routing based on the XML content of a message use the XML exchange, which is presented in the following section.

This application is composed of two programs:

  • A message producer that sends messages to an exchange (which routes messages to the appropriate message queue).

  • A message consumer that creates a message queue, binds the queue to the exchange, then reads messages that are sent to its queue.

In this application, messages are published to the direct exchange using the key "routing_key". The first step is to establish a binding on the server to route these messages to a queue called "message_queue":

session.queue_declare(queue="message_queue")
session.exchange_bind(exchange="amq.direct", queue="message_queue", 
routing_key="routing_key")

Before these lines can be invoked, the client must log in to the server and create a session. Here is a complete program that uses the above two lines to declare a queue:

import qpid
from qpid.client import Client
from qpid.content import Content
from qpid.queue import Empty

#----- Initialization ----------------------------

#  Set parameters for login

host="127.0.0.1"
port=5672
amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
user="guest"
password="guest"

#  Create a client and log in to the server.

client = Client(host, port, qpid.spec.load(amqp_spec))
client.start({"LOGIN": user, "PASSWORD": password})

session = client.session()
session.session_open()

#----- Main Body of Program-----------------------

session.queue_declare(queue="message_queue")
session.exchange_bind(exchange="amq.direct", queue="message_queue", 
routing_key="routing_key")

#----- Cleanup -----------------------------------

# Close the session before exiting so there are no open threads.

session.session_close()

The following code publishes 10 messages to the direct exchange using "routing_key" as the routing key. To create a full program, simply replace the two lines in the main body of the previous example with the following lines:

for i in range(10):
  message = Content("message " + str(i))
  message["routing_key"] = "routing_key"
  session.message_transfer(destination="amq.direct", content=message)

Now let's write a program that reads messages from the message queue. First, the client creates a local queue into which messages will be read from the server, subscribes to the message queue on the server, and activates the local queue to start receiving messages. It then reads messages off the queue and prints them::

local_queue_name = "local_queue"
local_queue = session.incoming(local_queue_name)

session.message_subscribe(queue="message_queue", destination=local_queue_name)
local_queue.start()

final = "That's all, folks!"   # In a message body, signals the last message
content = ""           # Content of the last message read

message = None
while content != final:
  message = local_queue.get(timeout=10)
  content = message.body
        session.message_accept(RangedSet(message.id)) # acknowledge message receipt
  print content

This section introduced basic Apache Qpid messaging using a direct exchange in python. For a more complete tutorial on Apache Qpid, including point-to-point, broadcast, request-response, and publish-subscribe applications, with persistence and transactions, in Java JMS, C++, and Python, see the MRG Messaging Tutorial MRGTutorial.

The Apache Qpid XML Exchange

The Qpid XML Exchange adds two basic capabilities to Apache Qpid: it can route XML messages based on message content, and it can do more sophisticated queries based on message properties. In both cases, queries are expressed in XQuery.

For a Qpid programmer, the main difference between an XML Exchange and any other exchange is the message binding. In the XML Exchange, an XQuery is provided to indicate which documents should be routed for a given binding.

A Simple Example

The following code declares an XML exchange and uses an XQuery to specify that all messages with odd-numbered ids should be routed to a queue named "message_queue":

session.exchange_declare(exchange="xml", type="xml")
session.queue_declare(queue="message_queue")

binding = {}
binding["xquery"] = "./message/id mod 2 = 1"

session.exchange_bind(exchange="xml", queue="message_queue", 
routing_key="routing_key", args=binding)

A publisher can write XML messages to this message queue with no knowledge of the query, it needs only the name of the XML exchange so it can specify the destination:

for i in range(10):
  message = Content("<message><id>" + i + "</id></message>"))
  message["routing_key"] = "routing_key"
  session.message_transfer(destination="xml", content=message)

A consumer needs no knowledge of the publisher or the XML Exchange, it simply reads messages from its queue, as shown in the previous section.

A Weather Alert

Time to go Sailing!

Now let's explore an application that uses the XML Exchange in a publish/subscribe application using weather observations from the National Oceanic and Atmospheric Administration's National Weather Service NWS. In this application, each individual can set up a persistent queue, to which alerts are sent, and identify weather conditions that should trigger an alert. For instance, I like to sail small sailboats, and I live in Durham, NC, so I might ask for a weather alert whenever it is really good weather for sailing.

This example is composed of two programs:

  • A message producer that sends weather notifications in XML to an exchange (which routes messages to the appropriate message queues).

  • A message consumer that creates a message queue, binds the queue to the XML exchange using an XQuery to indicate messages it wants to receive, then reads messages that are sent to its queue.

In this application, a producer posts messages to an XML exchange named "weather" in the National Weather Service's standard XML format. Here is one message in that format:

<current_observation version="1.0"
xsi:noNamespaceSchemaLocation="http://www.weather.gov/data/current_obscurrent_observation.xsd">
<credit>NOAA's National Weather Service</credit>
<credit_URL>http://weather.gov/</credit_URL>
<image>
<url>http://weather.gov/images/xml_logo.gif</url>
<title>NOAA's National Weather Service</title>
<link>http://weather.gov</link>
</image>
<suggested_pickup>15 minutes after the hour</suggested_pickup>
<suggested_pickup_period>60</suggested_pickup_period>
<location>Raleigh-Durham International Airport, NC</location>
<station_id>KRDU</station_id>
<latitude>35.88</latitude>
<longitude>-78.78</longitude>
<observation_time>Last Updated on Apr 21, 2:51 pm EDT</observation_time>
<observation_time_rfc822>Mon, 21 Apr 2008 14:51:00 -0400 EDT</observation_time_rfc822>
<weather>Mostly Cloudy</weather>
<temperature_string>67 F (19 C)</temperature_string>
<temp_f>67</temp_f>
<temp_c>19</temp_c>
<relative_humidity>57</relative_humidity>
<wind_string>Variable at 7 MPH</wind_string>
<wind_dir>Variable</wind_dir>
<wind_degrees>999</wind_degrees>
<wind_mph>6.9</wind_mph>
<wind_gust_mph>NA</wind_gust_mph>
<pressure_string>29.95" (1014.0 mb)</pressure_string>
<pressure_mb>1014.0</pressure_mb>
<pressure_in>29.95</pressure_in>
<dewpoint_string>51 F (11 C)</dewpoint_string>
<dewpoint_f>51</dewpoint_f>
<dewpoint_c>11</dewpoint_c>
<heat_index_string>NA</heat_index_string>
<heat_index_f>NA</heat_index_f>
<heat_index_c>NA</heat_index_c>
<windchill_string>NA</windchill_string>
<windchill_f>NA</windchill_f>
<windchill_c>NA</windchill_c>
<visibility_mi>10.00</visibility_mi>
<icon_url_base>http://weather.gov/weather/images/fcicons/</icon_url_base>
<icon_url_name>bkn.jpg</icon_url_name>
<two_day_history_url>http://www.weather.gov/data/obhistory/KRDU.html</two_day_history_url>
<ob_url>http://www.nws.noaa.gov/data/METAR/KRDU.1.txt</ob_url>
<disclaimer_url>http://weather.gov/disclaimer.html</disclaimer_url>
<copyright_url>http://weather.gov/disclaimer.html</copyright_url>
<privacy_policy_url>http://weather.gov/notice.html</privacy_policy_url>
</current_observation>

The message consumer is an application that allows me to subscribe to this weather feed, specifying conditions for my alerts based on the data available in such a message. On my little sailboats, I especially like to sail when the local weather is at least 60 degrees Fahrenheit, the wind is between 7 and 15 miles per hour, and the temperature is at least 10 degrees above the dewpoint (so it is unlikely to rain). The application would have a GUI to allow me to specify these kinds of conditions, and would generate an XQuery for the XML binding. Here is a query that expresses these conditions:

let $obs := ./current_observation
return $obs/station_id = 'KRDU'
   and $obs/wind_mph >= 7
   and $obs/wind_mph <= 20
   and $obs/temp_f > 60
   and $obs/temp_f - $obs/dewpoint_f >= 10

Of course, this is very individual. My friend Jim has a 40 foot sailboat in Clearwater, and his sailboat just starts to get moving at speeds that would be very challenging in my little sailboat. So he might specify very different values for wind speeds. And I have other friends whose weather-related interests have nothing to do with sailing - perhaps someone would simply like an alert when it looks like it will rain. Each of us can declare an exclusive queue with a binding that expresses which messages we are interested in, as illustrated in the following code:

# Use a UUID as the name of the queue
# to guarantee a unique queue name
uuid = uuid()
session.queue_declare(queue=uuid, exclusive=True)

binding = {}
binding["xquery"] = """
  let $obs := ./current_observation
    return $obs/station_id = 'KRDU'
      and $obs/wind_mph >= 7
      and $obs/wind_mph <= 20
      and $obs/temp_f > 60
      and $obs/temp_f - $obs/dewpoint_f >= 10"""

session.exchange_bind(exchange="xml", queue="message_queue", 
routing_key="routing_key", args=binding)

Querying Message Properties

When the data used by clients is flat, and clients are interested in the same properties, a message producer can bind these properties to a message so they can be accessed without parsing the message content, which may be faster, since the message content need not be parsed. When used in this way, the XML Exchange operates much like Java JMS selectors.

In the XQuery used for binding, message properties are bound as external variables of type string. The query must declare these external variables before using them. The XML Exchange binds the message properties to the corresponding variables before the query is invoked. Here is the query from the previous example expressed using message properties instead of XML message content:

declare variable $station_id as xs:string external;
declare variable $wind_mph as xs:string external;
declare variable $temp_mph as xs:string external;
$station_id = 'KRDU'
and xs:decimal($wind_mph) >= 7
and xs:decimal($wind_mph) <= 20
and xs:decimal($temp_f) > 60
and xs:decimal($temp_f) - xs:decimal($dewpoint_f) >= 10

Properties and XML content can both be used in the same query. In the following query, the station identifier is read from the message content, but the wind speed and temperature are read from message properties:

declare variable $wind_mph as xs:string external;
declare variable $temp_mph as xs:string external;
./current_observation/station_id = 'KRDU'
and xs:decimal($wind_mph) >= 7
and xs:decimal($wind_mph) <= 20
and xs:decimal($temp_f) > 60
and xs:decimal($temp_f) - xs:decimal($dewpoint_f) >= 10

XML Messaging in the REST/SOAP/ESB Landscape

XML applications already have a bewildering number of choices for exchanging data. Many applications already use REST Fielding, SOAP Soap1 Soap2, ebXML ebXML, Enterprise Service Buses ESBWiki, Java JMS Messaging JMS, or any one of the dozens of other available choices now available. Each of these sytems has advantages and disadvantages, and a thorough comparison to these systems is beyond the scope of this paper. This section attempts to point out some of the fundamental design characteristics of AMQP and the XML Exchange to make it easier for readers to make their own comparisons.

Like REST, AMQP is simple[5], open, platform independent, and programming language independent. However, REST is designed only for client/server request/response interaction, and because it relies on HTTP, it is a synchronous protocol. AMQP supports request/response together with other common patterns such as point-to-point, broadcast, and publish-subscribe, which allows it to define communication among messaging clients, not just communication to a server. AMQP is an asynchronous protocol, which is much more suitable for high-performance systems, though synchronous patterns are also supported in the Qpid APis.

Java JMS provides broad support for these kinds of messaging patterns, but it is a Java API, with no support for other languages, and it does not define an interoperable protocol, allowing each implementation choose how to exchange data. As a result, Java JMS does not provide interoperability across languages or across messaging systems.

SOAP and Web Services support many of the same messaging patterns as AMQP, are language and platform independent, and can also be used to achieve interoperable messaging if all parties agree on the communication protocol and on a given WS-I profile. AMQP is smaller in scope and more narrowly focused on messaging per se; it is designed to be as simple as possible while still providing robust and complete support for messaging. An AMQP server would probably be an excellent basis for implementing a SOAP server. Unlike SOAP, AMQP message headers and basic routing information are not contained in XML. This allows more efficient processing, since XML parsing is slow compared to the speed with which routing decisions are made in high-performance messaging systems, and it also cleanly separates message content from envelope information.[6] Another difference is that XML is just one of the formats supported by AMQP, and messages can be in any desired format (but the XML Exchange was designed for XML messages). Another advantage of AMQP is that there is just one specification, and only one way to claim conformance, so interoperability does not require agreeing on specific protocols. This is simpler and more interoperable than the SOAP family of specifications WSActivity Oasis WS-I, which specify basic messaging functionality in a variety of unrelated and overlapping specifications, from different organizations, while other organizations specify profiles that can be used for interoperability.

Much of what was said in the previous paragraph also applies to ebXML. AMQP would be an excellent technology for implementing ebXML messaging, and can be used as a bridging technology between ebXML systems and other systems. But AMQP does not attempt to address anything beyond messaging.

Similarly, an Enterprise Service Bus generally refers to a more complex system build on top of an enterprise messaging system. AMQP is such an enterprise messaging system. But XML-based routing is a prominent feature in some ESB systems, and the XML Exchange provides this feature, inegrated into an AMQP server.

Summary

AMQP provides a coherent, simple architecture for high performance, reliable messaging that is interoperable across implementations and supports most common messaging paradigms. Apache Qpid is a multi-platform, multi-language implementation of AMQP. The Qpid XML Exchange provides XML-based routing in an AMQP server, using XQuery to express routing criteria based on message content and message properties. Together, they provide the simplicity of REST together with the broad support for enterprise messaging that SOAP aspires to achieve. They provide a solid foundation for mission critical XML messaging applications, and vastly simplify the task of writing XML messaging software.

Bibliography

[Bosak] XML, Java, and the future of the Web Jon Bosak, Sun Microsystems. Last revised 1997.03.10. http://www.ibiblio.org/pub/sun-info/standards/xml/why/xmlapps.htm

[Fielding] Architectural Styles and the Design of Network-based Software Architectures. Roy Thomas Fielding, 2000. DISSERTATION submitted in partial satisfaction of the requirements for the degree of DOCTOR OF PHILOSOPHY in Information and Computer Science. University of California, Irvine. http://www.ics.uci.edu/~fielding/pubs/dissertation/top.htm

[Vinoski] "Advanced Message Queuing Protocol", Steve Vinoski. IEEE Internet Computing, vol. 10, no. 6, 2006, pp. 87-89. http://steve.vinoski.net/pdf/IEEE-Advanced_Message_Queuing_Protocol.pdf

[AMQP] AMQP: A General-Purpose Middleware Standard. Copyright Cisco Systems, Credit Suisse, Deutsche Börse Systems, Envoy Technologies, Inc.,Goldman Sachs, IONA Technologies PLC, iMatix Corporation sprl., JPMorgan Chase Bank Inc. N.A, Novell, Rabbit Technologies Ltd., Red Hat, Inc., TWIST Process Innovations ltd, and 29West Inc. 2006. https://jira.amqp.org/confluence/download/attachments/720900/amqp.0-10.pdf?version=1

[Qpid] Apache Qpid Project. http://cwiki.apache.org/qpid/

[MRG] Red Hat Enterprise MRG. http://www.redhat.com/mrg/.

[MRGTutorial] Red Hat Enterprise MRG: MRG Messaging Tutorial. Available from http://www.redhat.com/mrg/resources/.

[XQueryREST] An XQuery Servlet for RESTful Data Services, Jonathan Robie. http://2006.xmlconference.org/proceedings/87/presentation.html

[Soap1] SOAP Version 1.2 Part 1: Messaging Framework (Second Edition). W3C Recommendation 27 April 2007. http://www.w3.org/TR/2007/REC-soap12-part1-20070427/

[Soap2] SOAP Version 1.2 Part 2: Adjuncts (Second Edition). W3C Recommendation 27 April 2007. http://www.w3.org/TR/2007/REC-soap12-part2-20070427/

[WSActivity] W3C Web Services Activity. http://www.w3.org/2002/ws/ Currently lists 15 W3C Recommendations for Web Services.

[Oasis] OASIS. A full list of committees can be found at http://www.oasis-open.org/committees/.

[WS-I] Web Services Interoperability Organization. http://wsi-org. Currently lists the following profiles for interoperability: Basic Profile, Simple Soap Binding Profile, Basic Security Profile, Reliable Secure Profile, Kerberos Token Profile, REL Token Profile 1.0, SAML Token Profile.

[NWS] National Weather Service, current observations in XML. http://www.nws.noaa.gov/data/current_obs/

[JMS] Java JMS. http://java.sun.com/products/jms/

[ebXML] ebXML: Enabling a Global Electronic Market. http://www.ebxml.org/

[ESBWiki] Enterprise service bus. From Wikipedia, the free encyclopedia. http://en.wikipedia.org/wiki/Enterprise_service_bus



[1] Cisco Systems, Credit Suisse, Deutsche Börse Systems, Envoy Technologies, Inc., Goldman Sachs, IONA Technologies PLC, iMatix Corporation sprl., JPMorgan Chase Bank Inc. N.A, Novell, Rabbit Technologies Ltd., Red Hat, Inc., TWIST Process Innovations ltd, and 29West Inc.

[2] At the time of writing, it is not possible to be more specific about the standards submission.

[3] For one kind of exchange, the fanout exchange, a routing key is optional

[4] The way the routing key is used depends on the exchange type, and is discussed later in this chapter

[5] Though not as simple as REST!

[6] Message properties can be read from messages in applications that want this information. An application could easily model them as XML.