1 package org.apache.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.io.Serializable;
26 import java.net.InetAddress;
27 import java.net.ServerSocket;
28 import java.net.Socket;
29 import java.util.HashMap;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
34 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
35 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
36 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
37 import org.apache.jcs.engine.behavior.ICacheElement;
38 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
39 import org.apache.jcs.engine.control.CompositeCache;
40 import org.apache.jcs.engine.control.CompositeCacheManager;
41
42 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
43 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
44
/**
 * Listens for connections from other TCP lateral caches and handles them. The
 * initialization method starts a listening thread, which creates a server
 * socket. When messages are received they are passed to a pooled executor,
 * which then calls the appropriate handle method.
 */
51 public class LateralTCPListener
52 implements ILateralCacheListener, Serializable
53 {
54 private static final long serialVersionUID = -9107062664967131738L;
55
56 private final static Log log = LogFactory.getLog( LateralTCPListener.class );
57
58 /*** How long the server will block on an accept(). 0 is infinte. */
59 private final static int acceptTimeOut = 0;
60
61 /*** The CacheHub this listener is associated with */
62 private transient ICompositeCacheManager cacheManager;
63
64 /*** Map of available instances, keyed by port */
65 protected final static HashMap instances = new HashMap();
66
67 /*** The socket listener */
68 private ListenerThread receiver;
69
70 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
71
72 private int port;
73
74 private PooledExecutor pooledExecutor;
75
76 private int putCnt = 0;
77
78 private int removeCnt = 0;
79
80 private int getCnt = 0;
81
82 /***
83 * Use the vmid by default. This can be set for testing. If we ever need to
84 * run more than one per vm, then we need a new technique.
85 */
86 private long listenerId = LateralCacheInfo.listenerId;
87
88 /***
89 * Gets the instance attribute of the LateralCacheTCPListener class.
90 * <p>
91 * @param ilca
92 * ITCPLateralCacheAttributes
93 * @param cacheMgr
94 * @return The instance value
95 */
96 public synchronized static ILateralCacheListener getInstance( ITCPLateralCacheAttributes ilca,
97 ICompositeCacheManager cacheMgr )
98 {
99 ILateralCacheListener ins = (ILateralCacheListener) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
100
101 if ( ins == null )
102 {
103 ins = new LateralTCPListener( ilca );
104
105 ins.init();
106
107 ins.setCacheManager( cacheMgr );
108
109 instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
110
111 if ( log.isDebugEnabled() )
112 {
113 log.debug( "created new listener " + ilca.getTcpListenerPort() );
114 }
115 }
116
117 return ins;
118 }
119
/**
 * Creates a listener for the given lateral configuration. Only one is
 * needed, since it does the work for all regions and is merely referenced
 * by multiple region names.
 * <p>
 * @param ilca
 *            the attributes to store on this listener
 */
protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
{
    setTcpLateralCacheAttributes( ilca );
}
130
131 /***
132 * This starts the ListenerThread on the specified port.
133 */
134 public void init()
135 {
136 try
137 {
138 this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
139
140 receiver = new ListenerThread();
141 receiver.setDaemon( true );
142 receiver.start();
143
144 pooledExecutor = new PooledExecutor();
145 pooledExecutor.setThreadFactory( new MyThreadFactory() );
146 }
147 catch ( Exception ex )
148 {
149 log.error( ex );
150
151 throw new IllegalStateException( ex.getMessage() );
152 }
153 }
154
155 /***
156 * Let the lateral cache set a listener_id. Since there is only one
157 * listerenr for all the regions and every region gets registered? the id
158 * shouldn't be set if it isn't zero. If it is we assume that it is a
159 * reconnect.
160 * <p>
161 * By default, the listener id is the vmid.
162 * <p>
163 * The service should set this value. This value will never be changed by a
164 * server we connect to. It needs to be non static, for unit tests.
165 * <p>
166 * The service will use the value it sets in all send requests to the
167 * sender.
168 * <p>
169 * @param id
170 * The new listenerId value
171 * @throws IOException
172 */
173 public void setListenerId( long id )
174 throws IOException
175 {
176 this.listenerId = id;
177 if ( log.isDebugEnabled() )
178 {
179 log.debug( "set listenerId = " + id );
180 }
181 }
182
183 /***
184 * Gets the listenerId attribute of the LateralCacheTCPListener object
185 * <p>
186 * @return The listenerId value
187 * @throws IOException
188 */
189 public long getListenerId()
190 throws IOException
191 {
192 return this.listenerId;
193 }
194
195 /***
196 * Increments the put count. Gets the cache that was injected by the lateral
197 * factory. Calls put on the cache.
198 * <p>
199 * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
200 */
201 public void handlePut( ICacheElement element )
202 throws IOException
203 {
204 putCnt++;
205 if ( log.isInfoEnabled() )
206 {
207 if ( getPutCnt() % 100 == 0 )
208 {
209 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
210 + getPutCnt() );
211 }
212 }
213
214 if ( log.isDebugEnabled() )
215 {
216 log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
217 }
218
219 getCache( element.getCacheName() ).localUpdate( element );
220 }
221
222 /***
223 * Increments the remove count. Gets the cache that was injected by the
224 * lateral factory. Calls remove on the cache.
225 * <p>
226 * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
227 * java.io.Serializable)
228 */
229 public void handleRemove( String cacheName, Serializable key )
230 throws IOException
231 {
232 removeCnt++;
233 if ( log.isInfoEnabled() )
234 {
235 if ( getRemoveCnt() % 100 == 0 )
236 {
237 log.info( "Remove Count = " + getRemoveCnt() );
238 }
239 }
240
241 if ( log.isDebugEnabled() )
242 {
243 log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
244 }
245
246 getCache( cacheName ).localRemove( key );
247 }
248
249 /***
250 * Gets the cache that was injected by the lateral factory. Calls removeAll
251 * on the cache.
252 * <p>
253 * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
254 */
255 public void handleRemoveAll( String cacheName )
256 throws IOException
257 {
258 if ( log.isDebugEnabled() )
259 {
260 log.debug( "handleRemoveAll> cacheName=" + cacheName );
261 }
262
263 getCache( cacheName ).localRemoveAll();
264 }
265
266 /***
267 * Gets the cache that was injected by the lateral factory. Calls get on the
268 * cache.
269 * <p>
270 * @param cacheName
271 * @param key
272 * @return Serializable
273 * @throws IOException
274 */
275 public Serializable handleGet( String cacheName, Serializable key )
276 throws IOException
277 {
278 getCnt++;
279 if ( log.isInfoEnabled() )
280 {
281 if ( getGetCnt() % 100 == 0 )
282 {
283 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
284 + getGetCnt() );
285 }
286 }
287
288 if ( log.isDebugEnabled() )
289 {
290 log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
291 }
292
293 return getCache( cacheName ).localGet( key );
294 }
295
296 /***
297 * Right now this does nothing.
298 * <p>
299 * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
300 */
301 public void handleDispose( String cacheName )
302 throws IOException
303 {
304 if ( log.isInfoEnabled() )
305 {
306 log.info( "handleDispose > cacheName=" + cacheName );
307 }
308
309
310
311
312 }
313
314 /***
315 * Gets the cacheManager attribute of the LateralCacheTCPListener object.
316 * <p>
317 * Normally this is set by the factory. If it wasn't set the listener
318 * defaults to the expected singleton behavior of the cache amanger.
319 * <p>
320 * @param name
321 * @return CompositeCache
322 */
323 protected CompositeCache getCache( String name )
324 {
325 if ( getCacheManager() == null )
326 {
327
328 setCacheManager( CompositeCacheManager.getInstance() );
329
330 if ( log.isDebugEnabled() )
331 {
332 log.debug( "cacheMgr = " + getCacheManager() );
333 }
334 }
335
336 return getCacheManager().getCache( name );
337 }
338
339 /***
340 * This is roughly the number of updates the lateral has received.
341 * <p>
342 * @return Returns the putCnt.
343 */
344 public int getPutCnt()
345 {
346 return putCnt;
347 }
348
349 /***
350 * @return Returns the getCnt.
351 */
352 public int getGetCnt()
353 {
354 return getCnt;
355 }
356
357 /***
358 * @return Returns the removeCnt.
359 */
360 public int getRemoveCnt()
361 {
362 return removeCnt;
363 }
364
365 /***
366 * @param cacheMgr
367 * The cacheMgr to set.
368 */
369 public void setCacheManager( ICompositeCacheManager cacheMgr )
370 {
371 this.cacheManager = cacheMgr;
372 }
373
374 /***
375 * @return Returns the cacheMgr.
376 */
377 public ICompositeCacheManager getCacheManager()
378 {
379 return cacheManager;
380 }
381
382 /***
383 * @param tcpLateralCacheAttributes
384 * The tcpLateralCacheAttributes to set.
385 */
386 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
387 {
388 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
389 }
390
391 /***
392 * @return Returns the tcpLateralCacheAttributes.
393 */
394 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
395 {
396 return tcpLateralCacheAttributes;
397 }
398
399 /***
400 * Processes commands from the server socket. There should be one listener
401 * for each configured TCP lateral.
402 */
403 public class ListenerThread
404 extends Thread
405 {
406 /*** Main processing method for the ListenerThread object */
407 public void run()
408 {
409 try
410 {
411 log.info( "Listening on port " + port );
412
413 ServerSocket serverSocket = new ServerSocket( port );
414 serverSocket.setSoTimeout( acceptTimeOut );
415
416 ConnectionHandler handler;
417
418 while ( true )
419 {
420 if ( log.isDebugEnabled() )
421 {
422 log.debug( "Waiting for clients to connect " );
423 }
424
425 Socket socket = serverSocket.accept();
426
427 if ( log.isDebugEnabled() )
428 {
429 InetAddress inetAddress = socket.getInetAddress();
430
431 log.debug( "Connected to client at " + inetAddress );
432 }
433
434 handler = new ConnectionHandler( socket );
435
436 pooledExecutor.execute( handler );
437 }
438 }
439 catch ( Exception e )
440 {
441 log.error( "Exception caught in TCP listener", e );
442 }
443 }
444 }
445
446 /***
447 * A Separate thread taht runs when a command comes into the
448 * LateralTCPReceiver.
449 */
450 public class ConnectionHandler
451 implements Runnable
452 {
453 private Socket socket;
454
455 /***
456 * Construct for a given socket
457 * @param socket
458 */
459 public ConnectionHandler( Socket socket )
460 {
461 this.socket = socket;
462 }
463
464 /***
465 * Main processing method for the LateralTCPReceiverConnection object
466 */
467 public void run()
468 {
469 ObjectInputStream ois;
470
471 try
472 {
473 ois = new ObjectInputStream( socket.getInputStream() );
474 }
475 catch ( Exception e )
476 {
477 log.error( "Could not open ObjectInputStream on " + socket, e );
478
479 return;
480 }
481
482 LateralElementDescriptor led;
483
484 try
485 {
486 while ( true )
487 {
488 led = (LateralElementDescriptor) ois.readObject();
489
490 if ( led == null )
491 {
492 log.debug( "LateralElementDescriptor is null" );
493 continue;
494 }
495 if ( led.requesterId == getListenerId() )
496 {
497 log.debug( "from self" );
498 }
499 else
500 {
501 if ( log.isDebugEnabled() )
502 {
503 log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
504 + ", led.command = " + led.command + ", led.ce = " + led.ce );
505 }
506
507 handle( led );
508 }
509 }
510 }
511 catch ( java.io.EOFException e )
512 {
513 log.info( "Caught java.io.EOFException closing connection." );
514 }
515 catch ( java.net.SocketException e )
516 {
517 log.info( "Caught java.net.SocketException closing connection." );
518 }
519 catch ( Exception e )
520 {
521 log.error( "Unexpected exception.", e );
522 }
523
524 try
525 {
526 ois.close();
527 }
528 catch ( Exception e )
529 {
530 log.error( "Could not close object input stream.", e );
531 }
532 }
533
534 /***
535 * This calls the appropriate method, based on the command sent in the
536 * Lateral element descriptor.
537 * <p>
538 * @param led
539 * @throws IOException
540 */
541 private void handle( LateralElementDescriptor led )
542 throws IOException
543 {
544 String cacheName = led.ce.getCacheName();
545 Serializable key = led.ce.getKey();
546
547 if ( led.command == LateralElementDescriptor.UPDATE )
548 {
549 handlePut( led.ce );
550 }
551 else if ( led.command == LateralElementDescriptor.REMOVE )
552 {
553
554
555
556 if ( led.valHashCode != -1 )
557 {
558 if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
559 {
560 ICacheElement test = getCache( cacheName ).localGet( key );
561 if ( test != null )
562 {
563 if ( test.getVal().hashCode() == led.valHashCode )
564 {
565 if ( log.isDebugEnabled() )
566 {
567 log.debug( "Filtering detected identical hashCode [" + led.valHashCode
568 + "], not issuing a remove for led " + led );
569 }
570 return;
571 }
572 else
573 {
574 if ( log.isDebugEnabled() )
575 {
576 log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
577 + "] sent [" + led.valHashCode + "]" );
578 }
579 }
580 }
581 }
582 }
583 handleRemove( cacheName, key );
584 }
585 else if ( led.command == LateralElementDescriptor.REMOVEALL )
586 {
587 handleRemoveAll( cacheName );
588 }
589 else if ( led.command == LateralElementDescriptor.GET )
590 {
591 Serializable obj = handleGet( cacheName, key );
592
593 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
594
595 if ( oos != null )
596 {
597 oos.writeObject( obj );
598 oos.flush();
599 }
600 }
601 }
602 }
603
604 /***
605 * Allows us to set the daemon status on the executor threads
606 * <p>
607 * @author aaronsm
608 */
609 class MyThreadFactory
610 implements ThreadFactory
611 {
612
613
614
615
616 public Thread newThread( Runnable runner )
617 {
618 Thread t = new Thread( runner );
619 t.setDaemon( true );
620 t.setPriority( Thread.MIN_PRIORITY );
621 return t;
622 }
623 }
624 }