首先进入weblogic控制台,配置JMS
相关参数如下:
新建两个FileStore,目标分别为两个受管服务器;再新建两个JMS服务器,目标同样分别为两个受管服务器
新建JMS模块
新建两个子部署,目标如下:
新建连接工厂,选择第一个子部署,新建分布式topic选择第二个子部署
创建EJB项目,新建Topic类型的消息驱动bean,代码如下:
package mdb;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Message-driven bean subscribed to the topic bound at JNDI name {@code jms/t}.
 *
 * <p>The activation configuration requests a durable subscription with
 * auto-acknowledge, using {@code MsgDrvTest} as both client id and
 * subscription name. Each received text message is printed to standard output.
 */
@MessageDriven(mappedName = "jms/t", activationConfig = {
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable"),
        @ActivationConfigProperty(propertyName = "clientId", propertyValue = "MsgDrvTest"),
        @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "MsgDrvTest") })
public class MsgDrvTest implements MessageListener, MessageDrivenBean {

    /**
     * Prints the body of a received text message.
     *
     * <p>Messages that are not {@link TextMessage} instances (including
     * {@code null}) are reported and skipped instead of being cast blindly,
     * so an unexpected message type does not raise a
     * {@link ClassCastException} out of the listener.
     *
     * @param message the message delivered by the container
     */
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.out.println("Ignoring non-text message:" + message);
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            String text = textMessage.getText();
            System.out.println("Received new message:" + text);
        } catch (JMSException e) {
            // Reading the body failed; report it and let the container continue.
            e.printStackTrace();
        }
    }

    /**
     * Lifecycle callback; only logs that it was invoked.
     *
     * @throws EJBException declared by the {@link MessageDrivenBean} contract; never thrown here
     */
    public void ejbRemove() throws EJBException {
        System.out.println("ejbRemove called");
    }

    /**
     * Lifecycle callback; only logs that it was invoked. The supplied context is not retained.
     *
     * @param ctx the message-driven context supplied by the container (unused)
     * @throws EJBException declared by the {@link MessageDrivenBean} contract; never thrown here
     */
    public void setMessageDrivenContext(MessageDrivenContext ctx)
            throws EJBException {
        System.out.println("setMessageDrivenContext called");
    }
}
创建web项目,新建一个servlet,代码如下:
package topic;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
 * Servlet that publishes an increasing counter as persistent text messages to
 * the topic bound at JNDI name {@code jms/t}, using the connection factory
 * bound at {@code jms/cf}.
 *
 * <p>All JMS and JNDI handles are local to {@link #publish()}, so the servlet
 * instance itself holds no per-request state.
 */
public class TPublisher extends HttpServlet {

    /** Pause between two published messages, in milliseconds. */
    private static final long PUBLISH_INTERVAL_MILLIS = 10000L;

    /**
     * Starts publishing and completes the response once publishing has stopped.
     *
     * <p>NOTE: {@link #publish()} blocks until the calling thread is
     * interrupted or publishing fails, so the request thread is held for
     * that whole time.
     *
     * @param request  the HTTP request (not inspected)
     * @param response the HTTP response
     * @throws ServletException declared by the servlet contract
     * @throws IOException      if the response writer cannot be obtained
     */
    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        response.setContentType("text/html");
        PrintWriter out = response.getWriter();
        publish();
        out.println("</HTML>");
        out.flush();
        out.close();
    }

    /**
     * Handles POST exactly like GET.
     *
     * @param request  the HTTP request
     * @param response the HTTP response
     * @throws ServletException see {@link #doGet}
     * @throws IOException      see {@link #doGet}
     */
    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        this.doGet(request, response);
    }

    /**
     * Publishes "1", "2", "3", ... to the topic, one message every
     * {@value #PUBLISH_INTERVAL_MILLIS} ms, until the current thread is
     * interrupted or a JMS/JNDI error occurs.
     *
     * <p>Failures are reported on standard error and end publishing; they never
     * terminate the JVM, because this code runs inside an application server.
     * The topic connection is always closed before the method returns.
     */
    public void publish() {
        TopicConnection connection = null;
        try {
            TopicConnectionFactory factory;
            Topic topic;
            Context jndi = new InitialContext();
            try {
                factory = (TopicConnectionFactory) jndi.lookup("jms/cf");
                topic = (Topic) jndi.lookup("jms/t");
            } finally {
                jndi.close();
            }

            connection = factory.createTopicConnection();
            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = session.createPublisher(topic);
            publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

            int counter = 0;
            while (!Thread.currentThread().isInterrupted()) {
                counter++;
                String messageText = Integer.toString(counter);
                System.out.println(messageText);
                TextMessage msg = session.createTextMessage(messageText);
                publisher.publish(msg);
                try {
                    Thread.sleep(PUBLISH_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition (and callers) see the interrupt.
                    Thread.currentThread().interrupt();
                }
            }

            System.out.println("Exiting...");
            connection.close();
            connection = null;
            System.out.println("GoodBye!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(connection);
        }
    }

    /** Closes the connection if it is still open, reporting but not propagating failures. */
    private static void closeQuietly(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
将EJB打包成jar文件部署到ManagedServer_1上,命名test_ejb_mdb
将WEB程序打包成war文件部署到ManagedServer_2命名test_jms
使用配置集群时做好的启动命令启动各服务器
注意先启动代理服务器再启动受管服务器,直到全部处于运行状态
查看EJB和WEB应用程序的运行状况:
最终两个受管服务器发布订阅消息