package de.deepamehta.plugins.websockets;

import de.deepamehta.plugins.websockets.event.WebsocketTextMessageListener;
import de.deepamehta.core.service.DeepaMehtaEvent;
import de.deepamehta.core.service.DeepaMehtaService;
import de.deepamehta.core.service.EventListener;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocketHandler;

import javax.servlet.http.HttpServletRequest;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;



/**
 * A Jetty server that accepts WebSocket connections on its own port and keeps the open connections grouped by
 * plugin URI.
 * <p>
 * The WebSocket sub-protocol requested by the client is interpreted as the URI of the plugin the connection
 * belongs to. Incoming text messages are delivered to that plugin as a {@link WebsocketTextMessageListener} event;
 * outgoing messages are sent to all connections of a plugin via {@link #broadcast}.
 */
class WebSocketsServer extends Server {

    // ------------------------------------------------------------------------------------------------------- Constants

    // Events
    private static final DeepaMehtaEvent WEBSOCKET_TEXT_MESSAGE =
            new DeepaMehtaEvent(WebsocketTextMessageListener.class) {
        @Override
        public void deliver(EventListener listener, Object... params) {
            ((WebsocketTextMessageListener) listener).websocketTextMessage(
                (String) params[0]
            );
        }
    };
    // ### TODO: define further events, OPEN, CLOSE, BINARY_MESSAGE, ...

    // ---------------------------------------------------------------------------------------------- Instance Variables

    /** Open connections, keyed by plugin URI. Queues are created lazily by {@link #addConnection}. */
    private final ConcurrentMap<String, Queue<Connection>> pluginConnections =
            new ConcurrentHashMap<String, Queue<Connection>>();

    private final DeepaMehtaService dms;

    private final Logger logger = Logger.getLogger(getClass().getName());

    // ----------------------------------------------------------------------------------------------------- Constructor

    /**
     * Configures the server: one connector on the given port and a handler that turns every WebSocket handshake
     * into a {@link PluginWebSocket}. The server is not started here.
     *
     * @param   port    the port to listen on.
     * @param   dms     the service used to validate plugin URIs and to deliver message events.
     */
    WebSocketsServer(int port, DeepaMehtaService dms) {
        this.dms = dms;
        //
        // add connector
        SelectChannelConnector connector = new SelectChannelConnector();
        connector.setPort(port);
        addConnector(connector);
        //
        // set WebSocket handler
        setHandler(new WebSocketHandler() {
            @Override
            public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
                // the requested sub-protocol carries the plugin URI
                return new PluginWebSocket(protocol);
            }
        });
    }

    // ----------------------------------------------------------------------------------------- Package Private Methods

    /**
     * Sends a text message to every open connection of the given plugin. If there are no connections nothing
     * happens. A connection on which sending fails is removed and the failure is logged; the remaining
     * connections are still served.
     *
     * @param   pluginUri   the URI of the plugin whose connections are addressed.
     * @param   message     the text message to send.
     */
    void broadcast(String pluginUri, String message) {
        Queue<Connection> connections = getConnections(pluginUri);
        if (connections == null) {
            return;
        }
        // The queues are ConcurrentLinkedQueues, so removing while iterating is safe.
        for (Connection connection : connections) {
            try {
                connection.sendMessage(message);
            } catch (Exception e) {
                // Removal is idempotent: a concurrent onClose() may already have removed the connection, and
                // that must not abort the broadcast for the remaining connections.
                removeConnection(pluginUri, connection);
                logger.log(Level.SEVERE, "Sending message via " + connection + " failed -- connection removed", e);
            }
        }
    }

    // ------------------------------------------------------------------------------------------------- Private Methods

    /**
     * Returns the open WebSocket connections associated to the given plugin, or <code>null</code> if there are none.
     */
    private Queue<Connection> getConnections(String pluginUri) {
        return pluginConnections.get(pluginUri);
    }

    /**
     * Registers a connection for the given plugin, creating the plugin's connection queue on first use.
     */
    private void addConnection(String pluginUri, Connection connection) {
        Queue<Connection> connections = getConnections(pluginUri);
        if (connections == null) {
            // putIfAbsent makes the lazy creation atomic: if another thread installed a queue in the meantime
            // that queue is used, so no connection ends up in an orphaned queue.
            Queue<Connection> created = new ConcurrentLinkedQueue<Connection>();
            Queue<Connection> existing = pluginConnections.putIfAbsent(pluginUri, created);
            connections = existing != null ? existing : created;
        }
        connections.add(connection);
    }

    /**
     * Unregisters a connection of the given plugin. Calling this for a connection that is not (or no longer)
     * registered is harmless.
     *
     * @return  <code>true</code> if the connection was registered and has been removed, <code>false</code> otherwise.
     */
    private boolean removeConnection(String pluginUri, Connection connection) {
        Queue<Connection> connections = getConnections(pluginUri);
        if (connections == null || connection == null) {
            return false;
        }
        return connections.remove(connection);
    }

    // ------------------------------------------------------------------------------------------------- Private Classes

    /**
     * The server-side endpoint of a single WebSocket connection, bound to one plugin.
     */
    private class PluginWebSocket implements WebSocket, WebSocket.OnTextMessage, WebSocket.OnBinaryMessage {

        private final String pluginUri;
        private Connection connection;      // set in onOpen()

        /**
         * @param   pluginUri   the URI of the plugin this connection belongs to.
         *
         * @throws  RuntimeException    if the plugin URI is missing or <code>dms.getPlugin()</code> rejects it.
         */
        private PluginWebSocket(String pluginUri) {
            this.pluginUri = pluginUri;
            try {
                if (pluginUri == null) {
                    throw new RuntimeException("Missing plugin URI -- Add your plugin's URI " +
                        "as the 2nd argument to the JavaScript WebSocket constructor");
                } else {
                    dms.getPlugin(pluginUri);   // check plugin URI, throws if invalid
                }
            } catch (Exception e) {
                throw new RuntimeException("Opening WebSocket connection failed", e);
            }
        }

        // *** WebSocket ***

        @Override
        public void onOpen(Connection connection) {
            logger.info("### Opening a WebSocket connection for plugin \"" + pluginUri + "\"");
            this.connection = connection;
            addConnection(pluginUri, connection);
        }

        @Override
        public void onClose(int code, String message) {
            logger.info("### Closing a WebSocket connection of plugin \"" + pluginUri + "\"");
            // broadcast() removes a connection when sending on it fails, so it may be gone already.
            if (!removeConnection(pluginUri, connection)) {
                logger.fine("WebSocket connection of plugin \"" + pluginUri + "\" was already removed");
            }
        }

        // *** WebSocket.OnTextMessage ***

        @Override
        public void onMessage(String message) {
            dms.deliverEvent(pluginUri, WEBSOCKET_TEXT_MESSAGE, message);
        }

        // *** WebSocket.OnBinaryMessage ***

        @Override
        public void onMessage(byte[] data, int offset, int length) {
            // ### TODO
        }
    }
}