下面來自百度百科
HornetQ 是一個支持 集群 和多種協議,可嵌入、高性能的異步消息系統。 HornetQ 完全支持 JMS , HornetQ 不但支持 JMS1.1 API 同時也定義屬于自己的消息 API ,這可以最大限度的提升 HornetQ 的性能和靈活性。在不久的將來更多的協議將被 HornetQ 支持。 [1]
HornetQ 擁有超高的性能, HornetQ 在持久化消息方面的性能可以輕易的超于其它常見的非持久化消息引擎的性能。當然, HornetQ 的非持久化消息的性能會表現的更好!
HornetQ 完全使用 POJO ,純 POJO 的設計讓 HornetQ 可以盡可能少的依賴第三方的包。從設計模式來說, HornetQ 這樣的設計入侵性也最小。 HornetQ 既可以獨立運行,也可以與其它 Java 應用程序服務器集成使用。
HornetQ 擁有完善的錯誤處理機制, HornetQ 提供服務器復制和故障自動轉移功能,該功能可以消除消息丟失或多個重復信息導致服務器出錯。
HornetQ 提供了靈活的集群功能,通過創建 HornetQ 集群,您可以享受到到消息的負載均衡帶來的性能提升。您也可以通過集群,組成一個全球性的消息網絡。您也可以靈活的配置消息路由。
HornetQ 擁有強大的管理功能。 HornetQ 提供了大量的管理 API 和監控服務器。它可以無縫的與應用程序服務器整合,并共同工作在一個 HA 環境中。
?
?
應用場景
首先 HornetQ 是一種消息服務中間件,高效可靠的消息傳遞機制進行平臺無關的 數據 交流,并基于數據通信來進行 分布式系統 的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。 本文檔主要涉及到 HornetQ 的 JMS 功能的使用, HornetQ 的 JMS 只是對 HornetQ 的一種封裝,適配了 java 的 JMS 協議。
?
如何集成到項目
HornetQ 目前大致有三種方式: standalone , embedded , Integrated ? with JBoss as 。
我個人傾向于 standalone 方式,因為:
1) ? 可以有更多的資源供 HornetQ 單獨使用
2) ? 管理的話只需要關注 HornetQ 這一個產品的問題就行,而無需引入其他的復雜度。
3) ? 原項目中也是把消息中間件作為一個單獨的模塊部署,對原來的流程可以做到無縫承接。
目前我只是關注了 HornetQ standalone 這一模式,其他的暫且沒有 深入。
?
使用 HornetQ 服務端很簡單,直接運行 % HornetQ _HOME%/bin 下的 bat/sh 就可以啟動(優化問題暫時沒有考慮)
?
客戶端推薦用 HornetQ 的 client 和 Spring 做集成, spring 的配置文件內容大致如下所示:
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
?????? xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
?????? xsi:schemaLocation = "http://www.springframework.org/schema/beans
?????????? http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" >
?
??? < bean id = "listener" class = "org.hornetq.jms.example.ExampleListener" />
???
?
??? < bean id = "listenerContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
??????? < property name = "connectionFactory" ref = "ConnectionFactory" />
??????? < property name = "destination" ref = "/queue/exampleQueue" />
??????? < property name = "messageListener" ref = "listener" />
??? </ bean >
???
???
??? < bean id = "queueTarget" class = "org.springframework.jndi.JndiObjectTargetSource" >
???
??? ??? < property name = "jndiName" >
???
??? ??????? < value > queue/testQueue </ value >
???
??? ??? </ property >
???
??? ??? < property name = "jndiTemplate" >
???
??? ??????? < ref local = "jndiTemplate" />
???
??? ??? </ property >
???
??? </ bean >
???
??? < bean id = "jndiTemplate" class = "org.springframework.jndi.JndiTemplate" >
???
??? ??? < property name = "environment" >
???
??? ??????? < props >
???
??? ???? ?????? < prop key = "java.naming.factory.initial" > org.jnp.interfaces.NamingContextFactory </ prop >
???
??? ??????????? < prop key = "java.naming.provider.url" > jnp :// localhost :1099 </ prop >
???
??? ??????????? < prop key = "java.naming.factory.url.pkgs" > org.jboss.naming:org.jnp.interfaces </ prop >
???
??? ??????? </ props >
???
??? ??? </ property >
?
</ bean >
</ beans >
?
因為 HornetQ 的 client 主要是以 JNDI 和服務端進行連接,所以以上我們都是通過 Spring 提供的 JMS 模板類和 JNDI 模板類來對 HornetQ 的 client 進行配置與管理。
?
使用步驟
?? 具體示例主要是以本地 main 方法為主,用 spring 來管理的話也很簡單 .
首先加入 HornetQ 客戶端必須使用到的 HornetQ 工程的 jar 包
?

?
除了 jboss-client.jar ,其他的都可以在 HornetQ 的下載包里找到, jboss-client.jar 需要單獨的下載 JBoss AS ,我下載的是 JBoss AS7 , jboss-client.jar 的目錄為 % JBoss AS7_HOME%/bin/client
? JMS Queue
1) ?????? Queue Provider
?
public static void main(String[] args) throws Exception{
??????
?????? ??? // 初始化 JNDI
?????? ? ?? Properties properties = new Properties(); ?
??? ??????? properties.put( "java.naming.factory.initial" , ?
??? ??????????????? "org.jnp.interfaces.NamingContextFactory" ); ?
??? ??????? properties.put( "java.naming.factory.url.pkgs" , ?
??? ??????????????? "org.jboss.naming:org.jnp.interfaces" ); ?
??? ??????? properties.put( "java.naming.provider.url" , "jnp://localhost:1099" ); ?
??? ??????? InitialContext ic = new InitialContext(properties);
??? ???????
??? ??????? // 建立 ConnectionFactory
??? ??????? ConnectionFactory cf = (ConnectionFactory) ic ?
??? ??????????????? .lookup( "/ConnectionFactory" );
??? ???????
??? ??????? // 建立到 Queue 連接
??? ??????? Queue orderQueue = (Queue) ic.lookup( "queue/ExpiryQueue" );
??? ???????
??? ??????? // 通過 Queue 建立 Connection
??? ??????? Connection connection = cf.createConnection(); ?
??? ???????
??? ??????? // 通過 Connection 建立 session
??? ??????? Session session = connection.createSession( false , ?
??? ??????????????? Session. AUTO_ACKNOWLEDGE );
??? ???????
??? ??????? // 建立 JMS 生產者
??? ??????? MessageProducer producer = session.createProducer(orderQueue); ?
??? ??????? // 這一步必須,啟動 connection
??? ??????? connection.start(); ?
??? ???????
??? ??????? TextMessage message = ? session.createTextMessage( "First hornetq" );
??? ??????? producer.send(message);
??? ??????? System. out .println( "send success" );
?
?
2) ? Queue Consumer
?
??? ??? // 初始化 JNDI
?????? Properties properties = new Properties(); ?
??????? properties.put( "java.naming.factory.initial" , ?
??????????????? "org.jnp.interfaces.NamingContextFactory" ); ?
??????? properties.put( "java.naming.factory.url.pkgs" , ?
??????????????? "org.jboss.naming:org.jnp.interfaces" ); ?
??????? properties.put( "java.naming.provider.url" , "jnp://localhost:1099" ); ?
??????? InitialContext ic = new InitialContext(properties); ?
??? ??????? ConnectionFactory cf = (ConnectionFactory) ic ?
??? ??????????????? .lookup( "/ConnectionFactory" ); ?
??? ??????? Queue orderQueue = (Queue) ic.lookup( "queue/ExpiryQueue" ); ?
??? ??????? Connection connection = cf.createConnection(); ?
??? ??????? Session session = connection.createSession( false , ?
??? ??????????????? Session. AUTO_ACKNOWLEDGE ); ?
??? ??????? MessageConsumer consumer = session.createConsumer(orderQueue); ?
??? ??????? connection.start(); ?
??? ???????
上面建立連接的部分的注釋參考 Queue Provider
?
/* ? ??????? Message message = ? consumer.receive();*/
??? ???????
??? ??????? consumer.setMessageListener( new MessageListener() {
?????????????
????????????? @Override
????????????? public void onMessage(Message message) {
?
????????????????? if (message instanceof TextMessage) {
?????????? ??????? TextMessage textMessage = ? (TextMessage)message;
?????????? ??????? String text;
???????????????????? try {
???????????????????????? text = textMessage.getText();
????????????? ??????? System. out .println( "Get Text message" + text);
???????????????????? } catch (JMSException e) {
???????????????????????? e.printStackTrace();
???????????????????? }
?
?????????? ???????
?????????? ??????? } else {
?????????? ??????? System. out .println( "Get message" + message);
????????????????? }
?????????????????
?????????????
?????????????????
????????????? }
?????????? });
??? ???????
??? ??????? Thread. sleep (30000);
?
以上可以看到 Consumer 有二種方式
一種是調用 receive ,這樣會阻塞,直到有消息為止,第二種是注冊一個回調函數,實現 MessageListener 接口,這一種是異步的。
?
?
1) Topic Provider
?
??? public static void main(String[] args) throws Exception{
?????? Properties properties = new Properties(); ?
??????? properties.put( "java.naming.factory.initial" , ?
??????????????? "org.jnp.interfaces.NamingContextFactory" ); ?
??????? properties.put( "java.naming.factory.url.pkgs" , ?
??????????????? "org.jboss.naming:org.jnp.interfaces" ); ?
??????? properties.put( "java.naming.provider.url" , "jnp://localhost:1099" ); ?
??????? InitialContext initialContext = new InitialContext(properties);
??????? // Step 2. perform a lookup on the topic
// ??????? Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic");
?
??????? // Step 3. perform a lookup on the Connection Factory
??????? ConnectionFactory cf = (ConnectionFactory)initialContext.lookup( "/ConnectionFactory" );
?
??????? // Step 4. Create a JMS Connection
??????? Connection connection = cf.createConnection();
???????
??????? // Step 11. Start the Connection
??????? connection.start();
?
??????? // Step 5. Create a JMS Session
??????? Session session = connection.createSession( false , Session. AUTO_ACKNOWLEDGE );
???????
// ??????? Topic topic = ? session.l("/topic/exampleTopic");
??????? Topic topic = ? (Topic)initialContext.lookup( "/topic/exampleTopic2" );
?
??????? // Step 6. Create a Message Producer
??????? MessageProducer producer = session.createProducer(topic);
?
??????? // Step 9. Create a Text Message
??????? TextMessage message = session.createTextMessage( "This is a text message" );
?
??????? System. out .println( "Sent message: " + message.getText());
?
??????? // Step 10. Send the Message
??????? producer.send(message);
???????
??????? System. out .println( "Topic send success" );
?
?
???? }
?
Topic 的 Provider 和 Queue 的 Provider 基本類似,只是一個是獲得 Queue ,另外一個是獲得 Topic
?
2) Topic Consumer
?
??? public static void main(String[] args) throws Exception{
?????? Properties properties = new Properties(); ?
??????? properties .put( "java.naming.factory.initial" , ?
??????????????? "org.jnp.interfaces.NamingContextFactory" ); ?
??????? properties .put( "java.naming.factory.url.pkgs" , ?
??????????????? "org.jboss.naming:org.jnp.interfaces" ); ?
??????? properties .put( "java.naming.provider.url" , "jnp://localhost:1099" ); ?
??????? InitialContext initialContext = new InitialContext( properties );
??????? // Step 2. perform a lookup on the topic
??????? Topic topic = (Topic)initialContext.lookup( "/topic/exampleTopic2" );
?
??????? // Step 3. perform a lookup on the Connection Factory
??????? ConnectionFactory cf = (ConnectionFactory)initialContext.lookup( "/ConnectionFactory" );
?
??????? // Step 4. Create a JMS Connection
??????? Connection connection = cf.createConnection();
?
??????? // Step 5. Create a JMS Session
??????? Session session = connection.createSession( false , Session. AUTO_ACKNOWLEDGE );
???????
??????? // Step 7. Create a JMS Message Consumer
??????? MessageConsumer messageConsumer1 = session.createConsumer(topic);
?
??????? // Step 8. Create a JMS Message Consumer
??????? MessageConsumer messageConsumer2 = session.createConsumer(topic);
???????
???? // Step 11. Start the Connection
??????? connection.start();
???????
??????? // Step 12. Receive the message
??????? TextMessage messageReceived = (TextMessage)messageConsumer1.receive();
?
??????? System. out .println( "Consumer 1 Received message: " + messageReceived.getText());
?
??????? // Step 13. Receive the message
??????? messageReceived = (TextMessage)messageConsumer2.receive();
?
??????? System. out .println( "Consumer 2 Received message: " + messageReceived.getText());
}
Topic 模式的測試必須是 consumer 先啟動,然后 provider 再啟動, consumer 才能獲得消息。這是由于 Topic 的特性而決定的。
?
后記:到目前為止,我還沒有找到方法像 ActiveMQ 那樣動態的創建 Queue 或者 Topic 的, ActiveMQ 中,如果向服務端發送請求,如果服務端沒有這個 Queue 或者 Topic ,那么服務端會自動創建一個,但是 HornetQ 中沒有這個功能,必須在配置文件中配置想要的 Queue 或者 Topic , HornetQ 服務端會熱加載配置文件。
如果直接啟動的話, HornetQ 默認加載 %HornetQ_HOME%\config\stand-alone\non-clustered\ hornetq-jms.xml
具體配置如下:
? <queue name="DLQ">
????? <entry name="/queue/DLQ"/>
?? </queue>
??
?? <queue name="ExpiryQueue">
????? <entry name="/queue/ExpiryQueue"/>
?? </queue>
??
?? <topic name="exampleTopic">
????? <entry name="/topic/exampleTopic"/>
?? </topic>
??
?? <topic name="exampleTopic2">
????? <entry name="/topic/exampleTopic2"/>
?? </topic>
?
HornetQ 本身并沒有提供想 ActiveMQ 那樣的網頁管理界面,它必須和 JBoss asapplication server 集成,或者自己寫程序調用它提供的接口,但這樣無形提高了工作量。
有個簡單變通的方法,因為 HornetQ 里面集成了 JMX ,所以可以通過 jdk 的工具 jconsole 來查看里面的一些信息,和操作里面的一些屬性,達到管理的目的
?
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
