1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe.support;
21
22 import org.apache.mina.common.ByteBuffer;
23 import org.apache.mina.common.IdleStatus;
24 import org.apache.mina.common.IoSession;
25 import org.apache.mina.common.IoFilter.WriteRequest;
26 import org.apache.mina.common.support.AbstractIoFilterChain;
27
28 import edu.emory.mathcs.backport.java.util.Queue;
29 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
30 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
31
32
33
34
35
36 public class VmPipeFilterChain extends AbstractIoFilterChain {
37
38 private final Queue eventQueue = new ConcurrentLinkedQueue();
39
40 private final AtomicBoolean flushEnabled = new AtomicBoolean();
41 private final AtomicBoolean sessionOpened = new AtomicBoolean();
42
43 public VmPipeFilterChain(IoSession session) {
44 super(session);
45 }
46
47 public void start() {
48 flushEnabled.set(true);
49 flushEvents();
50 flushPendingDataQueues((VmPipeSessionImpl) getSession());
51 }
52
53 private void pushEvent(Event e) {
54 eventQueue.offer(e);
55 if (flushEnabled.get()) {
56 flushEvents();
57 }
58 }
59
60 private void flushEvents() {
61 Event e;
62 while ((e = (Event) eventQueue.poll()) != null) {
63 fireEvent(e);
64 }
65 }
66
67 private void fireEvent(Event e) {
68 VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
69 EventType type = e.getType();
70 Object data = e.getData();
71
72 if (type == EventType.RECEIVED) {
73 if (sessionOpened.get() && session.getTrafficMask().isReadable() && session.getLock().tryLock()) {
74 try {
75 int byteCount = 1;
76 if (data instanceof ByteBuffer) {
77 byteCount = ((ByteBuffer) data).remaining();
78 }
79
80 session.increaseReadBytes(byteCount);
81
82 super.fireMessageReceived(session, data);
83 } finally {
84 session.getLock().unlock();
85 }
86
87 flushPendingDataQueues(session);
88 } else {
89 session.pendingDataQueue.add(data);
90 }
91 } else if (type == EventType.WRITE) {
92 super.fireFilterWrite(session, (WriteRequest) data);
93 } else if (type == EventType.SENT) {
94 super.fireMessageSent(session, (WriteRequest) data);
95 } else if (type == EventType.EXCEPTION) {
96 super.fireExceptionCaught(session, (Throwable) data);
97 } else if (type == EventType.IDLE) {
98 super.fireSessionIdle(session, (IdleStatus) data);
99 } else if (type == EventType.OPENED) {
100 super.fireSessionOpened(session);
101 sessionOpened.set(true);
102 } else if (type == EventType.CREATED) {
103 session.getLock().lock();
104 try {
105 super.fireSessionCreated(session);
106 } finally {
107 session.getLock().unlock();
108 }
109 } else if (type == EventType.CLOSED) {
110 super.fireSessionClosed(session);
111 } else if (type == EventType.CLOSE) {
112 super.fireFilterClose(session);
113 }
114 }
115
116 private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
117 s.updateTrafficMask();
118 s.getRemoteSession().updateTrafficMask();
119 }
120
121 public void fireFilterClose(IoSession session) {
122 pushEvent(new Event(EventType.CLOSE, null));
123 }
124
125 public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
126 pushEvent(new Event(EventType.WRITE, writeRequest));
127 }
128
129 public void fireExceptionCaught(IoSession session, Throwable cause) {
130 pushEvent(new Event(EventType.EXCEPTION, cause));
131 }
132
133 public void fireMessageSent(IoSession session, WriteRequest request) {
134 pushEvent(new Event(EventType.SENT, request));
135 }
136
137 public void fireSessionClosed(IoSession session) {
138 pushEvent(new Event(EventType.CLOSED, null));
139 }
140
141 public void fireSessionCreated(IoSession session) {
142 pushEvent(new Event(EventType.CREATED, null));
143 }
144
145 public void fireSessionIdle(IoSession session, IdleStatus status) {
146 pushEvent(new Event(EventType.IDLE, status));
147 }
148
149 public void fireSessionOpened(IoSession session) {
150 pushEvent(new Event(EventType.OPENED, null));
151 }
152
153 public void fireMessageReceived(IoSession session, Object message) {
154 pushEvent(new Event(EventType.RECEIVED, message));
155 }
156
157 protected void doWrite(IoSession session, WriteRequest writeRequest) {
158 VmPipeSessionImpl s = (VmPipeSessionImpl) session;
159 if (s.isConnected()) {
160 if (s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
161 try {
162 Object message = writeRequest.getMessage();
163
164 int byteCount = 1;
165 Object messageCopy = message;
166 if (message instanceof ByteBuffer) {
167 ByteBuffer rb = (ByteBuffer) message;
168 rb.mark();
169 byteCount = rb.remaining();
170 ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
171 wb.put(rb);
172 wb.flip();
173 rb.reset();
174 messageCopy = wb;
175 }
176
177
178
179 s.increaseScheduledWriteBytes(byteCount);
180 s.increaseScheduledWriteRequests();
181
182 s.increaseWrittenBytes(byteCount);
183 s.increaseWrittenMessages();
184
185 s.getRemoteSession().getFilterChain().fireMessageReceived(
186 s.getRemoteSession(), messageCopy);
187 s.getFilterChain().fireMessageSent(s, writeRequest);
188 } finally {
189 s.getLock().unlock();
190 }
191
192 flushPendingDataQueues( s );
193 } else {
194 s.pendingDataQueue.add(writeRequest);
195 }
196 } else {
197 writeRequest.getFuture().setWritten(false);
198 }
199 }
200
201 protected void doClose(IoSession session) {
202 VmPipeSessionImpl s = (VmPipeSessionImpl) session;
203 s.getLock().lock();
204 try {
205 if (!session.getCloseFuture().isClosed()) {
206 s.getServiceListeners().fireSessionDestroyed(s);
207 s.getRemoteSession().close();
208 }
209 } finally {
210 s.getLock().unlock();
211 }
212 }
213
214
215 private static class EventType {
216 public static final EventType CREATED = new EventType("CREATED");
217
218 public static final EventType OPENED = new EventType("OPENED");
219
220 public static final EventType CLOSED = new EventType("CLOSED");
221
222 public static final EventType RECEIVED = new EventType("RECEIVED");
223
224 public static final EventType SENT = new EventType("SENT");
225
226 public static final EventType IDLE = new EventType("IDLE");
227
228 public static final EventType EXCEPTION = new EventType("EXCEPTION");
229
230 public static final EventType WRITE = new EventType("WRITE");
231
232 public static final EventType CLOSE = new EventType("CLOSE");
233
234 private final String value;
235
236 private EventType(String value) {
237 this.value = value;
238 }
239
240 public String toString() {
241 return value;
242 }
243 }
244
245 private static class Event {
246 private final EventType type;
247
248 private final Object data;
249
250 private Event(EventType type, Object data) {
251 this.type = type;
252 this.data = data;
253 }
254
255 public Object getData() {
256 return data;
257 }
258
259 public EventType getType() {
260 return type;
261 }
262 }
263 }