View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.IOException;
24  import java.io.ObjectInputStream;
25  import java.net.DatagramPacket;
26  import java.net.InetAddress;
27  import java.net.MulticastSocket;
28  import java.util.ArrayList;
29  import java.util.Iterator;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
34  import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
35  import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
36  import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
37  import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
38  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
39  import org.apache.jcs.engine.behavior.ICache;
40  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
41  import org.apache.jcs.engine.behavior.IShutdownObserver;
42  
43  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
44  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
45  import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
46  
47  /***
48   * Receives UDP Discovery messages.
49   */
50  public class UDPDiscoveryReceiver
51      implements Runnable, IShutdownObserver
52  {
53      private final static Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
54  
55      private final byte[] m_buffer = new byte[65536];
56  
57      private MulticastSocket m_socket;
58  
59      // todo consider using the threadpool manager to
60      // get this thread pool
61      // for now place a tight restrcition on the pool size
62      private static final int maxPoolSize = 10;
63  
64      private PooledExecutor pooledExecutor = null;
65  
66      // number of messages received.
67      private int cnt = 0;
68  
69      /***
70       * Service to get cache names and hande request broadcasts
71       */
72      protected UDPDiscoveryService service = null;
73  
74      private String multicastAddressString = "";
75  
76      private int multicastPort = 0;
77  
78      private ICompositeCacheManager cacheMgr;
79  
80      private boolean shutdown = false;
81  
82      /***
83       * Constructor for the LateralUDPReceiver object.
84       * <p>
85       * We determine out own host using InetAddress
86       *
87       * @param service
88       * @param multicastAddressString
89       * @param multicastPort
90       * @param cacheMgr
91       * @exception IOException
92       */
93      public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort,
94                                  ICompositeCacheManager cacheMgr )
95          throws IOException
96      {
97          this.service = service;
98          this.multicastAddressString = multicastAddressString;
99          this.multicastPort = multicastPort;
100         this.cacheMgr = cacheMgr;
101 
102         // create a small thread pool to handle a barage
103         pooledExecutor = new PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize );
104         pooledExecutor.discardOldestWhenBlocked();
105         //pooledExecutor.setMinimumPoolSize(1);
106         pooledExecutor.setThreadFactory( new MyThreadFactory() );
107 
108         if ( log.isInfoEnabled() )
109         {
110             log.info( "constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
111         }
112 
113         try
114         {
115             createSocket( this.multicastAddressString, this.multicastPort );
116         }
117         catch ( IOException ioe )
118         {
119             // consider eatign this so we can go on, or constructing the socket
120             // later
121             throw ioe;
122         }
123     }
124 
125     /***
126      * Creates the socket for this class.
127      *
128      * @param multicastAddressString
129      * @param multicastPort
130      * @throws IOException
131      */
132     private void createSocket( String multicastAddressString, int multicastPort )
133         throws IOException
134     {
135         try
136         {
137             m_socket = new MulticastSocket( multicastPort );
138             m_socket.joinGroup( InetAddress.getByName( multicastAddressString ) );
139         }
140         catch ( IOException e )
141         {
142             log.error( "Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]", e );
143             throw e;
144         }
145     }
146 
147     /***
148      * Highly unreliable. If it is processing one message while another comes in ,
149      * the second message is lost. This is for low concurency peppering.
150      *
151      * @return the object message
152      * @throws IOException
153      */
154     public Object waitForMessage()
155         throws IOException
156     {
157         final DatagramPacket packet = new DatagramPacket( m_buffer, m_buffer.length );
158 
159         Object obj = null;
160         try
161         {
162             m_socket.receive( packet );
163 
164             final ByteArrayInputStream byteStream = new ByteArrayInputStream( m_buffer, 0, packet.getLength() );
165 
166             final ObjectInputStream objectStream = new ObjectInputStream( byteStream );
167 
168             obj = objectStream.readObject();
169 
170         }
171         catch ( Exception e )
172         {
173             log.error( "Error receving multicast packet", e );
174         }
175         return obj;
176     }
177 
178     /*** Main processing method for the LateralUDPReceiver object */
179     public void run()
180     {
181         try
182         {
183             while ( !shutdown )
184             {
185                 Object obj = waitForMessage();
186 
187                 // not thread safe, but just for debugging
188                 cnt++;
189 
190                 if ( log.isDebugEnabled() )
191                 {
192                     log.debug( getCnt() + " messages received." );
193                 }
194 
195                 UDPDiscoveryMessage message = null;
196 
197                 try
198                 {
199                     message = (UDPDiscoveryMessage) obj;
200                     // check for null
201                     if ( message != null )
202                     {
203                         MessageHandler handler = new MessageHandler( message );
204 
205                         pooledExecutor.execute( handler );
206 
207                         if ( log.isDebugEnabled() )
208                         {
209                             log.debug( "Passed handler to executor." );
210                         }
211                     }
212                     else
213                     {
214                         log.warn( "message is null" );
215                     }
216                 }
217                 catch ( ClassCastException cce )
218                 {
219                     log.warn( "Received unknown message type " + cce.getMessage() );
220                 }
221             } // end while
222         }
223         catch ( Exception e )
224         {
225             log.error( "Unexpected exception in UDP receiver.", e );
226             try
227             {
228                 Thread.sleep( 100 );
229                 // TODO consider some failure count so we don't do this
230                 // forever.
231             }
232             catch ( Exception e2 )
233             {
234                 log.error( "Problem sleeping", e2 );
235             }
236         }
237         return;
238     }
239 
240     /***
241      * @param cnt
242      *            The cnt to set.
243      */
244     public void setCnt( int cnt )
245     {
246         this.cnt = cnt;
247     }
248 
249     /***
250      * @return Returns the cnt.
251      */
252     public int getCnt()
253     {
254         return cnt;
255     }
256 
257     /***
258      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
259      */
260     public class MessageHandler
261         implements Runnable
262     {
263         private UDPDiscoveryMessage message = null;
264 
265         /***
266          * @param message
267          */
268         public MessageHandler( UDPDiscoveryMessage message )
269         {
270             this.message = message;
271         }
272 
273         /*
274          * (non-Javadoc)
275          *
276          * @see java.lang.Runnable#run()
277          */
278         public void run()
279         {
280             // consider comparing ports here instead.
281             if ( message.getRequesterId() == LateralCacheInfo.listenerId )
282             {
283                 if ( log.isDebugEnabled() )
284                 {
285                     log.debug( "from self" );
286                 }
287             }
288             else
289             {
290                 if ( log.isDebugEnabled() )
291                 {
292                     log.debug( "from another" );
293                     log.debug( "Message = " + message );
294                 }
295 
296                 // if this is a request message, have the service handle it and
297                 // return
298                 if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
299                 {
300                     if ( log.isDebugEnabled() )
301                     {
302                         log.debug( "Message is a Request Broadcase, will have the service handle it." );
303                     }
304                     service.serviceRequestBroadcast();
305                     return;
306                 }
307 
308                 try
309                 {
310                     // get a cache and add it to the no waits
311                     // the add method should not add the same.
312                     // we need the listener port from the original config.
313                     ITCPLateralCacheAttributes lca = null;
314                     if ( service.getTcpLateralCacheAttributes() != null )
315                     {
316                         lca = (ITCPLateralCacheAttributes) service.getTcpLateralCacheAttributes().copy();
317                     }
318                     else
319                     {
320                         lca = new TCPLateralCacheAttributes();
321                     }
322                     lca.setTransmissionType( LateralCacheAttributes.TCP );
323                     lca.setTcpServer( message.getHost() + ":" + message.getPort() );
324                     LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr );
325 
326                     ArrayList regions = message.getCacheNames();
327                     if ( regions != null )
328                     {
329                         // for each region get the cache
330                         Iterator it = regions.iterator();
331                         while ( it.hasNext() )
332                         {
333                             String cacheName = (String) it.next();
334 
335                             try
336                             {
337                                 ICache ic = lcm.getCache( cacheName );
338 
339                                 if ( log.isDebugEnabled() )
340                                 {
341                                     log.debug( "Got cache, ic = " + ic );
342                                 }
343 
344                                 // add this to the nowaits for this cachename
345                                 if ( ic != null )
346                                 {
347                                     service.addNoWait( (LateralCacheNoWait) ic );
348                                     if ( log.isDebugEnabled() )
349                                     {
350                                         log.debug( "Called addNoWait for cacheName " + cacheName );
351                                     }
352                                 }
353                             }
354                             catch ( Exception e )
355                             {
356                                 log.error( "Problem creating no wait", e );
357                             }
358                         }
359                         // end while
360                     }
361                     else
362                     {
363                         log.warn( "No cache names found in message " + message );
364                     }
365                 }
366                 catch ( Exception e )
367                 {
368                     log.error( "Problem getting lateral maanger", e );
369                 }
370             }
371         }
372     }
373 
374     /***
375      * Allows us to set the daemon status on the executor threads
376      *
377      * @author aaronsm
378      *
379      */
380     class MyThreadFactory
381         implements ThreadFactory
382     {
383         /*
384          * (non-Javadoc)
385          *
386          * @see EDU.oswego.cs.dl.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
387          */
388         public Thread newThread( Runnable runner )
389         {
390             Thread t = new Thread( runner );
391             t.setDaemon( true );
392             t.setPriority( Thread.MIN_PRIORITY );
393             return t;
394         }
395     }
396 
397     /*
398      * (non-Javadoc)
399      *
400      * @see org.apache.jcs.engine.behavior.ShutdownObserver#shutdown()
401      */
402     public void shutdown()
403     {
404         try
405         {
406             shutdown = true;
407             m_socket.close();
408             pooledExecutor.shutdownNow();
409         }
410         catch ( Exception e )
411         {
412             log.error( "Problem closing socket" );
413         }
414     }
415 }
416 // end class