Red Hat Shadowman
slanted W3C logo -->
Red Hat Shadowman

Apache Qpid XML Exchange

Jonathan Robie <jonathan.robie@redhat.com>




Hit the space bar for next slide

Overview

Bosak: "Java, XML, and the future of the Web"

Needed: Support for common messaging patterns

Email Analogy

If only messaging were as simple as email ...

Exchanging XML ... a sampler

AMQP Goals

The goal of AMQP: mission critical messaging widely available as a standard service.

AMQP Layers

The AMQP Model

The AMQP Model

  • Message Producers write Messages to Exchanges.

    • A Message Producer can set Message Properties or Delivery Properties in the Message Headers to influence behavior.

  • Bindings express the rules that determine which Messages are sent to each Queue.

    • A Binding is created whenever a Queue is bound to an Exchange.

    • The Message is sent to the Queue if it matches the binding criterion.

    • There are different kinds of Exchanges, and each kind accepts certain kinds of bindings (see the next few slides).

  • Queues store messages until they are read.

  • Message Consumers read Messages from Queues.

  • A Message Consumer can also be a Message Producer

    Example: Read a message, process, send a response.

AMQP Message Headers

Direct Exchange

Fanout Exchange

Topic Exchange

Topic exchanges support wildcard matching

  • words are delimited by periods (".")

  • * matches one word.

    • *.news matches usa.news or europe.news.

    • usa.* matches usa.sports or usa.news.

  • # matches one or more words.

    • #.news matches usa.news or europe.news or usa.questionable.news.

    • usa.# matches usa.sports or usa.news or usa.questionable.news.

AMQP Status

  • Currently deployed in mission-critical financial applications

  • Pre-release - agreement to submit AMQP 1.0 to a recognized standards body

  • Free to use, implement, or read

  • Currently maintained by a consortium of users and implementors

    • JPMorgan

    • Deutsche Boerse

    • Credit Suisse

    • Goldman Sachs

    • Cisco

    • Iona

    • Red Hat

    • Novell

    • CohesiveFT

Apache Qpid

  • Open source implementation of AMQP

  • Languages

    • Java JMS

    • C++

    • Python

  • Server Platforms

    • Platform Independent (Java)

    • Linux (C++ implementation)

The Apache Qpid XML Exchange

  • A Custom Exchange, not a native AMQP Exchange

  • Implemented in Apache Qpid and in Red Hat Messaging

  • Allows routing based on XML content.

    • Message Consumers can create Queues and Bindings to subscribe based on XML content.

    • ... or administrative programs can send content to the appropriate Queues

    • ... or the Message Producer may know something about who needs the content

  • Bindings in XQuery.

The XML Exchange: Implementation

  • Implemented as an optional module in Apache Qpid's C++ server.

    • Shipping as part of Red Hat's MRG Messaging.

    • Will soon be a Qpid server plugin.

    • Not (yet?) available in Qpid Java server

  • XQuery processing uses XQilla.

    • Open-source C++ based implementation of XQuery.

    • Queries can be precompiled.

    • Xerces is used for XML processing.

    • Support for streaming input and output, document projection.

    • Will also support Zorba when common C++ API is available.

XML Exchange: Publish-Subscribe Use Case

  • National Weather Service publishes weather reports using RSS

  • Message Producer monitors National Weather Service feed and publishes new reports to XML Exchange.

  • Message Consumers subscribe to weather events they are interested in.

  • Example report

    <current_observation version="1.0" 
    xsi:noNamespaceSchemaLocation="http://www.weather.gov/data/current_obscurrent_observation.xsd">
      <credit>NOAA's National Weather Service</credit>
      <credit_URL>http://weather.gov/</credit_URL>
      <image>
        <url>http://weather.gov/images/xml_logo.gif</url>
        <title>NOAA's National Weather Service</title>
        <link>http://weather.gov</link>
      </image>
      <suggested_pickup>15 minutes after the hour</suggested_pickup>
      <suggested_pickup_period>60</suggested_pickup_period>
      <location>Raleigh-Durham International Airport, NC</location>
      <station_id>KRDU</station_id>
      <latitude>35.88</latitude>
      <longitude>-78.78</longitude>
      <observation_time>Last Updated on Apr 21, 2:51 pm EDT</observation_time>
      <observation_time_rfc822>Mon, 21 Apr 2008 14:51:00 -0400 EDT</observation_time_rfc822>
      <weather>Mostly Cloudy</weather>
      <temperature_string>67 F (19 C)</temperature_string>
      <temp_f>67</temp_f>
      <temp_c>19</temp_c>
      <relative_humidity>57</relative_humidity>
      <wind_string>Variable at 7 MPH</wind_string>
      <wind_dir>Variable</wind_dir>
      <wind_degrees>999</wind_degrees>
      <wind_mph>6.9</wind_mph>
      <wind_gust_mph>NA</wind_gust_mph>
      <pressure_string>29.95" (1014.0 mb)</pressure_string>
      <pressure_mb>1014.0</pressure_mb>
      <pressure_in>29.95</pressure_in>
      <dewpoint_string>51 F (11 C)</dewpoint_string>
      <dewpoint_f>51</dewpoint_f>
      <dewpoint_c>11</dewpoint_c>
      <heat_index_string>NA</heat_index_string>
      <heat_index_f>NA</heat_index_f>
      <heat_index_c>NA</heat_index_c>
      <windchill_string>NA</windchill_string>
      <windchill_f>NA</windchill_f>
      <windchill_c>NA</windchill_c>
      <visibility_mi>10.00</visibility_mi>
      <icon_url_base>http://weather.gov/weather/images/fcicons/</icon_url_base>
      <icon_url_name>bkn.jpg</icon_url_name>
      <two_day_history_url>http://www.weather.gov/data/obhistory/KRDU.html</two_day_history_url>
      <ob_url>http://www.nws.noaa.gov/data/METAR/KRDU.1.txt</ob_url>
      <disclaimer_url>http://weather.gov/disclaimer.html</disclaimer_url>
      <copyright_url>http://weather.gov/disclaimer.html</copyright_url>
      <privacy_policy_url>http://weather.gov/notice.html</privacy_policy_url>
    </current_observation>
    

XML Exchange: Publish-Subscribe Demo

XML Exchange: Code Template for Examples

  • Establishing a connection

    import qpid
    import sys
    import os
    from qpid.util import connect
    from qpid.connection import Connection
    from qpid.datatypes import Message, RangedSet, uuid4
    from qpid.queue import Empty
    
    #----- Initialization -----------------------------------
    
    #  Set parameters for login
    
    host="127.0.0.1"
    port=5672
    user="guest"
    password="guest"
    
    #  Create a connection.
    socket = connect(host, port)
    connection = Connection (sock=socket)
    connection.start()
    session = connection.session(str(uuid4()))
    
  • Main body of program

    # Program code goes here 
  • Closing the session

    session.close()

XML Exchange: Use Case: Publisher

  • Declaring an XML Exchange

    session.exchange_declare(exchange="xml", type="xml")
  • Writing reports to the XML Exchange

    props = session.delivery_properties(routing_key="weather")
    
    for i in range(10):
      session.message_transfer(destination="xml", message=Message(props, next_report()))

XML Exchange: Use Case: Subscriber

  • Each subscriber declares its own Queue and Bindings - e.g. "Good Sailing Weather in Durham"

    session.queue_declare(queue="message_queue")
    
    binding = {}
    binding["xquery"] = """
       let $w := ./weather
       return $w/station = 'Raleigh-Durham International Airport (KRDU)'
          and $w/temperature_f > 50
          and $w/temperature_f - $w/dewpoint > 5
          and $w/wind_speed_mph > 7
          and $w/wind_speed_mph < 20 """
                          
    session.exchange_bind(exchange="xml", queue="message_queue", binding_key="weather", arguments=binding)
  • Each subscriber creates a local queue, subscribes it to the message queue, and reads messages from the local queue as messages arrive.

    local_queue_name = "local_queue"
    local_queue = session.incoming(local_queue_name)
    
    session.message_subscribe(queue="message_queue", destination=local_queue_name)
    local_queue.start()
    
    message = None
    while True:
       try:
    	message = local_queue.get(timeout=10)
    	session.message_accept(RangedSet(message.id))
    	print message.body
       except Empty:
    	print "No more messages!"
    	break
    

XML Messaging: Common patterns

  • Point-to-point: Direct Exchange

  • Request-response: Direct Exchange, Repy-To header

  • Broadcast: Fanout Exchange

  • Publish-subscribe

    • Topic Exchange (using routing key)

    • XML Exchange (content-based routing, application headers)

  • Transactions (local and distributed): Any Exchange

  • Shared queues, private queues: Any queue

XML Exchange: Status

  • Implemented in Apache Qpid

  • Shipping with Red Hat's MRG Messaging

  • An AMQP custom exchange - not built in, must be declared

  • Limited production experience

Questions and Discussion