WSO2 MB is a standards-compliant message broker that supports the JMS and AMQP standards, which allows interoperability between applications written in many languages. It supports the two main standard patterns of communication:
- Point-to-point messaging through queues, where one application sends messages directly to another application.
- Publish/subscribe messaging through topics, where one application publishes messages to a topic and other applications that are subscribed to this topic receive these messages.
This post explains how to use WSO2 ESB as a publisher and subscriber for WSO2 MB, which acts as the middle hub for message exchange. It also explains the hierarchical topic and durable topic capabilities of WSO2 MB.
As an example, let's take a news publisher service that publishes various types of news, and subscribers who are interested in various types of news.
- Download and unzip WSO2 ESB 4.8.1 and WSO2 MB 2.1.0.
- Go to the wso2mb-2.1.0/bin folder and start WSO2 MB using the ./wso2server.sh (or wso2server.bat) command.
- Configure WSO2 ESB
- Open carbon.xml inside the "wso2esb-4.8.1/repository/conf" folder and update the port Offset (this post uses 1), so that the ESB ports do not clash with those of WSO2 MB running on the same machine.
- Open axis2.xml inside the "wso2esb-4.8.1/repository/conf/axis2" folder and uncomment the JMSListener for WSO2 MB.
- Uncomment the JMS sender in the same "wso2esb-4.8.1/repository/conf/axis2/axis2.xml" file.
- Open the "wso2esb-4.8.1/repository/conf/jndi.properties" file and add the following configuration.
    # register some connection factories
    # connectionfactory.[jndiname] = [ConnectionURL]
    connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'
    connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'

    # register some queues in JNDI using the form
    # queue.[jndiName] = [physicalName]

    # register some topics in JNDI using the form
    # topic.[jndiName] = [physicalName]
- Copy the following two JAR files from "wso2mb-2.1.0/client-lib" to the "wso2esb-4.8.1/repository/components/lib" folder:
- geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
- andes-client-0.13.wso2v8.jar
- Go to the wso2esb-4.8.1/bin folder and start WSO2 ESB using the ./wso2server.sh (or wso2server.bat) command.
- Create publisher and subscribers from ESB
- Sign in to the ESB admin console and create a proxy service called "SportsNewsPublisherProxy" with the following configuration. (This is the publisher, which publishes to a topic called "events.news.sports".)
    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="SportsNewsPublisherProxy"
           transports="https,http"
           statistics="disable"
           trace="disable"
           startOnLoad="true">
       <target>
          <inSequence>
             <property name="OUT_ONLY" value="true"/>
             <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
             <log level="custom">
                <property name="Message"
                          value="************** Start SportsNewsPublisherProxy **************"/>
             </log>
             <log level="full"/>
             <send>
                <endpoint>
                   <address uri="jms:/events.news.sports?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=topic"/>
                </endpoint>
             </send>
          </inSequence>
       </target>
       <description/>
    </proxy>
- Create another proxy service as the first subscriber with the following configuration. (This subscriber subscribes only to sports news, using the topic definition "events.news.sports".)
    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="SportsNewsSubscriberProxy"
           transports="jms"
           statistics="disable"
           trace="disable"
           startOnLoad="true">
       <target>
          <inSequence>
             <property name="OUT_ONLY" value="true"/>
             <log level="custom">
                <property name="SportsNewsSubscriberProxy"
                          value="### I am Sports News subscriber (SportsNewsSubscriberProxy) ###"/>
             </log>
             <log level="full"/>
          </inSequence>
       </target>
       <parameter name="transport.jms.ContentType">
          <rules>
             <jmsProperty>contentType</jmsProperty>
             <default>application/xml</default>
          </rules>
       </parameter>
       <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
       <parameter name="transport.jms.DestinationType">topic</parameter>
       <parameter name="transport.jms.SubscriptionDurable">true</parameter>
       <parameter name="transport.jms.Destination">events.news.sports</parameter>
       <parameter name="transport.jms.DurableSubscriberClientID">SportsNewsSubscriber</parameter>
       <description/>
    </proxy>
- Create the second subscriber proxy with the following configuration. (This proxy subscribes to all types of news through the "events.news.*" topic definition, so it can receive events from any immediate child of "events.news" in the topic hierarchy.)
    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="NewsSubscriberProxy"
           transports="jms"
           statistics="disable"
           trace="disable"
           startOnLoad="true">
       <target>
          <inSequence>
             <property name="OUT_ONLY" value="true"/>
             <log level="custom">
                <property name="NewsSubscriberProxy"
                          value="### I am News subscriber (NewsSubscriberProxy) ###"/>
             </log>
             <log level="full"/>
          </inSequence>
       </target>
       <parameter name="transport.jms.ContentType">
          <rules>
             <jmsProperty>contentType</jmsProperty>
             <default>application/xml</default>
          </rules>
       </parameter>
       <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
       <parameter name="transport.jms.DestinationType">topic</parameter>
       <parameter name="transport.jms.SubscriptionDurable">true</parameter>
       <parameter name="transport.jms.Destination">events.news.*</parameter>
       <parameter name="transport.jms.DurableSubscriberClientID">NewsSubscriber</parameter>
       <description/>
    </proxy>
- Create the third subscriber proxy with the following configuration. (This proxy subscribes to all types of events through the "events.#" topic definition, so it can receive events from the "events" topic and from all child topics under the "events" topic hierarchy.)
    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="EventSubscriberProxy"
           transports="jms"
           statistics="disable"
           trace="disable"
           startOnLoad="true">
       <target>
          <inSequence>
             <property name="OUT_ONLY" value="true"/>
             <log level="custom">
                <property name="EventSubscriberProxy"
                          value="### I am Event subscriber (EventSubscriberProxy) ###"/>
             </log>
             <log level="full"/>
          </inSequence>
       </target>
       <parameter name="transport.jms.ContentType">
          <rules>
             <jmsProperty>contentType</jmsProperty>
             <default>application/xml</default>
          </rules>
       </parameter>
       <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
       <parameter name="transport.jms.DestinationType">topic</parameter>
       <parameter name="transport.jms.SubscriptionDurable">true</parameter>
       <parameter name="transport.jms.Destination">events.#</parameter>
       <parameter name="transport.jms.DurableSubscriberClientID">EventSubscriber</parameter>
       <description/>
    </proxy>
- Try out the pub/sub model
- Sign in to the ESB management console and go to the services list (Home > Manage > Services > List).
- Then select the "Try this service" option on "SportsNewsPublisherProxy".
- Send the following request in the "tryit" window (instead of tryit you can use a tool such as SoapUI to simulate the same behavior).
    <event>
       <news>
          <sports>
             <title>2014 FIFA World Cup</title>
             <description>2014 FIFA World Cup will be held in Brazil from 12th June to 13th July.</description>
          </sports>
       </news>
    </event>
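If you would rather script the request than use tryit or SoapUI, the sketch below shows one way to post the same payload from Python. It rests on assumptions that are not stated in this post: that the proxy is reachable over the ESB HTTP transport at `http://localhost:8281/services/SportsNewsPublisherProxy` (the default port 8280 plus the port offset of 1), and that posting the payload as plain `application/xml` is accepted. The names `ESB_URL`, `build_request` and `publish` are hypothetical helpers introduced only for this example.

```python
import urllib.request

# Assumption: default ESB HTTP port 8280 plus the port offset of 1 used in this post.
ESB_URL = "http://localhost:8281/services/SportsNewsPublisherProxy"

# The same sample payload that is sent through the tryit window.
PAYLOAD = """<event>
   <news>
      <sports>
         <title>2014 FIFA World Cup</title>
         <description>2014 FIFA World Cup will be held in Brazil from 12th June to 13th July.</description>
      </sports>
   </news>
</event>"""


def build_request(url: str = ESB_URL, payload: str = PAYLOAD) -> urllib.request.Request:
    """Build (but do not send) an HTTP POST carrying the XML payload."""
    return urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/xml"},
        method="POST",
    )


def publish(url: str = ESB_URL, payload: str = PAYLOAD) -> int:
    """Send the payload to the publisher proxy and return the HTTP status code."""
    with urllib.request.urlopen(build_request(url, payload), timeout=10) as response:
        return response.status
```

Calling `publish()` with the ESB running should trigger the same log output as the tryit request. Because the publisher proxy sets FORCE_SC_ACCEPTED, the status returned is likely to be 202 rather than 200.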
- Once you send the request to the publisher, you will see the following log messages in the ESB carbon log file.
    [2014-03-16 01:19:49,592] INFO - LogMediator Message = ************** Start SportsNewsPublisherProxy **************
    [2014-03-16 01:19:49,837] INFO - LogMediator SportsNewsSubscriberProxy = ### I am Sports News subscriber (SportsNewsSubscriberProxy) ###
    [2014-03-16 01:19:49,847] INFO - LogMediator NewsSubscriberProxy = ### I am News subscriber (NewsSubscriberProxy) ###
    [2014-03-16 01:19:49,839] INFO - LogMediator EventSubscriberProxy = ### I am Event subscriber (EventSubscriberProxy) ###
- The log messages above show that all three subscribers received the message, because each of their subscriptions matches the topic "events.news.sports":
- First Subscriber subscribed to "events.news.sports" topic
- Second Subscriber subscribed to "events.news.*"
- Third Subscriber subscribed to "events.#" topic
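To make the matching rules concrete, here is a small Python sketch of how the three topic definitions relate to a published topic. It is a simplified model of the behavior described in this post ("*" covers one immediate child level, "#" covers the parent topic and any number of child levels), not the actual WSO2 MB implementation. The function name `topic_matches` is hypothetical.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` is covered by the subscription `pattern`.

    Simplified rules, as described in this post:
      *  matches exactly one level (an immediate child)
      #  matches the parent itself and any number of child levels
    """
    p_parts = pattern.split(".")
    t_parts = topic.split(".")

    def match(pi: int, ti: int) -> bool:
        if pi == len(p_parts):
            # Pattern is used up: it matches only if the topic is used up too.
            return ti == len(t_parts)
        if p_parts[pi] == "#":
            # '#' may absorb zero or more of the remaining topic levels.
            return any(match(pi + 1, k) for k in range(ti, len(t_parts) + 1))
        if ti == len(t_parts):
            # Topic is used up, but the pattern still expects a level.
            return False
        if p_parts[pi] == "*" or p_parts[pi] == t_parts[ti]:
            return match(pi + 1, ti + 1)
        return False

    return match(0, 0)


published = "events.news.sports"
for subscription in ("events.news.sports", "events.news.*", "events.#"):
    print(subscription, topic_matches(subscription, published))
```

Under this model all three subscriptions cover "events.news.sports", which is consistent with the three subscriber log lines above. A message published to a deeper topic such as "events.news.sports.cricket" would be covered only by "events.#".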
- Once you are done with the above steps, you can test the durable topic capabilities of WSO2 MB by deactivating one of the subscribers. (Note: in this post all the subscribers are defined as durable subscribers through the "transport.jms.SubscriptionDurable" and "transport.jms.DurableSubscriberClientID" parameters in the proxy definitions.)
- Sign-in to ESB management console and go to services list (Home > Manage > Services > List)
- Then click on "SportsNewsSubscriberProxy" and click on "Deactivate" option.
- Then send a request to "SportsNewsPublisherProxy" as explained earlier.
- Now you will not see the following log in the carbon console.
    INFO - LogMediator SportsNewsSubscriberProxy = ### I am Sports News subscriber (SportsNewsSubscriberProxy) ###
- Now click on the "Activate" option to activate "SportsNewsSubscriberProxy" again.
- Once it is active again, you will see that it consumes the message that was sent in the earlier step.
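The durable subscription behavior in the steps above can be pictured with a toy in-memory model in Python. This is only an illustration of the idea (messages published while a durable subscriber is inactive are kept and delivered once it is active again). It says nothing about how WSO2 MB stores messages, and the `TinyBroker` class and its methods are hypothetical.

```python
from collections import defaultdict, deque


class TinyBroker:
    """Toy model of durable topic subscriptions (illustration only)."""

    def __init__(self):
        self.pending = defaultdict(deque)  # subscription id -> undelivered messages
        self.active = {}                   # subscription id -> delivery callback

    def activate(self, sub_id, callback):
        """Register or re-activate a durable subscription and flush its backlog."""
        self.active[sub_id] = callback
        backlog = self.pending[sub_id]     # also registers the subscription id
        while backlog:
            callback(backlog.popleft())

    def deactivate(self, sub_id):
        """Stop delivery, but keep the subscription so messages are retained."""
        self.active.pop(sub_id, None)

    def publish(self, message):
        """Deliver to active subscribers, store for inactive durable ones."""
        for sub_id in list(self.pending):
            callback = self.active.get(sub_id)
            if callback is not None:
                callback(message)
            else:
                self.pending[sub_id].append(message)


broker = TinyBroker()
received = []
broker.activate("SportsNewsSubscriber", received.append)
broker.deactivate("SportsNewsSubscriber")   # like clicking "Deactivate"
broker.publish("2014 FIFA World Cup")       # nothing is delivered yet
broker.activate("SportsNewsSubscriber", received.append)  # like clicking "Activate"
print(received)
```

In this model the subscription id plays the role of the "transport.jms.DurableSubscriberClientID" value: it is what lets the broker recognize the returning subscriber and hand over the retained message.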
Port offset set in carbon.xml (for the carbon.xml step above):
    <Offset>1</Offset>
JMSListener configuration uncommented in axis2.xml (for the JMSListener step above):
    <!--Uncomment this and configure as appropriate for JMS transport support with WSO2 MB 2.x.x -->
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
       <parameter name="myTopicConnectionFactory" locked="false">
          <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
          <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
          <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
          <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
       </parameter>

       <parameter name="myQueueConnectionFactory" locked="false">
          <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
          <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
          <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
          <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
       </parameter>

       <parameter name="default" locked="false">
          <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
          <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
          <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
          <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
       </parameter>
    </transportReceiver>
JMS sender uncommented in axis2.xml (for the JMS sender step above):
    <!-- uncomment this and configure to use connection pools for sending messages-->
    <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
Thank you so much for this sample. I used it and made the following modification to get it working with ESB 4.9.0 & MB 3.0.0.
I added endpoint URIs to the XML sources of all the subscribing proxy services. My ESB version 4.9.0 won't let me save without specifying a URI.
Ex: (xml tags removed for posting)
send
endpoint
address uri="http://localhost:9443/services/SportsNewsSubscriberProxy"
endpoint
send