Jonathan Robie
<jonathan.robie@redhat.com>
Introduction: Exchanging XML
AMQP (Advanced Message Queuing Protocol)
General purpose, mission-critical messaging for applications
Goal: widely available as a standard service
Apache Qpid
Open source implementation of AMQP
C++, Python, Java JMS ...
Apache Qpid XML Exchange
XML-based content routing
Routing based on application headers
Publish-Subscribe Use Case for XML Exchange
Motivational paper for XML, written in early 1997
Data interchange among programs was one major reason for XML
How to exchange data is not discussed in the paper.
Request-response
Point-to-point
Broadcast (fanout)
Publish-subscribe
Transactions (local and distributed)
Shared queues, private queues
If only messaging were as simple as email ...
Simple primitives
Well defined interactions among parties
Asynchronous
Ubiquitous
Separation of concerns
What we don't like about email ...
Too slow
Spam
Missing emails
Availability
Security
REST
SOAP, ebXML
ESB
Message Queues
Java JMS
Message Queues + Java JMS: widely used for mission-critical messaging applications
The goal of AMQP: mission-critical messaging widely available as a standard service.
Cost-effective common infrastructure
Across platforms
Across languages
Across vendors (any client can talk to any server)
Source-code compatibility with Java JMS applications
Open source implementations (Apache Qpid project)
Fast
Performance adequate for demanding financial applications
Millions of transactions per day
Hundreds of thousands of transactions per second
Caveat: XML Exchange is significantly slower than other exchanges
Trustworthy
Reliable (guaranteed delivery)
Available
Scalable (number of clients, number of messages, number of bytes)
Secure
Long term stability
Support for any payload
XML
Text
Structured binary
Streaming media ...
Support for common messaging patterns
Publish-subscribe
Request-response
Point-to-point
Broadcast (fanout)
Transactions (local and distributed)
Shared queues, private queues
Message Producers write Messages to Exchanges.
A Message Producer can set Message Properties or Delivery Properties in the Message Headers to influence behavior.
Bindings express the rules that determine which Messages are sent to each Queue.
A Binding is created whenever a Queue is bound to an Exchange.
The Message is sent to the Queue if it matches the binding criterion.
There are different kinds of Exchanges, and each kind accepts certain kinds of bindings (see the next few slides).
Queues store messages until they are read.
Message Consumers read Messages from Queues.
A Message Consumer can also be a Message Producer
Example: Read a message, process, send a response.
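The flow described above (a producer writes to an exchange, bindings decide which queues receive the message, consumers read from queues) can be pictured with a small in-memory model. This is only an illustrative sketch, not the Qpid API: the `Exchange` class, its `bind` and `publish` methods, and the predicate-style binding criterion are all assumptions made for this example.

```python
from collections import deque


class Exchange:
    """Toy stand-in for an AMQP exchange (hypothetical, not the Qpid API)."""

    def __init__(self):
        # Each binding pairs a queue with a criterion (a predicate on the message).
        self.bindings = []

    def bind(self, queue, criterion):
        """Bind a queue; the criterion decides which messages it receives."""
        self.bindings.append((queue, criterion))

    def publish(self, message):
        """Copy the message to every queue whose binding criterion matches."""
        for queue, criterion in self.bindings:
            if criterion(message):
                queue.append(message)


# Queues store messages until a consumer reads them.
news_queue = deque()
all_queue = deque()

exchange = Exchange()
# Direct-style binding: match on an exact routing key.
exchange.bind(news_queue, lambda m: m["routing_key"] == "news")
# Fanout-style binding: every message matches.
exchange.bind(all_queue, lambda m: True)

# A producer sets a routing key in the message headers and writes to the exchange.
exchange.publish({"routing_key": "news", "body": "headline"})
exchange.publish({"routing_key": "sports", "body": "score"})

# A consumer reads from its queue.
first = news_queue.popleft()
```

In this model the kind of exchange is nothing more than the kind of criterion a binding may carry, which is the distinction the following slides draw.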
Topic exchanges support wildcard matching
words are delimited by periods (".")
* matches one word.
*.news matches usa.news or europe.news.
usa.* matches usa.sports or usa.news.
# matches zero or more words.
#.news matches usa.news or europe.news or usa.questionable.news.
usa.# matches usa.sports or usa.news or usa.questionable.news.
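The wildcard rules above can be sketched as a small matching function. This is an illustration of the rules, not the code Qpid uses; the name `topic_matches` is hypothetical. As an assumption, `#` is treated as matching zero or more words, which is how the AMQP specification defines it; every example on this slide gives the same result either way.

```python
def topic_matches(pattern, routing_key):
    """Return True if a period-delimited routing key matches a topic pattern.

    '*' matches exactly one word. '#' is treated here as matching zero or
    more words (an assumption; see the text above).
    """
    def match(p, k):
        if not p:
            # Pattern exhausted: succeed only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # Try letting '#' absorb 0, 1, 2, ... words of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```

For example, `topic_matches("*.news", "usa.news")` holds, while `topic_matches("*.news", "usa.questionable.news")` does not, because `*` stands for exactly one word.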
Currently deployed in mission-critical financial applications
Pre-release - agreement to submit AMQP 1.0 to a recognized standards body
Free to use, implement, or read
Currently maintained by a consortium of users and implementors
JPMorgan
Deutsche Boerse
Credit Suisse
Goldman Sachs
Cisco
Iona
Red Hat
Novell
CohesiveFT
Open source implementation of AMQP
Languages
Java JMS
C++
Python
Server Platforms
Platform Independent (Java)
Linux (C++ implementation)
A Custom Exchange, not a native AMQP Exchange
Implemented in Apache Qpid and in Red Hat Messaging
Allows routing based on XML content.
Message Consumers can create Queues and Bindings to subscribe based on XML content.
... or administrative programs can send content to the appropriate Queues
... or the Message Producer may know something about who needs the content
Bindings in XQuery.
Implemented as an optional module in Apache Qpid's C++ server.
Shipping as part of Red Hat's MRG Messaging.
Will soon be a Qpid server plugin.
Not (yet?) available in Qpid Java server
XQuery processing uses XQilla.
Open-source C++ based implementation of XQuery.
Queries can be precompiled.
Xerces is used for XML processing.
Support for streaming input and output, document projection.
Will also support Zorba when common C++ API is available.
National Weather Service publishes weather reports using RSS
Message Producer monitors National Weather Service feed and publishes new reports to XML Exchange.
Message Consumers subscribe to weather events they are interested in.
Example report
<current_observation version="1.0" xsi:noNamespaceSchemaLocation="http://www.weather.gov/data/current_obscurrent_observation.xsd">
  <credit>NOAA's National Weather Service</credit>
  <credit_URL>http://weather.gov/</credit_URL>
  <image>
    <url>http://weather.gov/images/xml_logo.gif</url>
    <title>NOAA's National Weather Service</title>
    <link>http://weather.gov</link>
  </image>
  <suggested_pickup>15 minutes after the hour</suggested_pickup>
  <suggested_pickup_period>60</suggested_pickup_period>
  <location>Raleigh-Durham International Airport, NC</location>
  <station_id>KRDU</station_id>
  <latitude>35.88</latitude>
  <longitude>-78.78</longitude>
  <observation_time>Last Updated on Apr 21, 2:51 pm EDT</observation_time>
  <observation_time_rfc822>Mon, 21 Apr 2008 14:51:00 -0400 EDT</observation_time_rfc822>
  <weather>Mostly Cloudy</weather>
  <temperature_string>67 F (19 C)</temperature_string>
  <temp_f>67</temp_f>
  <temp_c>19</temp_c>
  <relative_humidity>57</relative_humidity>
  <wind_string>Variable at 7 MPH</wind_string>
  <wind_dir>Variable</wind_dir>
  <wind_degrees>999</wind_degrees>
  <wind_mph>6.9</wind_mph>
  <wind_gust_mph>NA</wind_gust_mph>
  <pressure_string>29.95" (1014.0 mb)</pressure_string>
  <pressure_mb>1014.0</pressure_mb>
  <pressure_in>29.95</pressure_in>
  <dewpoint_string>51 F (11 C)</dewpoint_string>
  <dewpoint_f>51</dewpoint_f>
  <dewpoint_c>11</dewpoint_c>
  <heat_index_string>NA</heat_index_string>
  <heat_index_f>NA</heat_index_f>
  <heat_index_c>NA</heat_index_c>
  <windchill_string>NA</windchill_string>
  <windchill_f>NA</windchill_f>
  <windchill_c>NA</windchill_c>
  <visibility_mi>10.00</visibility_mi>
  <icon_url_base>http://weather.gov/weather/images/fcicons/</icon_url_base>
  <icon_url_name>bkn.jpg</icon_url_name>
  <two_day_history_url>http://www.weather.gov/data/obhistory/KRDU.html</two_day_history_url>
  <ob_url>http://www.nws.noaa.gov/data/METAR/KRDU.1.txt</ob_url>
  <disclaimer_url>http://weather.gov/disclaimer.html</disclaimer_url>
  <copyright_url>http://weather.gov/disclaimer.html</copyright_url>
  <privacy_policy_url>http://weather.gov/notice.html</privacy_policy_url>
</current_observation>
Establishing a connection
import qpid
import sys
import os
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty

#----- Initialization -----------------------------------

# Set parameters for login
host="127.0.0.1"
port=5672
user="guest"
password="guest"

# Create a connection.
socket = connect(host, port)
connection = Connection(sock=socket)
connection.start()
session = connection.session(str(uuid4()))
Main body of program
# Program code goes here
Closing the session
session.close()
Declaring an XML Exchange
session.exchange_declare(exchange="xml", type="xml")
Writing reports to the XML Exchange
props = session.delivery_properties(routing_key="weather")
for i in range(10):
    session.message_transfer(destination="xml", message=Message(props, next_report()))
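The loop above calls `next_report()`, which the slides do not define. A minimal stand-in might look like the following. The element names follow the XQuery binding used in these slides (`weather`, `station`, `temperature_f`, `dewpoint`, `wind_speed_mph`); the function body, the second station name, and the value ranges are assumptions made purely for illustration.

```python
import random

# Hypothetical station list; only the first name appears in the slides.
STATIONS = [
    "Raleigh-Durham International Airport (KRDU)",
    "Example Station (XXXX)",
]


def next_report():
    """Build one randomly generated weather report as an XML string."""
    station = random.choice(STATIONS)
    temperature = random.randint(30, 100)
    # Keep the dewpoint at or below the temperature.
    dewpoint = temperature - random.randint(0, 30)
    wind = random.randint(0, 40)
    return (
        "<weather>"
        "<station>%s</station>"
        "<temperature_f>%d</temperature_f>"
        "<dewpoint>%d</dewpoint>"
        "<wind_speed_mph>%d</wind_speed_mph>"
        "</weather>"
    ) % (station, temperature, dewpoint, wind)


report = next_report()
```

A real producer would build the report from the National Weather Service feed instead of random numbers.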
Each subscriber declares its own Queue and Bindings - e.g. "Good Sailing Weather in Durham"
session.queue_declare(queue="message_queue")

binding = {}
binding["xquery"] = """
   let $w := ./weather
   return $w/station = 'Raleigh-Durham International Airport (KRDU)'
      and $w/temperature_f > 50
      and $w/temperature_f - $w/dewpoint > 5
      and $w/wind_speed_mph > 7
      and $w/wind_speed_mph < 20 """

session.exchange_bind(exchange="xml", queue="message_queue", binding_key="weather", arguments=binding)
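To make the binding's criterion concrete, the same test can be written in plain Python and run against a report without a broker. This is only an approximation for illustration: the XML Exchange evaluates the XQuery itself, and the helper name `good_sailing_weather` and the sample report are assumptions.

```python
import xml.etree.ElementTree as ET


def good_sailing_weather(report_xml):
    """Approximate the XQuery binding above for a single <weather> report."""
    w = ET.fromstring(report_xml)
    if w.tag != "weather":
        return False
    try:
        temperature = float(w.findtext("temperature_f"))
        dewpoint = float(w.findtext("dewpoint"))
        wind = float(w.findtext("wind_speed_mph"))
    except (TypeError, ValueError):
        # A missing or non-numeric field means the report cannot match.
        return False
    return (
        w.findtext("station") == "Raleigh-Durham International Airport (KRDU)"
        and temperature > 50
        and temperature - dewpoint > 5
        and 7 < wind < 20
    )


# Hypothetical report in the shape the binding expects.
sample = (
    "<weather>"
    "<station>Raleigh-Durham International Airport (KRDU)</station>"
    "<temperature_f>67</temperature_f>"
    "<dewpoint>51</dewpoint>"
    "<wind_speed_mph>12</wind_speed_mph>"
    "</weather>"
)
matches = good_sailing_weather(sample)
```

Only reports for which the criterion holds are routed to `message_queue`; everything else published to the exchange is never delivered to this subscriber.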
Each subscriber creates a local queue, subscribes it to the message queue, and reads messages from the local queue as messages arrive.
local_queue_name = "local_queue"
local_queue = session.incoming(local_queue_name)

session.message_subscribe(queue="message_queue", destination=local_queue_name)
local_queue.start()

message = None
while True:
    try:
        message = local_queue.get(timeout=10)
        session.message_accept(RangedSet(message.id))
        print message.body
    except Empty:
        print "No more messages!"
        break
Point-to-point: Direct Exchange
Request-response: Direct Exchange, Reply-To header
Broadcast: Fanout Exchange
Publish-subscribe
Topic Exchange (using routing key)
XML Exchange (content-based routing, application headers)
Transactions (local and distributed): Any Exchange
Shared queues, private queues: Any queue
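As one illustration of the mapping above, the request-response pattern over a direct exchange can be simulated in memory. This is a sketch of the message flow only, not Qpid code: the `queues` dictionary, the `publish` helper, and the queue names are assumptions.

```python
from collections import deque

# Queues keyed by name. A direct exchange delivers to the queue whose
# binding key equals the routing key (modeled here as a dict lookup).
queues = {"requests": deque(), "replies.client1": deque()}


def publish(routing_key, message):
    """Direct-exchange style delivery: exact match on the routing key."""
    queues[routing_key].append(message)


# Client: send a request and name a private queue in the Reply-To header.
publish("requests", {"reply_to": "replies.client1", "body": "ping"})

# Server: read the request, process it, send the response to Reply-To.
request = queues["requests"].popleft()
publish(request["reply_to"], {"body": request["body"].upper()})

# Client: read the response from its private queue.
response = queues["replies.client1"].popleft()
```

The server never needs to know its clients in advance; each request carries the address for its own reply.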
Implemented in Apache Qpid
Shipping with Red Hat's MRG Messaging
An AMQP custom exchange - not built in, must be declared
Limited production experience