001 package com.croftsoft.core.net.jms; 002 003 // imported J2SE packages 004 005 import java.io.Serializable; 006 import javax.naming.Context; 007 import javax.naming.InitialContext; 008 import javax.naming.NamingException; 009 import java.util.*; 010 011 // imported J2EE packages 012 013 import javax.jms.JMSException; 014 import javax.jms.Message; 015 import javax.jms.MessageListener; 016 import javax.jms.ObjectMessage; 017 import javax.jms.Session; 018 import javax.jms.Topic; 019 import javax.jms.TopicConnection; 020 import javax.jms.TopicConnectionFactory; 021 import javax.jms.TopicPublisher; 022 import javax.jms.TopicSession; 023 import javax.jms.TopicSubscriber; 024 025 // imported CroftSoft packages 026 027 import com.croftsoft.core.lang.lifecycle.Lifecycle; 028 import com.croftsoft.core.lang.NullArgumentException; 029 import com.croftsoft.core.util.queue.ListQueue; 030 import com.croftsoft.core.util.queue.Queue; 031 032 /********************************************************************* 033 * Exchanges serializable Objects with a Topic via Queues. 034 * 035 * @version 036 * $Id: Courier.java,v 1.3 2008/09/20 04:12:46 croft Exp $ 037 * @since 038 * 2001-02-22 039 * @author 040 * <a href="https://www.croftsoft.com/">David Wallace Croft</a> 041 *********************************************************************/ 042 043 public final class Courier 044 implements Lifecycle, MessageListener, Runnable 045 ////////////////////////////////////////////////////////////////////// 046 ////////////////////////////////////////////////////////////////////// 047 { 048 049 private static final String 050 DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME 051 = "jms/TopicConnectionFactory"; 052 053 private static final String DEFAULT_JNDI_TOPIC_NAME = "jms/Topic"; 054 055 private static final int STATE_UNINITIALIZED = 0; 056 057 private static final int STATE_INITIALIZED = 1; 058 059 private static final int STATE_STARTED = 2; 060 061 ////////////////////////////////////////////////////////////////////// 062 ////////////////////////////////////////////////////////////////////// 063 064 private final Queue incomingQueue; 065 066 private final Queue outgoingQueue; 067 068 private final String jndiTopicName; 069 070 private final String jndiTopicConnectionFactoryName; 071 072 private int state = STATE_UNINITIALIZED; 073 074 private TopicConnection topicConnection; 075 076 private TopicSession topicSession; 077 078 private TopicPublisher topicPublisher; 079 080 private TopicSubscriber topicSubscriber; 081 082 private Thread thread; 083 084 private Object lockObject = new Object ( ); 085 086 private boolean isOkToRun = false; 087 088 ////////////////////////////////////////////////////////////////////// 089 ////////////////////////////////////////////////////////////////////// 090 091 public static void main ( String [ ] args ) 092 ////////////////////////////////////////////////////////////////////// 093 { 094 Queue incomingQueue = new ListQueue ( new ArrayList ( ) ); 095 096 Queue outgoingQueue = new ListQueue ( new ArrayList ( ) ); 097 098 Serializable outgoingSerializable = "Test"; 099 100 if ( args.length > 0 ) 101 { 102 outgoingSerializable = args [ 0 ]; 103 } 104 105 String jndiTopicName = DEFAULT_JNDI_TOPIC_NAME; 106 107 if ( args.length > 1 ) 108 { 109 jndiTopicName = args [ 1 ]; 110 } 111 112 String jndiTopicConnectionFactoryName 113 = DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME; 114 115 if ( args.length > 2 ) 116 { 117 jndiTopicConnectionFactoryName = args [ 2 ]; 118 } 119 120 Lifecycle lifecycle = new Courier ( incomingQueue, outgoingQueue, 121 jndiTopicName, jndiTopicConnectionFactoryName ); 122 123 System.out.println ( "Initializing..." ); 124 125 lifecycle.init ( ); 126 127 System.out.println ( "Starting..." ); 128 129 lifecycle.start ( ); 130 131 System.out.println ( "Transmitting..." ); 132 133 outgoingQueue.append ( outgoingSerializable ); 134 135 System.out.println ( "Receiving..." ); 136 137 try 138 { 139 System.out.println ( incomingQueue.pull ( ) ); 140 } 141 catch ( InterruptedException ex ) 142 { 143 ex.printStackTrace ( ); 144 } 145 146 System.out.println ( "Stopping..." ); 147 148 lifecycle.stop ( ); 149 150 System.out.println ( "Destroying..." ); 151 152 lifecycle.destroy ( ); 153 } 154 155 ////////////////////////////////////////////////////////////////////// 156 ////////////////////////////////////////////////////////////////////// 157 158 public Courier ( 159 Queue incomingQueue, 160 Queue outgoingQueue, 161 String jndiTopicName, 162 String jndiTopicConnectionFactoryName ) 163 ////////////////////////////////////////////////////////////////////// 164 { 165 NullArgumentException.check ( this.incomingQueue = incomingQueue ); 166 167 NullArgumentException.check ( this.outgoingQueue = outgoingQueue ); 168 169 NullArgumentException.check ( this.jndiTopicName = jndiTopicName ); 170 171 NullArgumentException.check ( this.jndiTopicConnectionFactoryName 172 = jndiTopicConnectionFactoryName ); 173 } 174 175 public Courier ( 176 Queue incomingQueue, 177 Queue outgoingQueue, 178 String jndiTopicName ) 179 ////////////////////////////////////////////////////////////////////// 180 { 181 this ( incomingQueue, outgoingQueue, jndiTopicName, 182 DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME ); 183 } 184 185 public Courier ( 186 Queue incomingQueue, 187 Queue outgoingQueue ) 188 ////////////////////////////////////////////////////////////////////// 189 { 190 this ( incomingQueue, outgoingQueue, DEFAULT_JNDI_TOPIC_NAME ); 191 } 192 193 ////////////////////////////////////////////////////////////////////// 194 ////////////////////////////////////////////////////////////////////// 195 196 public synchronized void init ( ) 197 ////////////////////////////////////////////////////////////////////// 198 { 199 if ( state != STATE_UNINITIALIZED ) 200 { 201 throw new IllegalStateException ( "already initialized" ); 202 } 203 204 try 205 { 206 Context context = new InitialContext ( ); 207 208 TopicConnectionFactory topicConnectionFactory 209 = ( TopicConnectionFactory ) context.lookup ( 210 jndiTopicConnectionFactoryName ); 211 212 Topic topic = ( Topic ) context.lookup ( jndiTopicName ); 213 214 topicConnection = topicConnectionFactory.createTopicConnection ( ); 215 216 topicSession = topicConnection.createTopicSession ( 217 false, Session.AUTO_ACKNOWLEDGE ); 218 219 topicPublisher = topicSession.createPublisher ( topic ); 220 221 String messageSelector = null; 222 223 boolean noLocal = true; 224 225 topicSubscriber = topicSession.createSubscriber ( 226 topic, messageSelector, noLocal ); 227 228 topicSubscriber.setMessageListener ( this ); 229 230 state = STATE_INITIALIZED; 231 } 232 catch ( NamingException ex ) 233 { 234 ex.printStackTrace ( ); 235 } 236 catch ( JMSException ex ) 237 { 238 ex.printStackTrace ( ); 239 240 // Do we need to close some stuff here? 241 242 // Do I need a finalize method? 243 } 244 } 245 246 public synchronized void start ( ) 247 ////////////////////////////////////////////////////////////////////// 248 { 249 if ( state != STATE_INITIALIZED ) 250 { 251 throw new IllegalStateException ( 252 "not initialized or already started" ); 253 } 254 255 // Use of the lockObject ensures that a new thread cannot be started 256 // until the previously running thread has completed. 257 258 synchronized ( lockObject ) 259 { 260 isOkToRun = true; 261 262 try 263 { 264 topicConnection.start ( ); 265 266 thread = new Thread ( this ); 267 268 thread.start ( ); 269 270 state = STATE_STARTED; 271 } 272 catch ( JMSException ex ) 273 { 274 // do some cleanup here? 275 ex.printStackTrace ( ); 276 } 277 } 278 } 279 280 public void onMessage ( Message message ) 281 ////////////////////////////////////////////////////////////////////// 282 { 283 // No need to synchronize as the Session passes messages serially. 284 285 try 286 { 287 if ( message instanceof ObjectMessage ) 288 { 289 Object messageObject 290 = ( ( ObjectMessage ) message ).getObject ( ); 291 292 incomingQueue.append ( messageObject ); 293 } 294 else 295 { 296 // ... else what? 297 } 298 } 299 catch ( Exception ex ) 300 { 301 // must catch all Exceptions 302 303 ex.printStackTrace ( ); 304 } 305 } 306 307 public void run ( ) 308 ////////////////////////////////////////////////////////////////////// 309 { 310 if ( thread != Thread.currentThread ( ) ) 311 { 312 throw new IllegalStateException ( "call start() instead" ); 313 } 314 315 // Use of the lockObject ensures that a new thread cannot be started 316 // until the previously running thread has completed. 317 318 synchronized ( lockObject ) 319 { 320 while ( isOkToRun ) 321 { 322 try 323 { 324 Serializable outgoingSerializable 325 = ( Serializable ) outgoingQueue.pull ( ); 326 327 ObjectMessage objectMessage 328 = topicSession.createObjectMessage ( ); 329 330 objectMessage.setObject ( outgoingSerializable ); 331 332 topicPublisher.publish ( objectMessage ); 333 } 334 catch ( InterruptedException ex ) 335 { 336 // Will exit loop if isOkToRun is now false. 337 } 338 catch ( JMSException ex ) 339 { 340 ex.printStackTrace ( ); 341 342 // What kind of cleanup and state transition here? 343 344 isOkToRun = false; 345 } 346 } 347 } 348 } 349 350 public synchronized void stop ( ) 351 ////////////////////////////////////////////////////////////////////// 352 { 353 if ( state != STATE_STARTED ) 354 { 355 throw new IllegalStateException ( "not started" ); 356 } 357 358 isOkToRun = false; 359 360 thread.interrupt ( ); 361 362 thread = null; 363 364 try 365 { 366 topicConnection.stop ( ); 367 } 368 catch ( JMSException ex ) 369 { 370 ex.printStackTrace ( ); 371 372 // what kind of clean-up here? 373 } 374 375 state = STATE_INITIALIZED; 376 } 377 378 public synchronized void destroy ( ) 379 ////////////////////////////////////////////////////////////////////// 380 { 381 if ( state != STATE_INITIALIZED ) 382 { 383 throw new IllegalStateException ( "not initialized" ); 384 } 385 386 try 387 { 388 topicSubscriber.close ( ); 389 } 390 catch ( Exception ex ) 391 { 392 ex.printStackTrace ( ); 393 } 394 395 // what about others? 396 397 try 398 { 399 topicConnection.close ( ); 400 } 401 catch ( Exception ex ) 402 { 403 ex.printStackTrace ( ); 404 } 405 406 state = STATE_UNINITIALIZED; 407 } 408 409 ////////////////////////////////////////////////////////////////////// 410 ////////////////////////////////////////////////////////////////////// 411 }