Přeskočit na hlavní obsah
  1. Tags/

Jms

2013


Vytvoření JMS Bridge na WebLogicu

Messaging bridge je šikovné řešení, pokud potřebujeme distribuovat zprávy mezi několika messaging systémy. Potřeboval jsem vytvořit JMS bridge na WebLogicu a protože jsem si oblíbil WebLogic Scripting Tool (WLST), tak jsem si napsal skript.

Nicméně dnes, vás milí čtenáři, nechci odfláknout pouhým WLST skriptem, ale podíváme se na téma trochu zeširoka. Prvně si zasadíme JMS bridge do kontextu Enterprise Integration Patterns (EIP), pak se podíváme, jak jde bridge naklikat ve WebLogic konzoli. A samozřejmě vás neochudím o to WLST :-)

Messaging Bridge

Nebyl bych to já, kdybych se nevytasil s nějakým patternem. Tak tady je - Messaging Bridge, jak je definován v EIP. (Věty kurzivou jsou citacemi z knihy Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions.)

Messaging Bridge řeší následující problém: "How can multiple messaging systems be connected so that messages available on one are also available on the others?" Integrační vzory řeší problémy obecně. V tomto případě jde o to, jak dostat zprávy z jednoho systému na jiný. Např. jak dostat zprávy z WebSphere MQ na MSMQ. Nebo JMS. Nebo TIBCO Randevouz. Nebo... Jasný, ne? Odkudkoliv kamkoliv.

Messaging Bridge tedy je "a way for messages on one messaging system that are of interest to applications on another messaging system to be made available on the second messaging system as well."

Protože většinou neexistuje způsob, jak propojit dva různé messaging systémy, propojují se jednotlivé, odpovídající kanály na daných systémech. "The Messaging Bridge is a set of Channel Adapters ... where each pair of adapters connects a pair of corresponding channels. The bridge acts as a map from one set of channels to the other and transforms the message format of one system to the other."

Vzor Messaging Bridge (zdroj EIP)

JMS Bridge

JMS bridge je speciální variantou Messaging bridge, který má několik omezení, nebo spíše možná zjednodušení. Tři hlavní jsou:
  • Propojuje pouze JMS systémy (různých dodavatelů).
  • Zprávy netransformuje (protože pracuje pouze s JMS zprávami).
  • Zprávy z jednoho (zdrojového) systému přeposílá na jiný (cílový). Čili nečiní je pouze dostupnými, ale provádí routing/forwarding.

JMS bridge běžně poskytují všichni dodavatelé JMS systémů, např.:

Zajímavým atributem u JMS bridge je Quality of Service (QoS), který říká, v jakém modu budou zprávy přeposílány:
  • At most once - nanejvýš jednou. Toto je nejbenevolentnější mod. Zpráva buď dorazí (maximálně jednou), anebo taky ne. A nám to tak vyhovuje.
  • Duplicates OK - duplicity jsou v pořádku. V tomto modu jsou zprávy potvrzeny, že dorazily na cílový systém. Může se stát, že stejná zpráva nám dorazí ve více instancích, ale to je v pořádku - řekli jsme přece, duplicity jsou OK. Výhodou je, že se žádná zpráva neztratí.
  • Exactly once - přesně jednou. Toto je nejvíc striktní mod, který nám nejen zaručuje, že každá zpráva dorazí na cílový systém, ale taky že tam dorazí právě jednou. Takový závazek není jen tak a zajistí nám ho JTA, čili distribuovaná transakce.

WebLogic JMS Bridge

Vytvoření JMS bridge na WebLogicu je otázkou chvilky, stačí mít připraveny správné ingredience. Co budeme potřebovat?
  • URL (a credentials) zdrojového serveru,
  • URL (a credentials) cílového serveru,
  • JNDI JMS adapteru (transakční, nebo netransakční)
    • eis.jms.WLSConnectionFactoryJNDIXA
    • eis.jms.WSLConnectionFactoryJNDINoTX
  • JNDI Connection Factory,
  • JNDI JMS destinace (fronty),
  • Quality of Service
    • Exactly-once
    • Duplicate-okay
    • Atmost-once

Pak už stačí jen naházet to do hrnce, zamíchat, podusit... a proklikat se průvodcem. Protože těch obrazovek ve wizardu je zbytečně moc, ukážu jenom screenshoty konečného stavu.

Pokud chceme použít transakční mod Exactly once, je dobré ještě před vytvářením bridge deployovat transakční adaptér jms-xa-adp.rar - vyhneme se tak zbytečným restartům (adapteru či WebLogicu). Adapter najdeme mezi knihovnami WebLogicu a nasazujeme ho jako aplikaci.

Nasazený resource adapter jms-xa-adp.rar (ve struktuře Deployments)

Bridge se sestává ze dvou destinací (zdrojové a cílové):

Definice JMS (source) destination

a samotného bridge:

Definice JMS bridge

Že nám bridge funguje, poznáme podle jeho stavu (záložka Monitoring) a taky doporučuju si nějakou zprávu cvičně přeposlat. Světe div se, ale transakce můžou zlobit.

Stavy JMS bridgů (záložka Monitoring)

WLST

No a máme tady velké finále, ke kterému jsem nenápadně, ale cílevědomě směřoval. Takže tady je: WLST v celé své pythoní kráse :-)
#
# properties
#
user = 'weblogic'
password = 'welcome1'
server = 't3://sandman:7001'
# source credentials
sourceJmsUser = 'weblogic'
sourceJmsPassword = 'welcome1'
sourceURL = 't3://sandman:7003'
# target credentials
targetJmsUser = 'weblogic'
targetJmsPassword = 'welcome1'
targetURL = 't3://sandman:7004'
# JMS servers
servers = ['SourceServer']
targets = []
# queues
queues = [
'EVM_EVE_CMS',
'EVM_EVE_FC',
'EVM_EVE_GEN',
'EVM_EVE_PTS',
'NTF_PARTY',
'NTF_PRODUCT']
adapterJNDI = 'eis.jms.WLSConnectionFactoryJNDIXA'
connectionFactory = 'jms/sws/SWSConnectionFactory'
qualityOfService = 'Exactly-once'
jndiPrefix = 'jms/sws/'

# connection
connect(user, password, server)

# edit mode
edit()
startEdit()

# get targets
for server in servers:
targets.append(getMBean('/Servers/' + server))
print '\nTargets:', targets, '\n'

#
# create bridges
#
print '\n### Create bridges:\n'
for queue in queues:
bridgeName = queue + '_bridge'
sourceName = queue + '_source'
targetName = queue + '_target'
# delete an old bridge
ref = getMBean('/MessagingBridges/' + bridgeName)
if ref != None:
cd('/MessagingBridges')
delete(bridgeName, 'MessagingBridge')
# delete an old source destination
ref = getMBean('/JMSBridgeDestinations/' + sourceName)
if ref != None:
cd('/JMSBridgeDestinations')
delete(sourceName, 'JMSBridgeDestination')
# delete an old target destination
ref = getMBean('/JMSBridgeDestinations/' + targetName)
if ref != None:
cd('/JMSBridgeDestinations')
delete(targetName, 'JMSBridgeDestination')
# create a new source destination
cd('/')
cmo.createJMSBridgeDestination(sourceName)
cd('/JMSBridgeDestinations/' + sourceName)
cmo.setAdapterJNDIName(adapterJNDI)
cmo.setConnectionFactoryJNDIName(connectionFactory)
cmo.setConnectionURL(sourceURL)
cmo.setDestinationJNDIName(jndiPrefix + queue)
cmo.setUserName(sourceJmsUser)
cmo.setUserPassword(sourceJmsPassword)
print 'Source:', cmo
# create a new target destination
cd('/')
cmo.createJMSBridgeDestination(targetName)
cd('/JMSBridgeDestinations/' + targetName)
cmo.setAdapterJNDIName(adapterJNDI)
cmo.setConnectionFactoryJNDIName(connectionFactory)
cmo.setConnectionURL(targetURL)
cmo.setDestinationJNDIName(jndiPrefix + queue)
cmo.setUserName(targetJmsUser)
cmo.setUserPassword(targetJmsPassword)
print 'Target:', cmo
# create a new bridge
cd('/')
cmo.createMessagingBridge(bridgeName)
cd('/MessagingBridges/' + bridgeName)
cmo.setSourceDestination(getMBean('/JMSBridgeDestinations/' + sourceName))
cmo.setTargetDestination(getMBean('/JMSBridgeDestinations/' + targetName))
cmo.setStarted(true)
cmo.setQualityOfService(qualityOfService)
cmo.setTargets(targets)
print 'Bridge:', cmo, '\n'

# save and finish
save()
activate()
disconnect()
exit()

Související články

Vytvoření WebLogic Distributed Queue pomocí WLST

·3 min

Pokud nějaká aplikace používá JMS zdroje, bývají  tyto zpravidla externí. (Výjimkou je JMS broker embeddovaný uvnitř aplikace.) Tyto externí zdroje bývají často vytvořeny na aplikačním serveru, ve kterém je většinou JMS server už obsažen. Pokud naše aplikace používá např. JMS fronty, musí je “někdo” na daném JMS serveru vytvořit. Ten někdo je na vývojovém a někdy i testovacím prostředí vývojář, na dalších prostředích už to bývá administrátor.

Podle daného aplikačního serveru se JMS zdroje dají buď naklikat v nějaké administrátorské konzoli, nebo je potřeba poeditovat/vytvořit nějaké konfigurační soubory. Třetí možností je tyto  zdroje nějakým nástrojem naskriptovat. V případě aplikačního serveru WebLogic je takovým nástrojem WebLogic Scripting Tool (WLST).

Jak už jsem psal v minlém zápisku o mazání dat z MDS, WLST je v Jythonu napsaný nástroj pro správu WebLogic serveru, který funguje ve dvou režimech - offline a online. V online režimu se WLST připojuje k běžícímu WebLogicu a operuje nad stromem jeho MBeans. Pro správu JMS zdrojů je potřeba používat WLST online.

Distributed Queue

Věc, kterou jsem potřeboval vyřešit, bylo vytvoření distribuované fronty ve WebLogicovém clusteru. Cluster byl velmi jednoduchý - admin server a dva nody (managed servery). Vzhledem ke clusteru bylo potřeba vytvořit logickou frontu, která by měla stejný JNDI (jako na vývojovém prostředí s jedním nodem) a zastřešovala by fronty na jednotlivých nodech. Na WebLogicu je toto řešeno distribuovanými destinacemi (queue/topic). Pokud si to neumíte představit, distribuovaná fronta funguje v podstatě jako klasický load balancer.

Distribuovaná fronta (WebLogic Administration Console)

Členové distribuované destinace (WebLogic Administration Console)

WLST a JMS

Použití WLST je po krátké praxi poměrně intuitivní a jednoduché. Zpočátku může dělat problém se orientovat ve struktuře (stromech) MBean. V tomto může napomoci celkem slušná dokumentace: navigace MBeans, Javadoc a MBean reference. Pak už stačí jenom lehké základy Pythonu.

V následujícím skriptu je několik věcí, které můžou trochu ztížit čtení/pochopení skriptu, takže ještě kratičká legenda:
  • Target. Cílové umístění zdroje, nebo aplikace, např. server, cluster, JMS server ad. V případě deploymentu (aplikace) tím říkám, na které nody/clustery chci aplikaci nasadit.
  • SubDeployment. Mechanismus pro seskupení a umístění JMS zdrojů. V rámci subdeploymentu říkám, na které cíle (targets) chci dané zdroje nasadit. Může to být libovolná kombinace, nebo podmnožina.
  • cmo. Proměnná, která reprezentuje “aktuálně spravovaný objekt” (current management object). Je to vlastně MBeana, která má momentálně “focus”. Tak, jak se prochází stromem MBean, tak se tato proměnná automaticky mění.

# properties
user = ‘weblogic’
password = ’<password>’
server = ‘t3://<host>:7001’
subDeploymentName = ‘EVMJMSServers’
queuePath = ‘JMSResource/SOAJMSModule/UniformDistributedQueues/’
queueName = ‘EVM_DLQ’
jndiPrefix = ‘jms/b2b/’
loadBalancing = ‘Round-Robin’

# connection
connect(user, password, server)

# edit mode
edit()
startEdit()

# get targets
s1 = getMBean(’/JMSServers/SOAJMSServer_auto_1’)
s2 = getMBean(’/JMSServers/SOAJMSServer_auto_2’)

#
# create a SubDeployment
#
cd(’/JMSSystemResources/SOAJMSModule’)
# delete an old SubDeployment
if cmo.lookupSubDeployment(subDeploymentName):
delete(subDeploymentName, ‘SubDeployment’)
# create a new SubDeployment
cmo.createSubDeployment(subDeploymentName)
subDeployment = cmo.lookupSubDeployment(subDeploymentName)
subDeployment.setTargets([s1, s2])

#
# create a ldistributed queue
#
resource = cmo.getJMSResource()
# delete an old queue
ref = getMBean(queuePath + queueName)
if ref != None:
cd(‘JMSResource/SOAJMSModule’)
delete(queueName, ‘UniformDistributedQueue’)
# create a new queue
resource.createUniformDistributedQueue(queueName)
distributedQueue = resource.lookupUniformDistributedQueue(queueName)
distributedQueue.setJNDIName(jndiPrefix + queueName)
distributedQueue.setLoadBalancingPolicy(loadBalancing)
distributedQueue.setSubDeploymentName(subDeploymentName)

# save and finish
save()
activate()
disconnect()
exit()

Související články

2012


ActiveMQ, messaging podle Apache

·3 min
Poslední dobou jsem se zabýval integračním projektem, který používal komerční technologii od IBM - WebSphere Messaage Broker, který jako infrastrukturu využívá WebSphere MQ. Abych měl, jako solution architekt, k tomuto řešení nějakou alternativu, rozhodl jsem se nastudovat open source messagingovou platformu ActiveMQ od Apache Software Foundation (ASF).

ActiveMQ je messagingový server postavený nad specifikací Java Message Service (JMS), a který ve spojení i frameworkem Apache Camel implementuje Enterprise Integration Patterns (EIP). Obecné vlastnosti ActiveMQ se dají shrnout do následujících bodů:
  • implementace JMS 1.1,
  • integrace se Springem,
  • integrace s (Java) aplikačními servery,
  • vlastní protokol OpenWire pro high perfomance klienty v Javě, C, C++ a C#,
  • embedded broker,
  • broker clustering,
  • REST API,
  • AJAX API,
  • ad.
Výborným zdrojem informací je kniha AcitveMQ in Action přímo od autorů a přispěvatelů ActiveMQ. Vypíchnul bych z ní pár momentů, které mne zaujaly.

Komunikační protokoly
Klienti se můžou k brokeru připojit pomocí různých protokolů, které jsou vystaveny pomocí tzv. transportních konektorů. Mmj. jsou k dispozici obligátní:
  • TCP
  • SSL
  • HTTP(S)
  • UDP
Pokud klient i broker běží v jednom JVM, může klient použít VM protokol, který spustí embedded broker. Komunikace pak neprobíhá po síti, jako u ostatních typů konektorů, ale klient volá přímo metody na objektu (instanci) brokeru.

Pro konfiguraci konektoru se používá následující URI:
  • scheme://host:port?queryKey=queryValue
Konfigurace několika konektorů pak může vypadat třeba takto:
<transportConnectors>
<transportConnector
name="openwire"
uri="tcp://localhost:61616?trace=true"/>
<transportConnector
name="ssl"
uri="ssl://localhost:61617"/>
<transportConnector
name="vm"
uri="vm://localhost"/>
</transportConnectors>
Message Storage
JMS specifikace definuje dva způsoby doručení zpráv (DeliveryMode) - perzistentní a neperzistentní. Pokud jsou zpráva, nebo producent nastavený jako perzistentní, musí JMS provider zajistit bezpečné uložení zpráv (aby např. přežily výpadek serveru). ActiveMQ nabízí pro uskladnění zpráv čtyři strategie:

  • KahaDB message store. Rychlá a škálovatelná file-based perzistence s transakčním žurnálem a rychlým zotavením.
  • AMQ message store. Předchůdce KahaDB, obdobné vlastnosti.
  • JDBC message store. Perzistence zpráv do relační databáze.
  • Memory message store. Všechny pezistentní zprávy jsou drženy v paměti, tj. nejsou perzistovány ve smyslu JMS specifikace.
KahaDB se skládá ze tří komponent:
  • Cache drží zprávy pro aktivní konzumenty.
  • Data logy slouží jako transakční žurnál. Obsahují uložené zprávy a transakční příkazy.
  • BTree indexy referencují zprávy v data logu.

Schéma KahaDB  (zdroj ActiveMQ in Action)

REST API
ActiveMQ poskytuje jednoduché API pro publikaci a konzumaci zpráv RESTovým způsobem. Vzhledem k tomu, že JMS poskytuje pouze dvě operace - send a receive - je mapování na HTTP velmi přímočaré. Pro publikování zpráv je to (HTTP) POST, pro jejich konzumaci (HTTP) GET nebo DELETE.

Mapování na URI pak vypadá takto:
  • http://host:port/queue
  • http://host:port/topic/subtopic

High Availability
Pokud budeme chtít zajistit vysokou dostupnost našeho messagingového řešení, budeme potřebovat několik brokerů běžících na různých strojích. Něco jako:


High Availability v režii ActiveMQ je zajištěna dvěma typy Master/Slave konfigurací:
  • Shared nothing
  • Shared storage
U shared nothing konfigurece má master i slave vlastní message store. Slave se po startu připojí k masteru a veškeré stavy masteru (zprávy, akcknowledgements, transakce atd.) jsou replikovány na slave. Když master spadne, jsou všichni klienti přesměrováni (pomocí fail over protokolu) na slave, který se stává novým masterem. Konfigurace fail over protokolu vypadá takto:
  • failover://(tcp://master:61616,tcp://slave:61616)
Omezením tohoto řešení je, že master může mít definovaný pouze jeden slave (a slave nemůže mít definovaný další vlastní slave).

Shared nothing master/slave (zdroj ActiveMQ in Action)
Shared storage konfigurace umožňuje, aby bylo definováno více slave brokerů, které spolu s masterem, sdílejí společný storage mechanismus - tím může být sdílený filesystem, nebo relační databáze. Master má pak storage zamknutý. Když master spadne, jeden ze slave brokerů se stane novým masterem a zamkne si storage pro sebe.

Shared storage master/slave (zdroj ActiveMQ in Action)
Závěrem
Pokud to mám říct jednou větou - ActiveMQ se mi architektonicky/technologicky líbí a pokud bych na nějakém projektu potřeboval řešit messaging v Javě, určitě by to byl žhavý kandidát. Taky mi to nedá, abych nesrovnal ActiveMQWebSphere MQ. Co se týče funkčností, jsou obě řešení srovnatelná. Veliký rozdíl ovšem bude v pracnosti a nákladech - ActiveMQ půjde implementovat nesrovnatelně levněji a rychleji. No a samozřejmě cena, zde je rozdíl astronomický - ActiveMQ je zdarma, WebSphere MQ bude stát řádově miliony korun jenom na licencích.

Další cesta, jak rozvíjet znalosti o ActiveMQ je celkem jednoznačná - Apache Camel, což je implementace EIP od ASF, která řeší věci jako vytváření zpráv, jejich směrování, transformaci, orchestraci ad.

WebSphere MQ, interakce s Javou

·3 min

Pokud se chceme k WebShere MQ (WMQ) připojit z Javy, máme k dispozici dvě možnosti - buď použít WMQ třídy pro Javu, nebo je možné komunikovat pomocí Java Message Service (JMS). Obě možnosti mají svoje pro a proti, takže krátké shrnutí v pár bodech:

  • WMQ třídy pro Javu:
    • zapouzdřují Message Queue Interface (MQI),
    • poskytují plnou sadu funkčností WMQ,
    • je to proprietární řešení, ale jednodušší k používání, než JMS.
  • (WMQ třídy pro) JMS:
    • Java “industry standard” pro messaging,
    • je součástí Java EE specifikace (a tedy součástí většiny (Java) aplikačních serverů),
    • umožňuje spravovat administrované objekty (connection factory, fronty ad.) v centrální repository.
Společná konfigurace
Ať už použijeme jeden, nebo druhý způsob, v obou případech je potřeba mít vytvořené některé objekty. Všechny použité typy objektů jsem popisoval již v minulém zápisu. Jedinou informací navíc je, že vytvořený kanál musí být typu server-connection.


WMQ třídy pro Javu
Třídy pro Javu jsou poměrně přímočaré - stačí, pokud člověk rozumí hierarchii a funkci základních objektů ve WMQ:

  1. Vytvoří se instance Queue Manageru.
  2. Z něj se získá instace Queue, s parametrem daného typu otevření (procházení, vstup, výstup atd.).
  3. Vytvoří se Message a nastaví se jí nějaká data.
  4. Zpráva se vloží do (nebo načte z) fronty.
  5. Zavřou se zdroje.
package com.adastracorp.jprase.wmq.java;

import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;


public class JavaProvider {

private static final String QM_NAME = “QM_JAVA”;
private static final String HOSTNAME =
“192.168.6.128”;
private static final String CHANNEL =
“JAVA.CHANNEL”;
private static final int PORT = 5557;
private static final String QUEUE = “JPRASE”;

private static void init() {
MQEnvironment.hostname = HOSTNAME;
MQEnvironment.channel = CHANNEL;
MQEnvironment.port = PORT;
}

public static void main(String[] args)
throws Exception {
init();

MQQueueManager queueManager =
new MQQueueManager(QM_NAME);
MQQueue queue = queueManager
.accessQueue(QUEUE,
CMQC.MQOO_OUTPUT |
CMQC.MQOO_INQUIRE);

MQMessage message = new MQMessage();
message.writeUTF(“Hello, WMQ!");

queue.put(message);

queue.close();
queueManager.disconnect();
}
}

WMQ třídy pro JMS
U JMS je trochu více konfigurace - je potřeba vytvořit Initial Context a do něj vložit Connection Factory a Destination. Destination je mapovaná na (existující) frontu a queue managera. WMQ v současné verzi (7.0.1.2) poskytuje pro JNDI dvě implementace (ale je možné použít i jiné, např. JNDI na WebSphere AS):

  • LDAP server (com.sun.jndi.ldap.com.sun.jndi.fscontext.RefFSContextFactory),
  • File system (com.sun.jndi.fscontext.RefFSContextFactory).


Výsledná konfigurace vypadá takto:


Kód samotný je pak klasické JMS:
  1. Vytvoření InitialContext.
  2. Vyhledání ConnectionFactory.
  3. Vytvoření Connection.
  4. Vytvoření Session.
  5. Vytvoření Destination.
  6. Vytvoření MessageProducer/MessageConsumer.
  7. Vytvoření Message (pro producenta).
  8. Odeslání (příjem) Message.
  9. Zavření zdrojů.
package com.adastracorp.jprase.wmq.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;

public class JmsProvider {

private static final String CONTEXT_FACTORY =
“com.sun.jndi.fscontext.RefFSContextFactory”;
private static final String PROVIDER_URL =
“file:///jms”;

private static Context getContext()
throws NamingException {
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,
CONTEXT_FACTORY);
env.put(Context.PROVIDER_URL, PROVIDER_URL);

return new InitialContext(env);
}

public static void main(String[] args)
throws Exception {
Context context = getContext();

ConnectionFactory factory = (ConnectionFactory)
context.lookup(“jmsConnFact”);
Connection connection = factory
.createConnection();
Session session = connection
.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session
.createQueue(“JPRASE”);
MessageProducer producer = session
.createProducer(destination);
producer.setDeliveryMode(
DeliveryMode.NON_PERSISTENT);
TextMessage message = session
.createTextMessage(
“Hello, JMS!");
producer.send(message);

producer.close();
session.close();
connection.close();
}
}