package systems.dmx.core.impl;

import systems.dmx.core.service.CoreService;
import systems.dmx.core.service.WebSocketsService;
import systems.dmx.core.util.JavaUtils;

import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;



/**
 * {@link WebSocketsService} implementation that runs its own Jetty server on {@link #WEBSOCKETS_PORT} and delivers
 * outgoing messages asynchronously through a single background worker thread.
 * <p>
 * The server and the worker are started by the constructor; {@link #shutdown()} stops both.
 */
class WebSocketsServiceImpl implements WebSocketsService {

    // ------------------------------------------------------------------------------------------------------- Constants

    private static final int WEBSOCKETS_PORT = Integer.getInteger("dmx.websockets.port", 8081);
    private static final String WEBSOCKETS_URL = System.getProperty("dmx.websockets.url", "ws://localhost:8081");
    // Note: the default values are required in case no config file is in effect. This applies when DM is started
    // via feature:install from Karaf. The default values must match the values defined in project POM.

    // ---------------------------------------------------------------------------------------------- Instance Variables

    private WebSocketsServer server;    // assigned in init(); stays null if constructing the server fails
    private final WebSocketConnectionPool pool = new WebSocketConnectionPool();
    private final SendMessageWorker worker = new SendMessageWorker();
    private final CoreService dmx;

    private final Logger logger = Logger.getLogger(getClass().getName());

    // ----------------------------------------------------------------------------------------------------- Constructor

    // ### TODO: inject event manager only
    /**
     * Stores the core service and immediately starts the WebSocket server and the send worker.
     * A startup failure is logged, not thrown.
     */
    WebSocketsServiceImpl(CoreService dmx) {
        this.dmx = dmx;
        init();
    }

    // -------------------------------------------------------------------------------------------------- Public Methods

    // *** WebSocketsService ***

    /**
     * Queues the message for every pooled connection of the given plugin.
     */
    @Override
    public void messageToAll(String pluginUri, String message) {
        broadcast(pluginUri, message, null);    // exclude=null
    }

    /**
     * Queues the message for every pooled connection of the given plugin except the one associated with the
     * request. Nothing is sent if no connection could be determined for the request.
     */
    @Override
    public void messageToAllButOne(HttpServletRequest request, String pluginUri, String message) {
        WebSocketConnection connection = getConnection(request, pluginUri);
        if (connection != null) {
            broadcast(pluginUri, message, connection);
        }
    }

    /**
     * Queues the message for the connection associated with the request. Nothing is sent if no connection could
     * be determined for the request.
     */
    @Override
    public void messageToOne(HttpServletRequest request, String pluginUri, String message) {
        WebSocketConnection connection = getConnection(request, pluginUri);
        if (connection != null) {
            queueMessage(connection, message);
        }
    }

    // ---

    /**
     * @return  the configured WebSocket URL (system property "dmx.websockets.url").
     */
    @Override
    public String getWebSocketsURL() {
        return WEBSOCKETS_URL;
    }

    // ------------------------------------------------------------------------------------------------- Private Methods

    /**
     * Creates and starts the Jetty server, then starts the send worker. Any failure is logged at SEVERE level and
     * not propagated.
     */
    private void init() {
        try {
            logger.info("##### Starting Jetty WebSocket server #####");
            server = new WebSocketsServer(WEBSOCKETS_PORT);
            server.start();
            worker.start();
            // ### server.join();
            logger.info("### Jetty WebSocket server started successfully");
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Starting Jetty WebSocket server failed", e);
        }
    }

    /**
     * Interrupts the send worker and stops the Jetty server. Does nothing (besides logging) if the server was never
     * created. Any failure is logged at SEVERE level and not propagated.
     */
    void shutdown() {
        try {
            if (server != null) {
                logger.info("##### Stopping Jetty WebSocket server #####");
                worker.interrupt();
                server.stop();
            } else {
                logger.info("Stopping Jetty WebSocket server SKIPPED -- not yet started");
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Stopping Jetty WebSocket server failed", e);
        }
    }

    // ---

    /**
     * @return  the WebSocket connection that is associated with the given request (based on its session cookie),
     *          or null if called outside request scope (e.g. while system startup).
     *
     * @throws  RuntimeException    if no valid session is associated with the request.
     */
    private WebSocketConnection getConnection(HttpServletRequest request, String pluginUri) {
        try {
            HttpSession session = request.getSession(false);
            if (session == null) {
                throw new RuntimeException("No valid session is associated with the request");
            }
            return pool.getConnection(pluginUri, session.getId());
        } catch (IllegalStateException e) {
            // Note: this happens if "request" is accessed outside request scope, e.g. while system startup.
            return null;
        }
    }

    /**
     * Queues the message for all pooled connections of the given plugin, skipping the "exclude" connection.
     *
     * @param   exclude     the connection to skip (compared by identity); null to skip none.
     */
    private void broadcast(String pluginUri, String message, WebSocketConnection exclude) {
        Collection<WebSocketConnection> connections = pool.getConnections(pluginUri);
        if (connections != null) {
            for (WebSocketConnection connection : connections) {
                if (connection != exclude) {
                    queueMessage(connection, message);
                }
            }
        }
    }

    /**
     * Hands the message over to the send worker.
     */
    private void queueMessage(WebSocketConnection connection, String message) {
        worker.queueMessage(connection, message);
    }



    // ------------------------------------------------------------------------------------------------- Private Classes

    /**
     * Jetty server with a single connector on the given port and a handler that accepts WebSocket handshakes.
     */
    private class WebSocketsServer extends Server {

        // counts anonymous connections
        // Atomic because handshakes are presumably served concurrently by Jetty's thread pool; a plain int++ could
        // then hand out the same "anonymous-N" ID twice.
        private final AtomicInteger counter = new AtomicInteger();

        private WebSocketsServer(int port) {
            // add connector
            Connector connector = new SelectChannelConnector();
            connector.setPort(port);
            addConnector(connector);
            //
            // set handler
            setHandler(new WebSocketHandler() {
                @Override
                public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
                    // the WebSocket sub-protocol carries the URI of the plugin that opens the connection
                    checkProtocol(protocol);
                    return new WebSocketConnection(protocol, sessionId(request), pool, dmx);
                }
            });
        }

        /**
         * Verifies that the handshake names a plugin URI known to the core service.
         *
         * @throws  RuntimeException    if the plugin URI is missing or rejected by the core service.
         */
        private void checkProtocol(String pluginUri) {
            try {
                if (pluginUri == null) {
                    throw new RuntimeException("A plugin URI is missing in the WebSocket handshake -- Add your " +
                        "plugin's URI as the 2nd argument to the JavaScript WebSocket constructor");
                } else {
                    dmx.getPlugin(pluginUri);   // check plugin URI, throws if invalid
                }
            } catch (Exception e) {
                throw new RuntimeException("Opening a WebSocket connection " +
                    (pluginUri != null ? "for plugin \"" + pluginUri + "\" " : "") + "failed", e);
            }
        }

        /**
         * @return  the value of the request's "JSESSIONID" cookie, or a generated "anonymous-N" ID if the lookup
         *          yields null.
         */
        private String sessionId(HttpServletRequest request) {
            String sessionId = JavaUtils.cookieValue(request, "JSESSIONID");
            // TODO: drop anonymous connections
            return sessionId != null ? sessionId : "anonymous-" + counter.getAndIncrement();
        }
    }

    /**
     * Background thread that takes queued messages one at a time and sends them over their connection.
     * Runs at minimum priority and terminates when interrupted.
     */
    private class SendMessageWorker extends Thread {

        private final BlockingQueue<QueuedMessage> messages = new LinkedBlockingQueue<QueuedMessage>();

        private SendMessageWorker() {
            setPriority(Thread.MIN_PRIORITY);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    QueuedMessage message = messages.take();
                    // Qualified call: a bare "yield()" is rejected by Java 14+ compilers (restricted identifier).
                    Thread.yield();
                    // logger.info("----- sending message " + Thread.currentThread().getName());
                    try {
                        message.connection.sendMessage(message.message);
                    } catch (Exception e) {
                        // A failure on one connection must not stop delivery to all others: if the worker died
                        // here, subsequent messages would pile up in the unbounded queue and never be sent.
                        logger.log(Level.WARNING, "An exception occurred in the SendMessageWorker thread -- " +
                            "message dropped:", e);
                    }
                }
            } catch (InterruptedException e) {
                logger.info("### SendMessageWorker thread received an InterruptedException -- terminating");
            }
        }

        /**
         * Appends the message to the send queue. Called from the threads that invoke the service methods.
         */
        private void queueMessage(WebSocketConnection connection, String message) {
            try {
                // logger.info("----- queueing message " + Thread.currentThread().getName());
                messages.put(new QueuedMessage(connection, message));
            } catch (InterruptedException e) {
                // Restore the caller's interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                logger.log(Level.WARNING, "Queueing a message failed:", e);
            }
        }
    }

    /**
     * Immutable pair of a message and the connection it is to be sent over.
     */
    private static class QueuedMessage {

        private final WebSocketConnection connection;
        private final String message;

        private QueuedMessage(WebSocketConnection connection, String message) {
            this.connection = connection;
            this.message = message;
        }
    }
}