在weblogic 控制台创建两种模式的对应资源
Queue(点对点消息传送模型)
在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。点对点消息模型有一些特性,如下:
每个消息只有一个接收者;
消息发送者和接收者并没有时间依赖性;
当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;
当接收者收到消息的时候,会发送确认收到通知(acknowledgement)。
package com.landing.quene;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import org.junit.Test;
import com.landing.utils.ContextUtil;
//消息发送方
public class Sender {
@Test
public void SendMessage() {
Context ctx = ContextUtil.getContext();
QueueConnection conn = null;
QueueSession session = null;
try {
//根据JNDI获取QueueConnectionFactory对象
QueueConnectionFactory qConnectionFactory = (QueueConnectionFactory) ctx.lookup("javax.jms.QueueConnectionFactory");
//根据QueueConnectionFactory对象获取QueueConnection对象
conn = qConnectionFactory.createQueueConnection();
//根据QueueConnection对象获取QueueSession对象那个
session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
//根据JNDI回去queue对象
Queue queue = (Queue) ctx.lookup("queue_test");
//创建消息发送者
QueueSender sender = (QueueSender) session.createSender(queue);
//创建消息
TextMessage tMessage = session.createTextMessage("hello world");
//发送消息
sender.send(tMessage);
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(conn != null) {
try {
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
package com.landing.quene;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import com.landing.utils.ContextUtil;
//消息接收方
public class Receiver {
public static TextMessage receiveMessage() {
Context ctx = ContextUtil.getContext();
QueueConnection conn = null;
QueueSession session = null;
TextMessage tMessage = null;
try {
//根据JNDI获取QueueConnectionFactory对象
QueueConnectionFactory qConnectionFactory = (QueueConnectionFactory) ctx.lookup("javax.jms.QueueConnectionFactory");
//根据QueueConnectionFactory对象获取QueueConnection对象
conn = qConnectionFactory.createQueueConnection();
//根据QueueConnection对象获取QueueSession对象那个
session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
//根据JNDI回去queue对象
Queue queue = (Queue) ctx.lookup("queue_test");
//创建消息接收方
QueueReceiver receive = (QueueReceiver) session.createReceiver(queue);
//开始接收消息
conn.start();
tMessage = (TextMessage) receive.receive();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(conn != null) {
try {
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return tMessage;
}
public static void main(String[] args) {
System.out.println(receiveMessage());
}
}
Topic(发布/订阅消息传递模型)
在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
发布/订阅消息模型特性如下:
一个消息可以传递给多个订阅者
发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
package com.landing.topic;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.NamingException;
import org.junit.Test;
import com.landing.utils.ContextUtil;
//消息发布方
public class Publisher {
@Test
public void sendMessage() {
Context ctx = ContextUtil.getContext();
TopicConnection conn = null;
TopicSession session = null;
try {
//获取连接工厂
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup("javax.jms.TopicConnectionFactory");
//根据连接工厂获取对应的Connection对象
conn = connFactory.createTopicConnection();
//根据Connection对象获取Session对象
session = conn.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
//根据JNDI获取topic对象
Topic topic = (Topic) ctx.lookup("topic_test");
//创建消息发布者
TopicPublisher topicPublisher = session.createPublisher(topic);
TextMessage message = session.createTextMessage();
message.setText("Hello World");
topicPublisher.publish(message);
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(conn != null) {
try {
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
package com.landing.topic;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.NamingException;
import org.junit.Test;
import com.landing.utils.ContextUtil;
//消息接收方
public class Subcriber {
@Test
public TextMessage receiveMessage() {
Context ctx = ContextUtil.getContext();
TopicConnection conn = null;
TopicSession session = null;
TextMessage message = null;
try {
//获取连接工厂
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup("javax.jms.TopicConnectionFactory");
//根据连接工厂获取对应的Connection对象
conn = connFactory.createTopicConnection();
//根据Connection对象获取Session对象
session = conn.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
//根据JNDI获取topic对象
Topic topic = (Topic) ctx.lookup("topic_test");
//创建消息订阅者
TopicSubscriber topicSubscriber = session.createSubscriber(topic);
//开启消息接收
conn.start();
message = (TextMessage) topicSubscriber.receive();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(conn != null) {
try {
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return message;
}
}
//通过ContextUtil来获取Context对象
package com.landing.utils;
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class ContextUtil {
private static Context context;
public static Context getContext() {
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
env.put(Context.PROVIDER_URL, "t3://localhost:7001");
try {
if(context == null) {
context = new InitialContext(env);
}
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return context;
}
}