View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe.support;
21  
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  
25  import org.apache.mina.common.ByteBuffer;
26  import org.apache.mina.common.IdleStatus;
27  import org.apache.mina.common.IoSession;
28  import org.apache.mina.common.IoFilter.WriteRequest;
29  import org.apache.mina.common.support.AbstractIoFilterChain;
30  
31  /**
32   * @todo Document me!
33   *
34   * @author The Apache Directory Project (mina-dev@directory.apache.org)
35   * @version $Rev: 635494 $, $Date: 2008-03-10 18:04:45 +0900 (Mon, 10 Mar 2008) $
36   */
37  public class VmPipeFilterChain extends AbstractIoFilterChain {
38  
39      private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
40  
41      private volatile boolean flushEnabled;
42      private volatile boolean sessionOpened;
43  
44      public VmPipeFilterChain(IoSession session) {
45          super(session);
46      }
47  
48      public void start() {
49          flushEnabled = true;
50          flushEvents();
51          flushPendingDataQueues( (VmPipeSessionImpl) getSession() );
52      }
53  
54      private void pushEvent(Event e) {
55          eventQueue.offer(e);
56          if ( flushEnabled ) {
57              flushEvents();
58          }
59      }
60  
61      private void flushEvents() {
62          Event e;
63          while ((e = eventQueue.poll()) != null) {
64              fireEvent(e);
65          }
66      }
67  
68      private void fireEvent(Event e) {
69          VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
70          EventType type = e.getType();
71          Object data = e.getData();
72  
73          if (type == EventType.RECEIVED) {
74              if( sessionOpened && session.getTrafficMask().isReadable() && session.getLock().tryLock()) {
75                  try {
76                      int byteCount = 1;
77                      if (data instanceof ByteBuffer) {
78                          byteCount = ((ByteBuffer) data).remaining();
79                      }
80  
81                      session.increaseReadBytes(byteCount);
82  
83                      super.fireMessageReceived(session, data);
84                  } finally {
85                      session.getLock().unlock();
86                  }
87  
88                  flushPendingDataQueues( session );
89              } else {
90                  session.pendingDataQueue.add(data);
91              }
92          } else if (type == EventType.WRITE) {
93              super.fireFilterWrite(session, (WriteRequest) data);
94          } else if (type == EventType.SENT) {
95              super.fireMessageSent(session, (WriteRequest) data);
96          } else if (type == EventType.EXCEPTION) {
97              super.fireExceptionCaught(session, (Throwable) data);
98          } else if (type == EventType.IDLE) {
99              super.fireSessionIdle(session, (IdleStatus) data);
100         } else if (type == EventType.OPENED) {
101             super.fireSessionOpened(session);
102             sessionOpened = true;
103         } else if (type == EventType.CREATED) {
104             session.getLock().lock();
105             try {
106                 super.fireSessionCreated(session);
107             } finally {
108                 session.getLock().unlock();
109             }
110         } else if (type == EventType.CLOSED) {
111             super.fireSessionClosed(session);
112         } else if (type == EventType.CLOSE) {
113             super.fireFilterClose(session);
114         }
115     }
116 
117     private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
118         s.updateTrafficMask();
119         s.getRemoteSession().updateTrafficMask();
120     }
121 
122     @Override
123     public void fireFilterClose(IoSession session) {
124         pushEvent(new Event(EventType.CLOSE, null));
125     }
126 
127     @Override
128     public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
129         pushEvent(new Event(EventType.WRITE, writeRequest));
130     }
131 
132     @Override
133     public void fireExceptionCaught(IoSession session, Throwable cause) {
134         pushEvent(new Event(EventType.EXCEPTION, cause));
135     }
136 
137     @Override
138     public void fireMessageSent(IoSession session, WriteRequest request) {
139         pushEvent(new Event(EventType.SENT, request));
140     }
141 
142     @Override
143     public void fireSessionClosed(IoSession session) {
144         pushEvent(new Event(EventType.CLOSED, null));
145     }
146 
147     @Override
148     public void fireSessionCreated(IoSession session) {
149         pushEvent(new Event(EventType.CREATED, null));
150     }
151 
152     @Override
153     public void fireSessionIdle(IoSession session, IdleStatus status) {
154         pushEvent(new Event(EventType.IDLE, status));
155     }
156 
157     @Override
158     public void fireSessionOpened(IoSession session) {
159         pushEvent(new Event(EventType.OPENED, null));
160     }
161 
162     @Override
163     public void fireMessageReceived(IoSession session, Object message) {
164         pushEvent(new Event(EventType.RECEIVED, message));
165     }
166 
167     @Override
168     protected void doWrite(IoSession session, WriteRequest writeRequest) {
169         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
170         if (s.isConnected()) {
171             if ( s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
172                 try {
173                     Object message = writeRequest.getMessage();
174 
175                     int byteCount = 1;
176                     Object messageCopy = message;
177                     if (message instanceof ByteBuffer) {
178                         ByteBuffer rb = (ByteBuffer) message;
179                         rb.mark();
180                         byteCount = rb.remaining();
181                         ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
182                         wb.put(rb);
183                         wb.flip();
184                         rb.reset();
185                         messageCopy = wb;
186                     }
187 
188                     // Avoid unwanted side effect that scheduledWrite* becomes negative
189                     // by increasing them.
190                     s.increaseScheduledWriteBytes(byteCount);
191                     s.increaseScheduledWriteRequests();
192                     
193                     s.increaseWrittenBytes(byteCount);
194                     s.increaseWrittenMessages();
195 
196                     s.getRemoteSession().getFilterChain().fireMessageReceived(
197                         s.getRemoteSession(), messageCopy);
198                     s.getFilterChain().fireMessageSent(s, writeRequest);
199                 } finally {
200                     s.getLock().unlock();
201                 }
202 
203                 flushPendingDataQueues( s );
204             } else {
205                 s.pendingDataQueue.add(writeRequest);
206             }
207         } else {
208             writeRequest.getFuture().setWritten(false);
209         }
210     }
211 
212     @Override
213     protected void doClose(IoSession session) {
214         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
215 
216         try {
217             s.getLock().lock();
218 
219             if (!session.getCloseFuture().isClosed()) {
220                 s.getServiceListeners().fireSessionDestroyed(s);
221                 s.getRemoteSession().close();
222             }
223         } finally {
224             s.getLock().unlock();
225         }
226     }
227 
228     // FIXME Copied and pasted from {@link ExecutorFilter}.
229     private static class EventType {
230         public static final EventType CREATED = new EventType("CREATED");
231 
232         public static final EventType OPENED = new EventType("OPENED");
233 
234         public static final EventType CLOSED = new EventType("CLOSED");
235 
236         public static final EventType RECEIVED = new EventType("RECEIVED");
237 
238         public static final EventType SENT = new EventType("SENT");
239 
240         public static final EventType IDLE = new EventType("IDLE");
241 
242         public static final EventType EXCEPTION = new EventType("EXCEPTION");
243 
244         public static final EventType WRITE = new EventType("WRITE");
245 
246         public static final EventType CLOSE = new EventType("CLOSE");
247 
248         private final String value;
249 
250         private EventType(String value) {
251             this.value = value;
252         }
253 
254         @Override
255         public String toString() {
256             return value;
257         }
258     }
259 
260     private static class Event {
261         private final EventType type;
262 
263         private final Object data;
264 
265         private Event(EventType type, Object data) {
266             this.type = type;
267             this.data = data;
268         }
269 
270         public Object getData() {
271             return data;
272         }
273 
274         public EventType getType() {
275             return type;
276         }
277     }
278 }