当前位置: 代码迷 >> Web前端 >> 基于Weblogic8的JMS行列实例配置
  详细解决方案

基于Weblogic8的JMS行列实例配置

热度:243   发布时间:2013-01-17 10:28:54.0
基于Weblogic8的JMS队列实例配置

预计达成目标:Weblogic8下实现消息的异步传输与监听

?

为了实现不间断发送与监听,使用了Quartz自动调度,这个不是这篇文章重点,不做讲解,也可以将quartz去掉

quartz引用的jar包比较多,这里就不作为附件了,只上传jms需要依赖的jar包?,其中还有weblogic.jar,太大了就没有上传,可以在weblogic的安装bin目录下找到.?

?

配置步骤

一、建立 Weblogic Domain:建立步骤不再累述

二、在Console控制台配置连接工厂与消息队列

????? 1、配置连接工厂?

????????

? 按如下路径点击Service>JMS> Connection Factories在右侧弹出的页面,点击链接Configure a new JMS Connection Factory...”,在新页面中输入NameJNDI Name此处两处均输入连接工厂名为jms/connFactory,点击“Create则生成JMS连接工厂

??? 如下图:

????

?

?

?

?????? 2、配置消息队列

????? 如下图位置配置队列,这个jmsServer是自己建立的

?

????????

?

?

到此位置就配置了一个 名称为 jms/connFactory?? 连接工厂以及名为 queueasd 的队列

?

?

三、消息发送测试类

?

package com.javasd.jms;

import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;

public class TestJmsConnection implements Job {
	
	public static void main(String args[]) throws Exception {
		
		//quartz调度,不做讲解
		SchedulerFactory schedFact=new StdSchedulerFactory();
		Scheduler sched=schedFact.getScheduler();
		sched.start();
		
		
		JobDetail jobDetail=new JobDetail("a","b",TestJmsConnection.class);
        jobDetail.getJobDataMap().put("name","lucy");
       
        CronTrigger trigger=new  CronTrigger("c","d");
        trigger.setCronExpression("0/100 * * * * ? " ); // 启动之后立即执行 每一秒继续重复。
        sched.scheduleJob(jobDetail, trigger);
			
		
	}

	public void execute(JobExecutionContext arg0) throws JobExecutionException {
		
		try{
			
			
			//这里才是jms发送端的逻辑
			Properties properties = new Properties();
			//设置连接属性
			//这个设置是固定的
			properties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
			//设置端口及IP 我的domain在本地,端口为9001
			properties.put(Context.PROVIDER_URL, "t3://localhost:9001");
			//与weblogic的console用户名密码一致
			properties.put(Context.SECURITY_PRINCIPAL, "weblogic");
			properties.put(Context.SECURITY_CREDENTIALS, "weblogic");
			//实例化上下文
			Context ctx = new InitialContext(properties);
				
				//获取连接工厂
				QueueConnectionFactory queueFactory = (QueueConnectionFactory)ctx.lookup("jms/connFactory");
				//根据连接工厂获取连接
				QueueConnection queueConn = queueFactory.createQueueConnection();
				//根据连接获取操作的session实例
				QueueSession qSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
				//通过上下文jndi查找配置的队列,我配置的队列名称及jndi名称均为 queueasd
				Queue queue = (Queue)ctx.lookup("queueasd");
				//通过session以及指定的队列实例化消息发送器
				QueueSender queueSender = qSession.createSender(queue);
				//打开连接
				queueConn.start();
				
				//测试用要发送的字符串
				String s = "just test sending of jms under weblogic8!";
				//转换为byte数组
				byte[] bytes = s.getBytes();
				int byteLength = bytes.length;
				//通过session的createBytesMessage方法实例化一个jms识别的消息实体对象
				BytesMessage bytesMessage = qSession.createBytesMessage(); 
				//为这个message对象设置值
				//设置消息的长度
				bytesMessage.writeInt(byteLength);
				//设置消息的内容
				bytesMessage.writeBytes(bytes);
				//发送消息
				queueSender.send(bytesMessage);
		} catch(Exception e ){
			e.printStackTrace();
		}
		
		
	}
	
	
	
}

?

?????? ?

?四、消息接收类

package com.javasd.jms;

import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;


public class TestJmsReceive implements Job{

	public static void main(String args[]) throws Exception {
		
		try{
			//quartz
			SchedulerFactory schedFact=new StdSchedulerFactory();
			Scheduler sched=schedFact.getScheduler();
			sched.start();
			
			
			JobDetail jobDetail=new JobDetail("a","b",TestJmsReceive.class);
            jobDetail.getJobDataMap().put("name","lucy");
           
            CronTrigger trigger=new  CronTrigger("c","d");
            trigger.setCronExpression("0/5 * * * * ? " ); // 启动之后立即执行 每一秒继续重复。
            sched.scheduleJob(jobDetail, trigger);

			
			
		} catch(Exception e){
			
		}
		
	   
		
	    
	    
		
	}

	public void execute(JobExecutionContext arg0) throws JobExecutionException {
		
		//与sender中的定义一样,在实际应用中可以考虑单独讲这些设置为静态变量
		Properties properties = new Properties();
		properties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
		properties.put(Context.PROVIDER_URL, "t3://localhost:9001");
		properties.put(Context.SECURITY_PRINCIPAL, "weblogic");
		properties.put(Context.SECURITY_CREDENTIALS, "weblogic");
		
		
		Context ctx = null;
		QueueConnection qConn = null;
		QueueConnectionFactory queueFactory = null;
		QueueSession qSession = null;
		Queue queue = null;
		
		QueueReceiver qreceiver = null;
		try {
			ctx = new InitialContext(properties);
			queueFactory = (QueueConnectionFactory)ctx.lookup("jms/connFactory");
			qConn = queueFactory.createQueueConnection();
			qSession = qConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			queue = (Queue) ctx.lookup("queueasd");
			//从这里开始与sender有差别
			//用指定的队列构造Receiver
			qreceiver = qSession.createReceiver(queue);
			//为这个receiver设置监听器MessageListener,使用的内部类
			
			qreceiver.setMessageListener(new MessageListener() {
				public void onMessage(Message msg)  {
					//如果接收到的消息是byte类型的message,可以使text等类型,这个可以自己尝试
					if(msg instanceof BytesMessage){
						//转换为jms识别的消息对象
						BytesMessage bytesMessage = (BytesMessage)msg;
						try {
							int length = bytesMessage.readInt();
							
							byte[] bytes = new byte[length];
							//获取传送过来的消息
							bytesMessage.readBytes(bytes);
							//将消息进行展现
							String s = new String(bytes);
							System.out.println(s);
							
						} catch (JMSException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						
					}
					
				}
		}
);
			//开启连结进行监听
			qConn.start();

			
		} catch (Exception e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
	    
	}
}

?

?

这样就完成了一个最基本的JMS消息发送以及接收的测试实例,在实际项目应用中项目组用到的jms对连接池以及多线程进行了优化,但是在应对实际情况的时候还是力不从心,不知大家怎么实现这个jms规范的亦或者是有更好的技术解决方案,望不吝赐教

?

?

?

?

?

???

??????

?

?

?

  相关解决方案