longgangbai 发表于 2013-1-30 02:12:32

ActiveMQ基于derby数据库的spring整合

服务端代码:
package easyway.activemq.app.demo3;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Message producer: reads the file {@code C:\send.txt} in fixed-size chunks and
 * sends each chunk as one JMS {@link StreamMessage} to the queue {@code streamMsg}.
 *
 * @author longgangbai
 */
public class StreamMsgProducer {

    /** Size in bytes of the read buffer, i.e. the maximum payload carried by one message. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Loads the Spring context {@code activemq-jdbc.xml}, obtains the
     * {@code connectionFactory} bean and streams the input file to the queue.
     * JMS and I/O failures are printed to standard error; the file stream and
     * the JMS connection are closed in every case.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // NOTE(review): the Spring context is never closed here - confirm whether
        // keeping it (and whatever beans it started) alive is intentional.
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("activemq-jdbc.xml");
        ActiveMQConnectionFactory activeMqfactory =
                (ActiveMQConnectionFactory) ctx.getBean("connectionFactory");

        Connection conn = null;
        InputStream in = null;
        try {
            conn = activeMqfactory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue("streamMsg");
            MessageProducer producer = session.createProducer(queue);

            File file = new File("C:\\send.txt");
            in = new FileInputStream(file);
            byte[] buffer = new byte[BUFFER_SIZE];
            int c;
            // One message per chunk; read() returns -1 at end of file, which ends the loop.
            while ((c = in.read(buffer)) > 0) {
                StreamMessage smsg = session.createStreamMessage();
                smsg.writeBytes(buffer, 0, c);
                producer.send(smsg);
                System.out.println("send: " + c);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the file even when reading or sending failed part-way through.
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
客户端代码:
package easyway.activemq.app.demo3;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Message consumer: drains {@link StreamMessage}s from the queue {@code streamMsg}
 * and appends their byte payloads to the file {@code c:\receive.txt}.
 *
 * @author longgangbai
 */
public class StreamMsgConsumer {

    /** Size in bytes of the buffer used for each {@code readBytes} call. */
    private static final int BUFFER_SIZE = 1024;

    /** How long, in milliseconds, to wait for the next message before giving up. */
    private static final long RECEIVE_TIMEOUT_MS = 5000;

    /**
     * Loads the Spring context {@code activemq-jdbc.xml}, obtains the
     * {@code connectionFactory} bean and writes every received stream message
     * to the output file. The loop ends when no message arrives within the
     * receive timeout. JMS and I/O failures are printed to standard error; the
     * output stream and the JMS connection are closed in every case.
     */
    public void receive() {
        // NOTE(review): the Spring context is never closed here - confirm whether
        // keeping it (and whatever beans it started) alive is intentional.
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("activemq-jdbc.xml");
        ActiveMQConnectionFactory activeMqfactory =
                (ActiveMQConnectionFactory) ctx.getBean("connectionFactory");

        Connection conn = null;
        OutputStream out = null;
        try {
            conn = activeMqfactory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue("streamMsg");
            MessageConsumer consumer = session.createConsumer(queue);

            out = new FileOutputStream("c:\\receive.txt");
            byte[] buffer = new byte[BUFFER_SIZE];
            while (true) {
                Message msg = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (msg == null) {
                    // Timed out: treat the queue as drained.
                    break;
                }
                if (msg instanceof StreamMessage) {
                    StreamMessage smsg = (StreamMessage) msg;
                    int total = 0;
                    int c;
                    // readBytes must be called repeatedly until it returns less than
                    // the buffer length; it returns -1 for a null/exhausted field,
                    // which must not be passed to write() as a length.
                    do {
                        c = smsg.readBytes(buffer);
                        if (c > 0) {
                            out.write(buffer, 0, c);
                            total += c;
                        }
                    } while (c == buffer.length);
                    System.out.println("Receive: " + total);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the file even when receiving or writing failed part-way through.
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Entry point: runs a single {@link #receive()} pass.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new StreamMsgConsumer().receive();
    }
}
activemq.xml的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Broker definition: one TCP transport connector and a JDBC persistence adapter
     that uses the "derby-ds" data source declared below. -->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <!-- "#derby-ds" refers to the bean with id "derby-ds" defined in this file. -->
            <jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#derby-ds"/>
        </persistenceAdapter>
        <transportConnectors>
            <transportConnector name="default" uri="tcp://localhost:61619"/>
        </transportConnectors>
    </broker>

    <!-- NOTE(review): this bean is not referenced anywhere in this file, and its class is a
         DB2 adapter although the data source below is Derby - confirm whether it is needed. -->
    <bean id="derbyds" class="org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter"/>

    <!-- Embedded Derby DataSource Sample Setup -->
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
    </bean>
</beans>
activemq-jdbc.xml的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context loaded by the producer and consumer classes: defines the "broker"
     bean from classpath:activemq.xml and a connection factory that depends on it. -->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Broker built from the activemq.xml configuration on the classpath. -->
    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
        <property name="config" value="classpath:activemq.xml"/>
        <property name="start" value="true"/>
    </bean>

    <!-- depends-on makes Spring initialise the "broker" bean before this factory. -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
        <property name="brokerURL" value="tcp://localhost:61619"/>
    </bean>
</beans>
 
 
页: [1]
查看完整版本: ActiveMQ基于derby数据库的spring整合