001package systems.dmx.core.impl;
002
003import systems.dmx.core.service.CoreService;
004import systems.dmx.core.service.WebSocketsService;
005import systems.dmx.core.util.JavaUtils;
006
007import org.eclipse.jetty.server.Connector;
008import org.eclipse.jetty.server.Server;
009import org.eclipse.jetty.server.nio.SelectChannelConnector;
010import org.eclipse.jetty.websocket.WebSocket;
011import org.eclipse.jetty.websocket.WebSocketHandler;
012
013import javax.servlet.http.HttpServletRequest;
014import javax.servlet.http.HttpSession;
015
016import java.util.Collection;
017import java.util.concurrent.BlockingQueue;
018import java.util.concurrent.LinkedBlockingQueue;
019import java.util.logging.Level;
020import java.util.logging.Logger;
021
022
023
024class WebSocketsServiceImpl implements WebSocketsService {
025
026    // ------------------------------------------------------------------------------------------------------- Constants
027
028    private static final int    WEBSOCKETS_PORT = Integer.getInteger("dmx.websockets.port", 8081);
029    private static final String WEBSOCKETS_URL = System.getProperty("dmx.websockets.url", "ws://localhost:8081");
030    // Note: the default values are required in case no config file is in effect. This applies when DM is started
031    // via feature:install from Karaf. The default values must match the values defined in project POM.
032
033    // ---------------------------------------------------------------------------------------------- Instance Variables
034
035    private WebSocketsServer server;
036    private WebSocketConnectionPool pool = new WebSocketConnectionPool();
037    private SendMessageWorker worker = new SendMessageWorker();
038    private CoreService dmx;
039
040    private Logger logger = Logger.getLogger(getClass().getName());
041
042    // ----------------------------------------------------------------------------------------------------- Constructor
043
044    // ### TODO: inject event manager only 
045    WebSocketsServiceImpl(CoreService dmx) {
046        this.dmx = dmx;
047        init();
048    }
049
050    // -------------------------------------------------------------------------------------------------- Public Methods
051
052    // *** WebSocketsService ***
053
054    @Override
055    public void messageToAll(String pluginUri, String message) {
056        broadcast(pluginUri, message, null);    // exclude=null
057    }
058
059    @Override
060    public void messageToAllButOne(HttpServletRequest request, String pluginUri, String message) {
061        WebSocketConnection connection = getConnection(request, pluginUri);
062        if (connection != null) {
063            broadcast(pluginUri, message, connection);
064        }
065    }
066
067    @Override
068    public void messageToOne(HttpServletRequest request, String pluginUri, String message) {
069        WebSocketConnection connection = getConnection(request, pluginUri);
070        if (connection != null) {
071            queueMessage(connection, message);
072        }
073    }
074
075    // ---
076
077    @Override
078    public String getWebSocketsURL() {
079        return WEBSOCKETS_URL;
080    }
081
082    // ------------------------------------------------------------------------------------------------- Private Methods
083
084    private void init() {
085        try {
086            logger.info("##### Starting Jetty WebSocket server #####");
087            server = new WebSocketsServer(WEBSOCKETS_PORT);
088            server.start();
089            worker.start();
090            // ### server.join();
091            logger.info("### Jetty WebSocket server started successfully");
092        } catch (Exception e) {
093            logger.log(Level.SEVERE, "Starting Jetty WebSocket server failed", e);
094        }
095    }
096
097    void shutdown() {
098        try {
099            if (server != null) {
100                logger.info("##### Stopping Jetty WebSocket server #####");
101                worker.interrupt();
102                server.stop();
103            } else {
104                logger.info("Stopping Jetty WebSocket server SKIPPED -- not yet started");
105            }
106        } catch (Exception e) {
107            logger.log(Level.SEVERE, "Stopping Jetty WebSocket server failed", e);
108        }
109    }
110
111    // ---
112
113    /**
114     * @return  the WebSocket connection that is associated with the given request (based on its session cookie),
115     *          or null if called outside request scope (e.g. while system startup).
116     *
117     * @throws  RuntimeException    if no valid session is associated with the request.
118     */
119    private WebSocketConnection getConnection(HttpServletRequest request, String pluginUri) {
120        try {
121            HttpSession session = request.getSession(false);
122            if (session == null) {
123                throw new RuntimeException("No valid session is associated with the request");
124            }
125            return pool.getConnection(pluginUri, session.getId());
126        } catch (IllegalStateException e) {
127            // Note: this happens if "request" is accessed outside request scope, e.g. while system startup.
128            return null;
129        }
130    }
131
132    private void broadcast(String pluginUri, String message, WebSocketConnection exclude) {
133        Collection<WebSocketConnection> connections = pool.getConnections(pluginUri);
134        if (connections != null) {
135            for (WebSocketConnection connection : connections) {
136                if (connection != exclude) {
137                    queueMessage(connection, message);
138                }
139            }
140        }
141    }
142
143    private void queueMessage(WebSocketConnection connection, String message) {
144        worker.queueMessage(connection, message);
145    }
146
147
148
149    // ------------------------------------------------------------------------------------------------- Private Classes
150
151    private class WebSocketsServer extends Server {
152
153        private int counter = 0;     // counts anonymous connections
154
155        private WebSocketsServer(int port) {
156            // add connector
157            Connector connector = new SelectChannelConnector();
158            connector.setPort(port);
159            addConnector(connector);
160            //
161            // set handler
162            setHandler(new WebSocketHandler() {
163                @Override
164                public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
165                    checkProtocol(protocol);
166                    return new WebSocketConnection(protocol, sessionId(request), pool, dmx);
167                }
168            });
169        }
170
171        private void checkProtocol(String pluginUri) {
172            try {
173                if (pluginUri == null) {
174                    throw new RuntimeException("A plugin URI is missing in the WebSocket handshake -- Add your " +
175                        "plugin's URI as the 2nd argument to the JavaScript WebSocket constructor");
176                } else {
177                    dmx.getPlugin(pluginUri);   // check plugin URI, throws if invalid
178                }
179            } catch (Exception e) {
180                throw new RuntimeException("Opening a WebSocket connection " +
181                    (pluginUri != null ? "for plugin \"" + pluginUri + "\" " : "") + "failed", e);
182            }
183        }
184
185        private String sessionId(HttpServletRequest request) {
186            String sessionId = JavaUtils.cookieValue(request, "JSESSIONID");
187            // TODO: drop anonymous connections
188            return sessionId != null ? sessionId : "anonymous-" + counter++;
189        }
190    }
191
192    private class SendMessageWorker extends Thread {
193
194        private BlockingQueue<QueuedMessage> messages = new LinkedBlockingQueue();
195
196        private SendMessageWorker() {
197            setPriority(Thread.MIN_PRIORITY);
198        }
199
200        @Override
201        public void run() {
202            try {
203                while (true) {
204                    QueuedMessage message = messages.take();
205                    yield();
206                    // logger.info("----- sending message " + Thread.currentThread().getName());
207                    message.connection.sendMessage(message.message);
208                }
209            } catch (InterruptedException e) {
210                logger.info("### SendMessageWorker thread received an InterruptedException -- terminating");
211            } catch (Exception e) {
212                logger.log(Level.WARNING, "An exception occurred in the SendMessageWorker thread -- terminating:", e);
213            }
214        }
215
216        private void queueMessage(WebSocketConnection connection, String message) {
217            try {
218                // logger.info("----- queueing message " + Thread.currentThread().getName());
219                messages.put(new QueuedMessage(connection, message));
220            } catch (InterruptedException e) {
221                logger.log(Level.WARNING, "Queueing a message failed:", e);
222            }
223        }
224    }
225
226    private static class QueuedMessage {
227
228        private WebSocketConnection connection;
229        private String message;
230
231        private QueuedMessage(WebSocketConnection connection, String message) {
232            this.connection = connection;
233            this.message = message;
234        }
235    }
236}