本人實現的功能為activemq將消息持久化到數據庫的方法:
1:前言
???? 這一段給公司開發消息總線有機會研究ActiveMQ,今天撰文給大家介紹一下他的持久化消息。本文只介紹三種方式,分別是持久化為文件,MYSql,Oracle。下面逐一介紹。
A:持久化為文件
???? 這個你裝ActiveMQ時默認就是這種,只要你設置消息為持久化就可以了。涉及到的配置和代碼有
< kahaDB directory = " ${activemq.base}/data/kahadb " />
</ persistenceAdapter >
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);
B:持久化為MySql
???? 你首先需要把MySql的驅動放到ActiveMQ的Lib目錄下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar
???? 接下來你修改配置文件
< jdbcPersistenceAdapter dataDirectory = " ${activemq.base}/data " dataSource = " #derby-ds " />
</ persistenceAdapter >
在配置文件中的broker節點外增加
< property name = " driverClassName " value = " com.mysql.jdbc.Driver " />
< property name = " url " value = " jdbc:mysql://localhost/activemq?relaxAutoCommit=true " />
< property name = " username " value = " activemq " />
< property name = " password " value = " activemq " />
< property name = " maxActive " value = " 200 " />
< property name = " poolPreparedStatements " value = " true " />
</ bean >
從配置中可以看出數據庫的名稱是activemq,你需要手動在MySql中增加這個庫。
然后重新啟動消息隊列,你會發現多了3張表
1:activemq_acks
2:activemq_lock
3:activemq_msgs
C:持久化為Oracle
??? 和持久化為MySql一樣。這里我說兩點
1;在ActiveMQ安裝文件夾里的Lib文件夾中增加Oracle的JDBC驅動。驅動文件位于Oracle客戶端安裝文件中的product\11.1.0\client_1\jdbc\lib文件夾下。
2:
< property name = " driverClassName " value = " oracle.jdbc.driver.OracleDriver " />
< property name = " url " value = " jdbc:oracle:thin:@10.53.132.47:1521:test " />
< property name = " username " value = " qdcommu " />
< property name = " password " value = " qdcommu " />
< property name = " maxActive " value = " 200 " />
< property name = " poolPreparedStatements " value = " true " />
</ bean >
這里的jdbc:oracle:thin:@10.53.132.47:1521:test按照自己實際情況設置一下就可以了,特別注意的是test是SID即服務名稱而不是TNS中配置的節點名。各位同學只需要替換IP,端口和這個SID就可以了。
消息消費者的事先代碼:
package easyway.activemq.app; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /*** * 消息持久化到數據庫 * @author longgangbai */ public class MessageCustomer { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String QUEUENAME="ActiveMQ.QUEUE"; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 創建Broker服務對象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); broker.addConnector(url); return broker; } /** * 啟動BrokerService進程 * @throws Exception */ public void init() throws Exception{ BrokerService brokerService=createBroker(); brokerService.start(); } /** * 接收的信息 * @return * @throws Exception */ public int receiveMessage() throws Exception{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); return receiveMessages(messagesExpected,session); } /** * 接受信息的方法 * @param messagesExpected * @param session * @return * @throws Exception */ protected int receiveMessages(int messagesExpected, Session session) throws Exception { int messagesReceived = 0; for (int i=0; i<messagesExpected; i++) { Destination destination = session.createQueue(QUEUENAME); MessageConsumer consumer = session.createConsumer(destination); Message message = null; try { logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected); message = consumer.receive(2000); logger.info("Received : " + message); if (message != null) { session.commit(); messagesReceived++; } } catch (Exception e) { logger.debug("Caught exception " + e); session.rollback(); } finally { if (consumer != null) { consumer.close(); } } } return messagesReceived; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
?消息生產者的代碼:
package easyway.activemq.app; import java.io.File; import java.util.Properties; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.sql.DataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter; import org.apache.commons.dbcp.BasicDataSourceFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import easyway.activemq.app.utils.BrokenPersistenceAdapter; /** * 消息持久化到數據庫 * @author longgangbai * */ public class MessageProductor { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String queueName="ActiveMQ.QUEUE"; private BrokerService brokerService; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61617?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 創建Broker服務對象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); BrokenPersistenceAdapter jdbc=createBrokenPersistenceAdapter(); broker.setPersistenceAdapter(jdbc); jdbc.setDataDirectory(System.getProperty("user.dir")+File.separator+"data"+File.separator); jdbc.setAdapter(new MySqlJDBCAdapter()); broker.setPersistent(true); broker.addConnector("tcp://localhost:61617"); //broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); return broker; } /** * 創建Broken的持久化適配器 * @return * @throws Exception */ public BrokenPersistenceAdapter createBrokenPersistenceAdapter() throws Exception{ BrokenPersistenceAdapter jdbc=new BrokenPersistenceAdapter(); DataSource datasource=createDataSource(); jdbc.setDataSource(datasource); jdbc.setUseDatabaseLock(false); //jdbc.deleteAllMessages(); return jdbc; } /** * 創建數據源 * @return * @throws Exception */ public DataSource createDataSource() throws Exception{ Properties props=new Properties(); props.put("driverClassName", "com.mysql.jdbc.Driver"); props.put("url", "jdbc:mysql://localhost:3306/activemq"); props.put("username", "root"); props.put("password", "root"); DataSource datasource=BasicDataSourceFactory.createDataSource(props); return datasource; } /** * 啟動BrokerService進程 * @throws Exception */ public void init() throws Exception{ createBrokerService(); brokerService.start(); } public BrokerService createBrokerService() throws Exception{ if(brokerService==null){ brokerService=createBroker(); } return brokerService; } public void sendMessage() throws JMSException{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(int i=0;i<messagesExpected;i++){ logger.debug("Sending message " + (i+1) + " of " + messagesExpected); producer.send(session.createTextMessage("test message " + (i+1))); } connection.close(); } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
?
持久化適配器類
package easyway.activemq.app.utils; import java.io.IOException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author longgangbai * */ public class BrokenPersistenceAdapter extends JDBCPersistenceAdapter { private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class); private boolean shouldBreak = false; @Override public void commitTransaction(ConnectionContext context) throws IOException { if ( shouldBreak ) { LOG.warn("Throwing exception on purpose"); throw new IOException("Breaking on purpose"); } LOG.debug("in commitTransaction"); super.commitTransaction(context); } public void setShouldBreak(boolean shouldBreak) { this.shouldBreak = shouldBreak; } }
?
測測試代碼如下:
?
?
package easyway.activemq.app.test; import easyway.activemq.app.MessageProductor; public class MessageProductorTest { public static void main(String[] args) throws Exception { MessageProductor productor =new MessageProductor(); productor.init(); productor.sendMessage(); //productor.createBrokerService().stop(); } }
?
package easyway.activemq.app.test; import easyway.activemq.app.MessageCustomer; public class MessageCustomerTest { public static void main(String[] args) throws Exception { MessageCustomer customer=new MessageCustomer(); //customer.init(); //當兩臺機器在不同的服務器上啟動客戶端的broker進程 customer.receiveMessage(); } }
?
?
備注:運行過程為:首先執行MessageProductorTest,MessageCustomerTest。
??????? mysql數據庫activemq必須存在。關于消息持久化的表結構如下:
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
