Coverage report

  %line %branch
org.apache.jcs.engine.CacheEventQueue$RemoveAllEvent
0% 
0% 

 1  
 package org.apache.jcs.engine;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.io.Serializable;
 24  
 import java.util.ArrayList;
 25  
 
 26  
 import org.apache.commons.logging.Log;
 27  
 import org.apache.commons.logging.LogFactory;
 28  
 import org.apache.jcs.engine.behavior.ICacheElement;
 29  
 import org.apache.jcs.engine.behavior.ICacheEventQueue;
 30  
 import org.apache.jcs.engine.behavior.ICacheListener;
 31  
 import org.apache.jcs.engine.stats.StatElement;
 32  
 import org.apache.jcs.engine.stats.Stats;
 33  
 import org.apache.jcs.engine.stats.behavior.IStatElement;
 34  
 import org.apache.jcs.engine.stats.behavior.IStats;
 35  
 
 36  
 /**
 37  
  * An event queue is used to propagate ordered cache events to one and only one target listener.
 38  
  * <p>
 39  
  * This is a modified version of the experimental version. It should lazily initialize the processor
 40  
  * thread, and kill the thread if the queue goes empty for a specified period (10 seconds by default).
 41  
  * If something comes in after that a new processor thread should be created.
 42  
  */
 43  
 public class CacheEventQueue
 44  
     implements ICacheEventQueue
 45  
 {
 46  
     /** The logger. */
 47  
     private static final Log log = LogFactory.getLog( CacheEventQueue.class );
 48  
 
 49  
     /** The type of queue -- there are pooled and single */
 50  
     private static final int queueType = SINGLE_QUEUE_TYPE;
 51  
 
 52  
     /** default */
 53  
     private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
 54  
 
 55  
     /**
 56  
      * time to wait for an event before snuffing the background thread if the queue is empty. make
 57  
      * configurable later
 58  
      */
 59  
     private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
 60  
 
 61  
     /**
 62  
      * When the events are pulled off the queue, they tell the listener to handle the specific event
 63  
      * type. The work is done by the listener.
 64  
      */
 65  
     private ICacheListener listener;
 66  
 
 67  
     /** Id of the listener registered with this queue */
 68  
     private long listenerId;
 69  
 
 70  
     /** The cache region name, if applicable. */
 71  
     private String cacheName;
 72  
 
 73  
     /** Maximum number of failures before we buy the farm. */
 74  
     private int maxFailure;
 75  
 
 76  
     /** in milliseconds */
 77  
     private int waitBeforeRetry;
 78  
 
 79  
     /** this is true if there is no worker thread. */
 80  
     private boolean destroyed = true;
 81  
 
 82  
     /**
 83  
      * This means that the queue is functional. If we reached the max number of failures, the queue
 84  
      * is marked as non functional and will never work again.
 85  
      */
 86  
     private boolean working = true;
 87  
 
 88  
     /** the thread that works the queue. */
 89  
     private Thread processorThread;
 90  
 
 91  
     /** sync */
 92  
     private Object queueLock = new Object();
 93  
 
 94  
     /** the head of the queue */
 95  
     private Node head = new Node();
 96  
 
 97  
     /** the end of the queue */
 98  
     private Node tail = head;
 99  
 
 100  
     /** Number of items in the queue */
 101  
     private int size = 0;
 102  
 
 103  
     /**
 104  
      * Constructs with the specified listener and the cache name.
 105  
      * <p>
 106  
      * @param listener
 107  
      * @param listenerId
 108  
      * @param cacheName
 109  
      */
 110  
     public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName )
 111  
     {
 112  
         this( listener, listenerId, cacheName, 10, 500 );
 113  
     }
 114  
 
 115  
     /**
 116  
      * Constructor for the CacheEventQueue object
 117  
      * <p>
 118  
      * @param listener
 119  
      * @param listenerId
 120  
      * @param cacheName
 121  
      * @param maxFailure
 122  
      * @param waitBeforeRetry
 123  
      */
 124  
     public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
 125  
                             int waitBeforeRetry )
 126  
     {
 127  
         if ( listener == null )
 128  
         {
 129  
             throw new IllegalArgumentException( "listener must not be null" );
 130  
         }
 131  
 
 132  
         this.listener = listener;
 133  
         this.listenerId = listenerId;
 134  
         this.cacheName = cacheName;
 135  
         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
 136  
         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
 137  
 
 138  
         if ( log.isDebugEnabled() )
 139  
         {
 140  
             log.debug( "Constructed: " + this );
 141  
         }
 142  
     }
 143  
 
 144  
     /**
 145  
      * What type of queue is this.
 146  
      * <p>
 147  
      * (non-Javadoc)
 148  
      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getQueueType()
 149  
      */
 150  
     public int getQueueType()
 151  
     {
 152  
         return queueType;
 153  
     }
 154  
 
 155  
     /**
 156  
      * Kill the processor thread and indicate that the queue is detroyed and no longer alive, but it
 157  
      * can still be working.
 158  
      */
 159  
     public synchronized void stopProcessing()
 160  
     {
 161  
         destroyed = true;
 162  
         processorThread = null;
 163  
     }
 164  
 
 165  
     /**
 166  
      * Returns the time to wait for events before killing the background thread.
 167  
      * @return int
 168  
      */
 169  
     public int getWaitToDieMillis()
 170  
     {
 171  
         return waitToDieMillis;
 172  
     }
 173  
 
 174  
     /**
 175  
      * Sets the time to wait for events before killing the background thread.
 176  
      * <p>
 177  
      * @param wtdm the ms for the q to sit idle.
 178  
      */
 179  
     public void setWaitToDieMillis( int wtdm )
 180  
     {
 181  
         waitToDieMillis = wtdm;
 182  
     }
 183  
 
 184  
     /**
 185  
      * Creates a brief string identifying the listener and the region.
 186  
      * <p>
 187  
      * @return String debugging info.
 188  
      */
 189  
     public String toString()
 190  
     {
 191  
         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
 192  
     }
 193  
 
 194  
     /**
 195  
      * If they queue has an active thread it is considered alive.
 196  
      * <p>
 197  
      * @return The alive value
 198  
      */
 199  
     public synchronized boolean isAlive()
 200  
     {
 201  
         return ( !destroyed );
 202  
     }
 203  
 
 204  
     /**
 205  
      * Sets whether the queue is actively processing -- if there are working threads.
 206  
      * <p>
 207  
      * @param aState
 208  
      */
 209  
     public synchronized void setAlive( boolean aState )
 210  
     {
 211  
         destroyed = !aState;
 212  
     }
 213  
 
 214  
     /**
 215  
      * @return The listenerId value
 216  
      */
 217  
     public long getListenerId()
 218  
     {
 219  
         return listenerId;
 220  
     }
 221  
 
 222  
     /**
 223  
      * Event Q is emtpy.
 224  
      * <p>
 225  
      * Calling destroy interupts the processor thread.
 226  
      */
 227  
     public synchronized void destroy()
 228  
     {
 229  
         if ( !destroyed )
 230  
         {
 231  
             destroyed = true;
 232  
 
 233  
             if ( log.isInfoEnabled() )
 234  
             {
 235  
                 log.info( "Destroying queue, stats =  " + getStatistics() );
 236  
             }
 237  
 
 238  
             // sychronize on queue so the thread will not wait forever,
 239  
             // and then interrupt the QueueProcessor
 240  
 
 241  
             if ( processorThread != null )
 242  
             {
 243  
                 synchronized ( queueLock )
 244  
                 {
 245  
                     processorThread.interrupt();
 246  
                 }
 247  
             }
 248  
             processorThread = null;
 249  
 
 250  
             if ( log.isInfoEnabled() )
 251  
             {
 252  
                 log.info( "Cache event queue destroyed: " + this );
 253  
             }
 254  
         }
 255  
         else
 256  
         {
 257  
             if ( log.isInfoEnabled() )
 258  
             {
 259  
                 log.info( "Destroy was called after queue was destroyed.  Doing nothing.  Stats =  " + getStatistics() );
 260  
             }
 261  
         }
 262  
     }
 263  
 
 264  
     /**
 265  
      * This adds a put event ot the queue. When it is processed, the element will be put to the
 266  
      * listener.
 267  
      * <p>
 268  
      * @param ce The feature to be added to the PutEvent attribute
 269  
      * @exception IOException
 270  
      */
 271  
     public synchronized void addPutEvent( ICacheElement ce )
 272  
         throws IOException
 273  
     {
 274  
         if ( isWorking() )
 275  
         {
 276  
             put( new PutEvent( ce ) );
 277  
         }
 278  
         else
 279  
         {
 280  
             if ( log.isWarnEnabled() )
 281  
             {
 282  
                 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
 283  
             }
 284  
         }
 285  
     }
 286  
 
 287  
     /**
 288  
      * This adds a remove event to the queue. When processed the listener's remove method will be
 289  
      * called for the key.
 290  
      * <p>
 291  
      * @param key The feature to be added to the RemoveEvent attribute
 292  
      * @exception IOException
 293  
      */
 294  
     public synchronized void addRemoveEvent( Serializable key )
 295  
         throws IOException
 296  
     {
 297  
         if ( isWorking() )
 298  
         {
 299  
             put( new RemoveEvent( key ) );
 300  
         }
 301  
         else
 302  
         {
 303  
             if ( log.isWarnEnabled() )
 304  
             {
 305  
                 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
 306  
             }
 307  
         }
 308  
     }
 309  
 
 310  
     /**
 311  
      * This adds a remove all event to the queue. When it is processed, all elements will be removed
 312  
      * from the cache.
 313  
      * <p>
 314  
      * @exception IOException
 315  
      */
 316  
     public synchronized void addRemoveAllEvent()
 317  
         throws IOException
 318  
     {
 319  
         if ( isWorking() )
 320  
         {
 321  
             put( new RemoveAllEvent() );
 322  
         }
 323  
         else
 324  
         {
 325  
             if ( log.isWarnEnabled() )
 326  
             {
 327  
                 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
 328  
             }
 329  
         }
 330  
     }
 331  
 
 332  
     /**
 333  
      * @exception IOException
 334  
      */
 335  
     public synchronized void addDisposeEvent()
 336  
         throws IOException
 337  
     {
 338  
         if ( isWorking() )
 339  
         {
 340  
             put( new DisposeEvent() );
 341  
         }
 342  
         else
 343  
         {
 344  
             if ( log.isWarnEnabled() )
 345  
             {
 346  
                 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
 347  
             }
 348  
         }
 349  
     }
 350  
 
 351  
     /**
 352  
      * Adds an event to the queue.
 353  
      * <p>
 354  
      * @param event
 355  
      */
 356  
     private void put( AbstractCacheEvent event )
 357  
     {
 358  
         Node newNode = new Node();
 359  
         if ( log.isDebugEnabled() )
 360  
         {
 361  
             log.debug( "Event entering Queue for " + cacheName + ": " + event );
 362  
         }
 363  
 
 364  
         newNode.event = event;
 365  
 
 366  
         synchronized ( queueLock )
 367  
         {
 368  
             size++;
 369  
             tail.next = newNode;
 370  
             tail = newNode;
 371  
             if ( isWorking() )
 372  
             {
 373  
                 if ( !isAlive() )
 374  
                 {
 375  
                     destroyed = false;
 376  
                     processorThread = new QProcessor( this );
 377  
                     processorThread.start();
 378  
                     if ( log.isInfoEnabled() )
 379  
                     {
 380  
                         log.info( "Cache event queue created: " + this );
 381  
                     }
 382  
                 }
 383  
                 else
 384  
                 {
 385  
                     queueLock.notify();
 386  
                 }
 387  
             }
 388  
         }
 389  
     }
 390  
 
 391  
     /**
 392  
      * Returns the next cache event from the queue or null if there are no events in the queue.
 393  
      * <p>
 394  
      * We have an empty node at the head and the tail. When we take an item from the queue we move
 395  
      * the next node to the head and then clear the value from that node. This value is returned.
 396  
      * <p>
 397  
      * When the queue is empty the head node is the same as the tail node.
 398  
      * <p>
 399  
      * @return An event to process.
 400  
      */
 401  
     private AbstractCacheEvent take()
 402  
     {
 403  
         synchronized ( queueLock )
 404  
         {
 405  
             // wait until there is something to read
 406  
             if ( head == tail )
 407  
             {
 408  
                 return null;
 409  
             }
 410  
 
 411  
             Node node = head.next;
 412  
 
 413  
             AbstractCacheEvent value = node.event;
 414  
 
 415  
             if ( log.isDebugEnabled() )
 416  
             {
 417  
                 log.debug( "head.event = " + head.event );
 418  
                 log.debug( "node.event = " + node.event );
 419  
             }
 420  
 
 421  
             // Node becomes the new head (head is always empty)
 422  
 
 423  
             node.event = null;
 424  
             head = node;
 425  
 
 426  
             size--;
 427  
             return value;
 428  
         }
 429  
     }
 430  
 
 431  
     /**
 432  
      * This method returns semi structured data on this queue.
 433  
      * <p>
 434  
      * (non-Javadoc)
 435  
      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
 436  
      */
 437  
     public IStats getStatistics()
 438  
     {
 439  
         IStats stats = new Stats();
 440  
         stats.setTypeName( "Cache Event Queue" );
 441  
 
 442  
         ArrayList elems = new ArrayList();
 443  
 
 444  
         IStatElement se = null;
 445  
 
 446  
         se = new StatElement();
 447  
         se.setName( "Working" );
 448  
         se.setData( "" + this.working );
 449  
         elems.add( se );
 450  
 
 451  
         se = new StatElement();
 452  
         se.setName( "Alive" );
 453  
         se.setData( "" + this.isAlive() );
 454  
         elems.add( se );
 455  
 
 456  
         se = new StatElement();
 457  
         se.setName( "Empty" );
 458  
         se.setData( "" + this.isEmpty() );
 459  
         elems.add( se );
 460  
 
 461  
         int size = 0;
 462  
         synchronized ( queueLock )
 463  
         {
 464  
             // wait until there is something to read
 465  
             if ( head == tail )
 466  
             {
 467  
                 size = 0;
 468  
             }
 469  
             else
 470  
             {
 471  
                 Node n = head;
 472  
                 while ( n != null )
 473  
                 {
 474  
                     n = n.next;
 475  
                     size++;
 476  
                 }
 477  
             }
 478  
 
 479  
             se = new StatElement();
 480  
             se.setName( "Size" );
 481  
             se.setData( "" + size );
 482  
             elems.add( se );
 483  
         }
 484  
 
 485  
         // get an array and put them in the Stats object
 486  
         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
 487  
         stats.setStatElements( ses );
 488  
 
 489  
         return stats;
 490  
     }
 491  
 
 492  
     // /////////////////////////// Inner classes /////////////////////////////
 493  
 
 494  
     /** The queue is composed of nodes. */
 495  
     private static class Node
 496  
     {
 497  
         /** Next node in the singly linked list. */
 498  
         Node next = null;
 499  
 
 500  
         /** The payload. */
 501  
         CacheEventQueue.AbstractCacheEvent event = null;
 502  
     }
 503  
 
 504  
     /**
 505  
      * This is the thread that works the queue.
 506  
      * <p>
 507  
      * @author asmuts
 508  
      * @created January 15, 2002
 509  
      */
 510  
     private class QProcessor
 511  
         extends Thread
 512  
     {
 513  
         /** The queue to work */
 514  
         CacheEventQueue queue;
 515  
 
 516  
         /**
 517  
          * Constructor for the QProcessor object
 518  
          * <p>
 519  
          * @param aQueue the event queue to take items from.
 520  
          */
 521  
         QProcessor( CacheEventQueue aQueue )
 522  
         {
 523  
             super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
 524  
 
 525  
             setDaemon( true );
 526  
             queue = aQueue;
 527  
         }
 528  
 
 529  
         /**
 530  
          * Main processing method for the QProcessor object.
 531  
          * <p>
 532  
          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
 533  
          * events come in during that period the run method can exit and the thread is dereferenced.
 534  
          */
 535  
         public void run()
 536  
         {
 537  
             AbstractCacheEvent event = null;
 538  
 
 539  
             while ( queue.isAlive() )
 540  
             {
 541  
                 event = queue.take();
 542  
 
 543  
                 if ( log.isDebugEnabled() )
 544  
                 {
 545  
                     log.debug( "Event from queue = " + event );
 546  
                 }
 547  
 
 548  
                 if ( event == null )
 549  
                 {
 550  
                     synchronized ( queueLock )
 551  
                     {
 552  
                         try
 553  
                         {
 554  
                             queueLock.wait( queue.getWaitToDieMillis() );
 555  
                         }
 556  
                         catch ( InterruptedException e )
 557  
                         {
 558  
                             log.warn( "Interrupted while waiting for another event to come in before we die." );
 559  
                             return;
 560  
                         }
 561  
                         event = queue.take();
 562  
                         if ( log.isDebugEnabled() )
 563  
                         {
 564  
                             log.debug( "Event from queue after sleep = " + event );
 565  
                         }
 566  
                     }
 567  
                     if ( event == null )
 568  
                     {
 569  
                         queue.stopProcessing();
 570  
                     }
 571  
                 }
 572  
 
 573  
                 if ( queue.isWorking() && queue.isAlive() && event != null )
 574  
                 {
 575  
                     event.run();
 576  
                 }
 577  
             }
 578  
             if ( log.isDebugEnabled() )
 579  
             {
 580  
                 log.debug( "QProcessor exiting for " + queue );
 581  
             }
 582  
         }
 583  
     }
 584  
 
 585  
     /**
 586  
      * Retries before declaring failure.
 587  
      * <p>
 588  
      * @author asmuts
 589  
      * @created January 15, 2002
 590  
      */
 591  
     private abstract class AbstractCacheEvent
 592  
         implements Runnable
 593  
     {
 594  
         /** Number of failures encountered processing this event. */
 595  
         int failures = 0;
 596  
 
 597  
         /** Have we finished the job */
 598  
         boolean done = false;
 599  
 
 600  
         /**
 601  
          * Main processing method for the AbstractCacheEvent object
 602  
          */
 603  
         public void run()
 604  
         {
 605  
             try
 606  
             {
 607  
                 doRun();
 608  
             }
 609  
             catch ( IOException e )
 610  
             {
 611  
                 if ( log.isWarnEnabled() )
 612  
                 {
 613  
                     log.warn( e );
 614  
                 }
 615  
                 if ( ++failures >= maxFailure )
 616  
                 {
 617  
                     if ( log.isWarnEnabled() )
 618  
                     {
 619  
                         log.warn( "Error while running event from Queue: " + this
 620  
                             + ". Dropping Event and marking Event Queue as non-functional." );
 621  
                     }
 622  
                     setWorking( false );
 623  
                     setAlive( false );
 624  
                     return;
 625  
                 }
 626  
                 if ( log.isInfoEnabled() )
 627  
                 {
 628  
                     log.info( "Error while running event from Queue: " + this + ". Retrying..." );
 629  
                 }
 630  
                 try
 631  
                 {
 632  
                     Thread.sleep( waitBeforeRetry );
 633  
                     run();
 634  
                 }
 635  
                 catch ( InterruptedException ie )
 636  
                 {
 637  
                     if ( log.isErrorEnabled() )
 638  
                     {
 639  
                         log.warn( "Interrupted while sleeping for retry on event " + this + "." );
 640  
                     }
 641  
                     // TODO consider if this is best. maybe we shoudl just
 642  
                     // destroy
 643  
                     setWorking( false );
 644  
                     setAlive( false );
 645  
                 }
 646  
             }
 647  
         }
 648  
 
 649  
         /**
 650  
          * @exception IOException
 651  
          */
 652  
         protected abstract void doRun()
 653  
             throws IOException;
 654  
     }
 655  
 
 656  
     /**
 657  
      * An element should be put in the cache.
 658  
      * <p>
 659  
      * @author asmuts
 660  
      * @created January 15, 2002
 661  
      */
 662  
     private class PutEvent
 663  
         extends AbstractCacheEvent
 664  
     {
 665  
         /** The element to put to the listener */
 666  
         private ICacheElement ice;
 667  
 
 668  
         /**
 669  
          * Constructor for the PutEvent object.
 670  
          * <p>
 671  
          * @param ice
 672  
          * @exception IOException
 673  
          */
 674  
         PutEvent( ICacheElement ice )
 675  
             throws IOException
 676  
         {
 677  
             this.ice = ice;
 678  
         }
 679  
 
 680  
         /**
 681  
          * Call put on the listener.
 682  
          * <p>
 683  
          * @exception IOException
 684  
          */
 685  
         protected void doRun()
 686  
             throws IOException
 687  
         {
 688  
             listener.handlePut( ice );
 689  
         }
 690  
 
 691  
         /**
 692  
          * For debugging.
 693  
          * <p>
 694  
          * @return Info on the key and value.
 695  
          */
 696  
         public String toString()
 697  
         {
 698  
             return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
 699  
                 .append( ice.getVal() ).toString();
 700  
         }
 701  
 
 702  
     }
 703  
 
 704  
     /**
 705  
      * An element should be removed from the cache.
 706  
      * <p>
 707  
      * @author asmuts
 708  
      * @created January 15, 2002
 709  
      */
 710  
     private class RemoveEvent
 711  
         extends AbstractCacheEvent
 712  
     {
 713  
         /** The key to remove from the listener */
 714  
         private Serializable key;
 715  
 
 716  
         /**
 717  
          * Constructor for the RemoveEvent object
 718  
          * <p>
 719  
          * @param key
 720  
          * @exception IOException
 721  
          */
 722  
         RemoveEvent( Serializable key )
 723  
             throws IOException
 724  
         {
 725  
             this.key = key;
 726  
         }
 727  
 
 728  
         /**
 729  
          * Call remove on the listener.
 730  
          * <p>
 731  
          * @exception IOException
 732  
          */
 733  
         protected void doRun()
 734  
             throws IOException
 735  
         {
 736  
             listener.handleRemove( cacheName, key );
 737  
         }
 738  
 
 739  
         /**
 740  
          * For debugging.
 741  
          * <p>
 742  
          * @return Info on the key to remove.
 743  
          */
 744  
         public String toString()
 745  
         {
 746  
             return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
 747  
         }
 748  
 
 749  
     }
 750  
 
 751  
     /**
 752  
      * All elements should be removed from the cache when this event is processed.
 753  
      * <p>
 754  
      * @author asmuts
 755  
      * @created January 15, 2002
 756  
      */
 757  0
     private class RemoveAllEvent
 758  
         extends AbstractCacheEvent
 759  
     {
 760  
         /**
 761  
          * Call removeAll on the listener.
 762  
          * <p>
 763  
          * @exception IOException
 764  
          */
 765  
         protected void doRun()
 766  
             throws IOException
 767  
         {
 768  0
             listener.handleRemoveAll( cacheName );
 769  0
         }
 770  
 
 771  
         /**
 772  
          * For debugging.
 773  
          * <p>
 774  
          * @return The name of the event.
 775  
          */
 776  
         public String toString()
 777  
         {
 778  0
             return "RemoveAllEvent";
 779  
         }
 780  
 
 781  
     }
 782  
 
 783  
     /**
 784  
      * The cache should be disposed when this event is processed.
 785  
      * <p>
 786  
      * @author asmuts
 787  
      * @created January 15, 2002
 788  
      */
 789  
     private class DisposeEvent
 790  
         extends AbstractCacheEvent
 791  
     {
 792  
         /**
 793  
          * Called when gets to the end of the queue
 794  
          * <p>
 795  
          * @exception IOException
 796  
          */
 797  
         protected void doRun()
 798  
             throws IOException
 799  
         {
 800  
             listener.handleDispose( cacheName );
 801  
         }
 802  
 
 803  
         /**
 804  
          * For debugging.
 805  
          * <p>
 806  
          * @return The name of the event.
 807  
          */
 808  
         public String toString()
 809  
         {
 810  
             return "DisposeEvent";
 811  
         }
 812  
     }
 813  
 
 814  
     /**
 815  
      * @return whether the queue is functional.
 816  
      */
 817  
     public boolean isWorking()
 818  
     {
 819  
         return working;
 820  
     }
 821  
 
 822  
     /**
 823  
      * This means that the queue is functional. If we reached the max number of failures, the queue
 824  
      * is marked as non functional and will never work again.
 825  
      * <p>
 826  
      * @param b
 827  
      */
 828  
     public void setWorking( boolean b )
 829  
     {
 830  
         working = b;
 831  
     }
 832  
 
 833  
     /**
 834  
      * @return whether there are any items in the queue.
 835  
      */
 836  
     public boolean isEmpty()
 837  
     {
 838  
         return tail == head;
 839  
     }
 840  
 
 841  
     /**
 842  
      * Returns the number of elements in the queue.
 843  
      * <p>
 844  
      * @return number of items in the queue.
 845  
      */
 846  
     public int size()
 847  
     {
 848  
         return size;
 849  
     }
 850  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.