
Pub/Sub with WSO2 MB and WSO2 ESB using Durable and Hierarchical Topics


WSO2 MB is a standards-compliant message broker that supports the JMS and AMQP standards, which allows interoperability between many languages. It supports the two main standard patterns of communication.

  1. Point-to-Point messaging through queues where one application sends messages directly to another application.
  2. Publish/Subscribe pattern through topics where one application publishes messages to a topic and other applications who are subscribed to this topic will receive these messages.
This post explains how to use WSO2 ESB as both a publisher and a subscriber for WSO2 MB, which acts as the central hub for message exchange. It also covers the hierarchical topic and durable topic capabilities of WSO2 MB.
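To make the difference between the two patterns concrete, here is a minimal in-memory sketch in Python. It does not use WSO2 MB, JMS, or AMQP; the class names `ToyQueue` and `ToyTopic` are hypothetical and exist only for illustration.

```python
from collections import deque


class ToyQueue:
    """Point-to-point: each message is handed to exactly one receiver."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # Removing the message means no other receiver can get it.
        return self._messages.popleft() if self._messages else None


class ToyTopic:
    """Publish/subscribe: every current subscriber gets its own copy."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, message):
        for callback in self._subscribers:
            callback(message)


queue = ToyQueue()
queue.send("order-1")
first = queue.receive()    # the only copy of the message
second = queue.receive()   # nothing left: it was already consumed

topic = ToyTopic()
inbox_a, inbox_b = [], []
topic.subscribe(inbox_a.append)
topic.subscribe(inbox_b.append)
topic.publish("sports headline")  # both inboxes receive the message
```

The rest of this post uses only the second pattern, with WSO2 MB playing the role of the topic.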

As an example, let's take a news publisher service that publishes various types of news, and subscribers who are interested in various types of news.


  1. Download and unzip WSO2 ESB 4.8.1 and WSO2 MB 2.1.0
  2. Locate wso2mb-2.1.0/bin folder and start wso2mb-2.1.0 using ./wso2server.sh (or wso2server.bat) command.
  3. Configure WSO2 ESB
    • Open carbon.xml inside the "wso2esb-4.8.1/repository/conf" folder and update the port offset.

           <Offset>1</Offset>

    • Open axis2.xml inside "wso2esb-4.8.1/repository/conf/axis2" and uncomment the JMSListener for WSO2 MB.

           <!--Uncomment this and configure as appropriate for JMS transport support with WSO2 MB 2.x.x -->
           <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
               <parameter name="myTopicConnectionFactory" locked="false">
                   <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                   <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                   <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
                   <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
               </parameter>

               <parameter name="myQueueConnectionFactory" locked="false">
                   <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                   <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                   <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                   <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
               </parameter>

               <parameter name="default" locked="false">
                   <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                   <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                   <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                   <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
               </parameter>
           </transportReceiver>

    • Uncomment the JMS sender inside "wso2esb-4.8.1/repository/conf/axis2/axis2.xml".

           <!-- uncomment this and configure to use connection pools for sending messages-->
           <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

    • Open the "wso2esb-4.8.1/repository/conf/jndi.properties" file and add the following configuration.

           # register some connection factories
           # connectionfactory.[jndiname] = [ConnectionURL]
           connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'
           connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'

           # register some queues in JNDI using the form
           # queue.[jndiName] = [physicalName]

           # register some topics in JNDI using the form
           # topic.[jndiName] = [physicalName]
    • Copy the following two JAR files from "wso2mb-2.1.0/client-lib" to the "wso2esb-4.8.1/repository/components/lib" folder.

      • geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
      • andes-client-0.13.wso2v8.jar
    • Locate wso2esb-4.8.1/bin folder and start wso2esb-4.8.1 using ./wso2server.sh (or wso2server.bat) command.
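The port offset set in carbon.xml is added to the ports the server opens, which is what lets ESB run next to MB on one machine. The sketch below works out the resulting ESB ports. The default values in it (9443 and 9763 for the management console, 8280 and 8243 for the HTTP and HTTPS transports) are the usual defaults for these product versions, not something stated in this post, so treat them as assumptions and check your own configuration files if they differ.

```python
# Assumed default ports (not taken from this post); adjust to your installation.
ASSUMED_DEFAULT_PORTS = {
    "management_console_https": 9443,
    "management_console_http": 9763,
    "esb_http_transport": 8280,
    "esb_https_transport": 8243,
}


def apply_offset(ports, offset):
    """Return a new mapping with the Carbon port offset added to every port."""
    return {name: port + offset for name, port in ports.items()}


# Offset 1 is the value configured for ESB in carbon.xml above.
esb_ports = apply_offset(ASSUMED_DEFAULT_PORTS, 1)

for name, port in sorted(esb_ports.items()):
    print(f"{name}: {port}")
```

Under these assumptions the ESB management console listens on 9444, while MB, left at offset 0, keeps 9443 and the AMQP port 5672 referenced in jndi.properties.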

  4. Create publisher and subscribers from ESB 

    • Sign in to the ESB admin console and create a proxy service called "SportsNewsPublisherProxy" with the following configuration. (This is the publisher, which publishes to a topic called "events.news.sports".)

           <?xml version="1.0" encoding="UTF-8"?>
           <proxy xmlns="http://ws.apache.org/ns/synapse"
                  name="SportsNewsPublisherProxy"
                  transports="https,http"
                  statistics="disable"
                  trace="disable"
                  startOnLoad="true">
              <target>
                 <inSequence>
                    <property name="OUT_ONLY" value="true"/>
                    <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                    <log level="custom">
                       <property name="Message"
                                 value="************** Start SportsNewsPublisherProxy **************"/>
                    </log>
                    <log level="full"/>
                    <send>
                       <endpoint>
                          <address uri="jms:/events.news.sports?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=topic"/>
                       </endpoint>
                    </send>
                 </inSequence>
              </target>
              <description/>
           </proxy>
    • Create another proxy service as the first subscriber with the following configuration. (This subscriber subscribes only to sports news, using the topic definition "events.news.sports".)

           <?xml version="1.0" encoding="UTF-8"?>
           <proxy xmlns="http://ws.apache.org/ns/synapse"
                  name="SportsNewsSubscriberProxy"
                  transports="jms"
                  statistics="disable"
                  trace="disable"
                  startOnLoad="true">
              <target>
                 <inSequence>
                    <property name="OUT_ONLY" value="true"/>
                    <log level="custom">
                       <property name="SportsNewsSubscriberProxy"
                                 value="### I am Sports News subscriber (SportsNewsSubscriberProxy) ###"/>
                    </log>
                    <log level="full"/>
                 </inSequence>
              </target>
              <parameter name="transport.jms.ContentType">
                 <rules>
                    <jmsProperty>contentType</jmsProperty>
                    <default>application/xml</default>
                 </rules>
              </parameter>
              <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
              <parameter name="transport.jms.DestinationType">topic</parameter>
              <parameter name="transport.jms.SubscriptionDurable">true</parameter>
              <parameter name="transport.jms.Destination">events.news.sports</parameter>
              <parameter name="transport.jms.DurableSubscriberClientID">SportsNewsSubscriber</parameter>
              <description/>
           </proxy>
    • Create the second subscriber proxy with the following configuration. (This proxy subscribes to all types of news with the topic definition "events.news.*", so it can receive events from any immediate child of "events.news" in the topic hierarchy.)

           <?xml version="1.0" encoding="UTF-8"?>
           <proxy xmlns="http://ws.apache.org/ns/synapse"
                  name="NewsSubscriberProxy"
                  transports="jms"
                  statistics="disable"
                  trace="disable"
                  startOnLoad="true">
              <target>
                 <inSequence>
                    <property name="OUT_ONLY" value="true"/>
                    <log level="custom">
                       <property name="NewsSubscriberProxy"
                                 value="### I am News subscriber (NewsSubscriberProxy) ###"/>
                    </log>
                    <log level="full"/>
                 </inSequence>
              </target>
              <parameter name="transport.jms.ContentType">
                 <rules>
                    <jmsProperty>contentType</jmsProperty>
                    <default>application/xml</default>
                 </rules>
              </parameter>
              <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
              <parameter name="transport.jms.DestinationType">topic</parameter>
              <parameter name="transport.jms.SubscriptionDurable">true</parameter>
              <parameter name="transport.jms.Destination">events.news.*</parameter>
              <parameter name="transport.jms.DurableSubscriberClientID">NewsSubscriber</parameter>
              <description/>
           </proxy>
    • Create the third subscriber proxy with the following configuration. (This proxy subscribes to all types of events with the topic definition "events.#", so it can receive events from the "events" topic itself and from all child topics under "events" in the topic hierarchy.)

           <?xml version="1.0" encoding="UTF-8"?>
           <proxy xmlns="http://ws.apache.org/ns/synapse"
                  name="EventSubscriberProxy"
                  transports="jms"
                  statistics="disable"
                  trace="disable"
                  startOnLoad="true">
              <target>
                 <inSequence>
                    <property name="OUT_ONLY" value="true"/>
                    <log level="custom">
                       <property name="EventSubscriberProxy"
                                 value="### I am Event subscriber (EventSubscriberProxy) ###"/>
                    </log>
                    <log level="full"/>
                 </inSequence>
              </target>
              <parameter name="transport.jms.ContentType">
                 <rules>
                    <jmsProperty>contentType</jmsProperty>
                    <default>application/xml</default>
                 </rules>
              </parameter>
              <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
              <parameter name="transport.jms.DestinationType">topic</parameter>
              <parameter name="transport.jms.SubscriptionDurable">true</parameter>
              <parameter name="transport.jms.Destination">events.#</parameter>
              <parameter name="transport.jms.DurableSubscriberClientID">EventSubscriber</parameter>
              <description/>
           </proxy>
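The three topic definitions used by the subscribers can be compared with a small matcher. This is a sketch of the matching rules as described above ("*" stands for exactly one level, "#" for zero or more levels), written in Python for illustration; it is not how WSO2 MB implements matching, and the function name `topic_matches` is hypothetical.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a subscription pattern."""
    pattern_parts = pattern.split(".")
    topic_parts = topic.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(pattern_parts):
            # Pattern exhausted: match only if the topic is exhausted too.
            return j == len(topic_parts)
        if pattern_parts[i] == "#":
            # '#' may absorb zero or more of the remaining topic levels.
            return any(match(i + 1, k) for k in range(j, len(topic_parts) + 1))
        if j == len(topic_parts):
            return False
        if pattern_parts[i] == "*" or pattern_parts[i] == topic_parts[j]:
            # '*' absorbs exactly one level; a literal must be equal.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


subscriptions = ["events.news.sports", "events.news.*", "events.#"]
published_topic = "events.news.sports"

# Which of the three subscriptions would receive a sports news message?
receivers = [s for s in subscriptions if topic_matches(s, published_topic)]
print(receivers)
```

Under these rules a message published to "events.news" would reach only the "events.#" subscription, because "events.news.*" requires one more level.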
  5. Try out the pub/sub model

    • Sign in to the ESB management console and go to the services list (Home > Manage > Services > List).
    • Then select the "Try this service" option on "SportsNewsPublisherProxy".
    • Send the following request in the "tryit" window (instead of tryit, you can use a tool such as SoapUI to simulate the same behavior).

           <event>
              <news>
                 <sports>
                    <title>2014 FIFA World Cup</title>
                    <description>2014 FIFA World Cup will be held in Brazil from 12th June to 13th July.</description>
                 </sports>
              </news>
           </event>
    • Once you send the request to the publisher, you will see the following log messages in the ESB carbon log file.

           [2014-03-16 01:19:49,592] INFO - LogMediator Message = ************** Start SportsNewsPublisherProxy **************
           [2014-03-16 01:19:49,837]  INFO - LogMediator SportsNewsSubscriberProxy = ### I am Sports News subscriber (SportsNewsSubscriberProxy) ###
           [2014-03-16 01:19:49,847]  INFO - LogMediator NewsSubscriberProxy = ### I am News subscriber (NewsSubscriberProxy) ###
           [2014-03-16 01:19:49,839]  INFO - LogMediator EventSubscriberProxy = ### I am Event subscriber (EventSubscriberProxy) ###
    • The log messages above show that all three subscribers receive the message, because each of their subscriptions matches the topic "events.news.sports".
      • The first subscriber subscribed to the "events.news.sports" topic
      • The second subscriber subscribed to "events.news.*"
      • The third subscriber subscribed to "events.#"
    • Once you are done with the steps above, you can test the durable topic capabilities of WSO2 MB by deactivating one of the subscribers. (Note: in this post all the subscribers are marked as durable subscribers by the "transport.jms.SubscriptionDurable" and "transport.jms.DurableSubscriberClientID" parameters in the proxy definitions.)

      • Sign in to the ESB management console and go to the services list (Home > Manage > Services > List).
      • Then click on "SportsNewsSubscriberProxy" and click the "Deactivate" option.
      • Then send a request to "SportsNewsPublisherProxy" as explained earlier.
      • This time you will not see the following log in the carbon console.

             INFO - LogMediator SportsNewsSubscriberProxy = ### I am Sports News subscriber (SportsNewsSubscriberProxy) ###
      • Now click the "Activate" option to activate "SportsNewsSubscriberProxy" again.
      • Once it is active again, you will see that it consumes the message that was sent in the earlier step.
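The behavior observed in this durable subscription test can be modeled with a toy broker that holds messages for a known subscriber while it is inactive. This is only an in-memory Python sketch of the idea, not WSO2 MB's implementation; the class `ToyDurableBroker` and its method names are hypothetical, and topic matching is left out to keep it short.

```python
from collections import defaultdict, deque


class ToyDurableBroker:
    """Keeps messages for durable subscribers that are currently inactive."""

    def __init__(self):
        self._known_ids = set()              # every durable subscriber ID seen
        self._active = {}                    # subscriber ID -> callback
        self._pending = defaultdict(deque)   # subscriber ID -> stored messages

    def activate(self, subscriber_id, callback):
        self._known_ids.add(subscriber_id)
        self._active[subscriber_id] = callback
        # Deliver anything that arrived while this subscriber was inactive.
        while self._pending[subscriber_id]:
            callback(self._pending[subscriber_id].popleft())

    def deactivate(self, subscriber_id):
        # The ID stays known, so later messages are stored rather than dropped.
        self._active.pop(subscriber_id, None)

    def publish(self, message):
        for subscriber_id in self._known_ids:
            callback = self._active.get(subscriber_id)
            if callback is not None:
                callback(message)
            else:
                self._pending[subscriber_id].append(message)


broker = ToyDurableBroker()
sports_inbox = []

broker.activate("SportsNewsSubscriber", sports_inbox.append)
broker.publish("first request")

broker.deactivate("SportsNewsSubscriber")
broker.publish("second request")       # stored, not delivered yet
inbox_while_inactive = list(sports_inbox)

broker.activate("SportsNewsSubscriber", sports_inbox.append)
print(sports_inbox)                     # the stored message arrives on activation
```

The subscriber ID plays the role that "transport.jms.DurableSubscriberClientID" plays in the proxy definitions: it is what lets the broker recognize a returning subscriber.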

Comments

  1. Thank you so much for this sample. I used it and did the following modification to work with ESB 4.9.0 & MB 3.0.0.

    End point URIs to all the Subscribing Proxy Services xml sources. My ESB version 4.9.0 wont let me save without specifying a URI

    Ex: (xml tags removed for posting)
    send
    endpoint
    address uri="http://localhost:9443/services/SportsNewsSubscriberProxy"
    endpoint
    send
