第一次接触使用JMS,参考了论坛大神的代码,现在自己做了一个简单的实现案例。首先需要在WebLogic中建立JMS服务、JMS模块等等,可以参照论坛大神发的相关帖子。建立好服务后新建一个工程,目录结构如下:
当然需要导入weblogic的jar包,由于经常要用到context上下文对象,建立了一个工具类:
package com.landingbj.util;
import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * Utility class that lazily creates and caches one JNDI {@link Context}
 * pointing at the WebLogic server on {@code t3://localhost:7001}.
 *
 * @author psf
 */
public class ContextManager {
    /** Initial-context factory class shipped in the WebLogic client jar. */
    private static final String FACTORY = "weblogic.jndi.WLInitialContextFactory";
    /** Address of the WebLogic server the context connects to. */
    private static final String PROVIDER_URL = "t3://localhost:7001";

    /** Cached context; stays {@code null} until the first successful creation. */
    private static Context context = null;

    /**
     * Returns the shared JNDI context, creating it on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot each
     * build their own context. A failed creation is not cached, so a later call
     * tries again.
     *
     * @return the cached context, never {@code null}
     * @throws IllegalStateException if the initial context cannot be created;
     *         the underlying {@link NamingException} is attached as the cause
     */
    public static synchronized Context getContext() {
        if (context == null) {
            Properties p = new Properties();
            p.put(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
            p.put(Context.PROVIDER_URL, PROVIDER_URL);
            try {
                context = new InitialContext(p);
            } catch (NamingException e) {
                // Fail here with the real cause instead of returning null and
                // letting every caller hit a NullPointerException on lookup().
                throw new IllegalStateException(
                        "Unable to create JNDI context for " + PROVIDER_URL, e);
            }
        }
        return context;
    }
}
其中topic消息生产者代码:
package com.landingbj.topic;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import com.landingbj.util.ContextManager;
/**
 * Publishes text messages to the JMS topic bound under the JNDI name
 * {@code Topic-0}, using the connection factory bound under {@code myconn}.
 *
 * <p>Call {@link #close()} when finished so the JMS connection is released.
 *
 * @author psf
 */
public class SendMessage {
    // Connection backing the session and producer; kept so close() can release it
    private Connection connection;
    // Reusable text message; its body is overwritten before each send
    private TextMessage message;
    // Message producer bound to the topic
    private MessageProducer sender;

    /**
     * Sends ten numbered messages (each followed by a timestamp message) and
     * then closes the connection, even if sending fails part-way.
     */
    public static void main(String[] args) {
        SendMessage sendMessage = new SendMessage();
        try {
            for (int i = 0; i < 10; i++) {
                sendMessage.send("topic mess" + i);
            }
        } finally {
            sendMessage.close();
        }
    }

    /**
     * Looks up the connection factory and topic, then creates the connection,
     * session, producer and reusable message.
     *
     * @throws IllegalStateException if the JNDI context is unavailable or any
     *         lookup / JMS setup step fails; the original exception is the cause
     */
    public SendMessage() {
        Context context = ContextManager.getContext();
        if (context == null) {
            throw new IllegalStateException("JNDI context is not available");
        }
        try {
            // Fetch the connection factory from WebLogic
            ConnectionFactory factory = (ConnectionFactory) context.lookup("myconn");
            connection = factory.createConnection();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Fetch the administered topic object from WebLogic
            Destination destination = (Destination) context.lookup("Topic-0");
            sender = session.createProducer(destination);
            message = session.createTextMessage();
        } catch (NamingException e) {
            // Do not leave a half-initialised sender (or an open connection) behind
            releaseConnection();
            throw new IllegalStateException("JNDI lookup failed while setting up the topic producer", e);
        } catch (JMSException e) {
            releaseConnection();
            throw new IllegalStateException("JMS setup failed for the topic producer", e);
        }
    }

    /**
     * Sends {@code mess} as a text message, followed by a second message that
     * carries the current time. A {@link JMSException} is printed, not rethrown.
     *
     * @param mess text of the first message
     */
    public void send(String mess) {
        try {
            message.setText(mess);
            sender.send(message);
            message.setText("当前时间:" + new Date().toString());
            sender.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the JMS connection. Safe to call more than once.
     */
    public void close() {
        releaseConnection();
    }

    // Closes the connection if one was opened; close errors are only reported.
    private void releaseConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                connection = null;
            }
        }
    }
}
消息消费者:
package com.landingbj.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import com.landingbj.util.ContextManager;
/**
* TODO 接收topic信息
* @author psf
*
*/
public class ReceiveMessage {
private TextMessage message;
//消息消费者顶级接口,其子接口有QueueReceiver和TopicSubscriber
private MessageConsumer receiver;
public static void main(String[] args) {
ReceiveMessage receiveMessage=new ReceiveMessage();
for (int i = 0; i <20; i++) {
receiveMessage.receive();
}
}
public ReceiveMessage() {
Context context=ContextManager.getContext();
try {
ConnectionFactory factory=(ConnectionFactory) context.lookup("myconn");
Connection connection=factory.createConnection();
//clientID是JMS server用来唯一标记链接的,因此在全局中不能重复
connection.setClientID("landingbj");
//"开启"消息接收,此后即可接收消息;不过对于Producer而言,无论链接处于何种状态,均可以发送消息;此方法主要对消费者有效
connection.start();
//创建会话,并指定此Session的事务性和消息确认模式
//AUTO_ACKNOWLEDGE 消息自动确认,CLIENT_ACKNOWLEDGE:客户端确认,DUPS_OK_ACKNOWLEDGE:"可重复消息确认"
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic=(Topic) context.lookup("Topic-0");
//创建一个“耐久订阅者”,并指定订阅者的名称(name,需要全局唯一)
receiver=session.createDurableSubscriber(topic, "landingbj");
message=session.createTextMessage();
} catch (JMSException e) {
e.printStackTrace();
} catch (NamingException e) {
e.printStackTrace();
}
}
public void receive(){
try {
if(message!=null){
message=(TextMessage) receiver.receive();
System.out.println(message.getText());
}else {
System.out.println("没有信息可读!");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
JMS队列(queue)消息发送者代码:
package com.landingbj.queue;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import com.landingbj.util.ContextManager;
/**
 * Sends text messages to the JMS queue bound under the JNDI name
 * {@code Queue-0}, using the connection factory bound under {@code myconn}.
 *
 * <p>Call {@link #close()} when finished so the JMS connection is released.
 *
 * @author psf
 */
public class SendMessage {
    // Connection backing the session and sender; kept so close() can release it
    private QueueConnection queueConnection;
    // Sender bound to the queue
    private QueueSender sender;
    // Reusable text message; its body is overwritten before each send
    private TextMessage message;

    /**
     * Sends five numbered messages (each followed by a timestamp message) and
     * then closes the connection, even if sending fails part-way.
     */
    public static void main(String[] args) {
        SendMessage sendMessage = new SendMessage();
        try {
            for (int i = 0; i < 5; i++) {
                sendMessage.send("queue mess" + i);
            }
        } finally {
            sendMessage.close();
        }
    }

    /**
     * Looks up the queue connection factory and queue, then creates the
     * connection, session, sender and reusable message.
     *
     * @throws IllegalStateException if the JNDI context is unavailable or any
     *         lookup / JMS setup step fails; the original exception is the cause
     */
    public SendMessage() {
        Context context = ContextManager.getContext();
        if (context == null) {
            throw new IllegalStateException("JNDI context is not available");
        }
        try {
            QueueConnectionFactory queueConnectionFactory =
                    (QueueConnectionFactory) context.lookup("myconn");
            queueConnection = queueConnectionFactory.createQueueConnection();
            // Non-transacted session with automatic acknowledgement
            QueueSession session =
                    queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = (Queue) context.lookup("Queue-0");
            sender = session.createSender(queue);
            message = session.createTextMessage();
        } catch (NamingException e) {
            // Do not leave a half-initialised sender (or an open connection) behind
            releaseConnection();
            throw new IllegalStateException("JNDI lookup failed while setting up the queue sender", e);
        } catch (JMSException e) {
            releaseConnection();
            throw new IllegalStateException("JMS setup failed for the queue sender", e);
        }
    }

    /**
     * Sends {@code mess} as a text message, followed by a second message that
     * carries the current time. A {@link JMSException} is printed, not rethrown.
     *
     * @param mess text of the first message
     */
    public void send(String mess) {
        try {
            message.setText(mess);
            sender.send(message);
            message.setText("当前时间" + new Date().toString());
            sender.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the JMS connection. Safe to call more than once.
     */
    public void close() {
        releaseConnection();
    }

    // Closes the connection if one was opened; close errors are only reported.
    private void releaseConnection() {
        if (queueConnection != null) {
            try {
                queueConnection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                queueConnection = null;
            }
        }
    }
}
队列(queue)消息接收者代码:
package com.landingbj.queue;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import com.landingbj.util.ContextManager;
public class ReceiveMessage {
private QueueReceiver receiver;
private TextMessage message;
public static void main(String[] args) {
ReceiveMessage receiveMessage=new ReceiveMessage();
for (int i = 0; i <10; i++) {
receiveMessage.receive();
}
}
public ReceiveMessage() {
Context context=ContextManager.getContext();
try {
QueueConnectionFactory queueConnectionFactory=(QueueConnectionFactory) context.lookup("myconn");
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
queueConnection.start();
QueueSession session=queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue=(Queue) context.lookup("Queue-0");
receiver=session.createReceiver(queue);
message=session.createTextMessage();
} catch (JMSException e) {
e.printStackTrace();
} catch (NamingException e) {
e.printStackTrace();
}
}
public void receive(){
try {
if(message!=null){
message=(TextMessage) receiver.receive();
System.out.println(message.getText());
}else {
System.out.println("没有信息可读!");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
weblogic控制台信息如下:
目前,就写到这里,相关概念可以参考其它相关帖子。