001 package de.deepamehta.plugins.websockets;
002
003 import de.deepamehta.plugins.websockets.event.WebsocketTextMessageListener;
004 import de.deepamehta.core.service.DeepaMehtaEvent;
005 import de.deepamehta.core.service.DeepaMehtaService;
006 import de.deepamehta.core.service.EventListener;
007
008 import org.eclipse.jetty.server.Server;
009 import org.eclipse.jetty.server.nio.SelectChannelConnector;
010 import org.eclipse.jetty.websocket.WebSocket;
011 import org.eclipse.jetty.websocket.WebSocket.Connection;
012 import org.eclipse.jetty.websocket.WebSocketHandler;
013
014 import javax.servlet.http.HttpServletRequest;
015
016 import java.util.Map;
017 import java.util.Queue;
018 import java.util.concurrent.ConcurrentHashMap;
019 import java.util.concurrent.ConcurrentLinkedQueue;
020 import java.util.logging.Level;
021 import java.util.logging.Logger;
022
023
024
025 class WebSocketsServer extends Server {
026
027 // ------------------------------------------------------------------------------------------------------- Constants
028
029 // Events
030 private static DeepaMehtaEvent WEBSOCKET_TEXT_MESSAGE = new DeepaMehtaEvent(WebsocketTextMessageListener.class) {
031 @Override
032 public void deliver(EventListener listener, Object... params) {
033 ((WebsocketTextMessageListener) listener).websocketTextMessage(
034 (String) params[0]
035 );
036 }
037 };
038 // ### TODO: define further events, OPEN, CLOSE, BINARY_MESSAGE, ...
039
040 // ---------------------------------------------------------------------------------------------- Instance Variables
041
042 private Map<String, Queue<Connection>> pluginConnections = new ConcurrentHashMap();
043
044 private DeepaMehtaService dms;
045
046 private Logger logger = Logger.getLogger(getClass().getName());
047
048 // ----------------------------------------------------------------------------------------------------- Constructor
049
050 WebSocketsServer(int port, DeepaMehtaService dms) {
051 this.dms = dms;
052 //
053 // add connector
054 SelectChannelConnector connector = new SelectChannelConnector();
055 connector.setPort(port);
056 addConnector(connector);
057 //
058 // set WebSocket handler
059 setHandler(new WebSocketHandler() {
060 @Override
061 public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
062 return new PluginWebSocket(protocol);
063 }
064 });
065 }
066
067 // ----------------------------------------------------------------------------------------- Package Private Methods
068
069 void broadcast(String pluginUri, String message) {
070 Queue<Connection> connections = getConnections(pluginUri);
071 if (connections != null) {
072 for (Connection connection : connections) {
073 try {
074 connection.sendMessage(message);
075 } catch (Exception e) {
076 removeConnection(pluginUri, connection);
077 logger.log(Level.SEVERE, "Sending message via " + connection + " failed -- connection removed", e);
078 }
079 }
080 }
081 }
082
083 // ------------------------------------------------------------------------------------------------- Private Methods
084
085 /**
086 * Returns the open WebSocket connections associated to the given plugin, or <code>null</code> if there are none.
087 */
088 private Queue<Connection> getConnections(String pluginUri) {
089 return pluginConnections.get(pluginUri);
090 }
091
092 private void addConnection(String pluginUri, Connection connection) {
093 Queue<Connection> connections = getConnections(pluginUri);
094 if (connections == null) {
095 connections = new ConcurrentLinkedQueue<Connection>();
096 pluginConnections.put(pluginUri, connections);
097 }
098 connections.add(connection);
099 }
100
101 private void removeConnection(String pluginUri, Connection connection) {
102 boolean removed = getConnections(pluginUri).remove(connection);
103 if (!removed) {
104 throw new RuntimeException("Removing a connection of plugin \"" + pluginUri + "\" failed");
105 }
106 }
107
108 // ------------------------------------------------------------------------------------------------- Private Classes
109
110 private class PluginWebSocket implements WebSocket, WebSocket.OnTextMessage, WebSocket.OnBinaryMessage {
111
112 private String pluginUri;
113 private Connection connection;
114
115 private PluginWebSocket(String pluginUri) {
116 this.pluginUri = pluginUri;
117 try {
118 if (pluginUri == null) {
119 throw new RuntimeException("Missing plugin URI -- Add your plugin's URI " +
120 "as the 2nd argument to the JavaScript WebSocket constructor");
121 } else {
122 dms.getPlugin(pluginUri); // check plugin URI, throws if invalid
123 }
124 } catch (Exception e) {
125 throw new RuntimeException("Opening WebSocket connection failed", e);
126 }
127 }
128
129 // *** WebSocket ***
130
131 @Override
132 public void onOpen(Connection connection) {
133 logger.info("### Opening a WebSocket connection for plugin \"" + pluginUri + "\"");
134 this.connection = connection;
135 addConnection(pluginUri, connection);
136 }
137
138 @Override
139 public void onClose(int code, String message) {
140 logger.info("### Closing a WebSocket connection of plugin \"" + pluginUri + "\"");
141 removeConnection(pluginUri, connection);
142 }
143
144 // *** WebSocket.OnTextMessage ***
145
146 @Override
147 public void onMessage(String message) {
148 dms.deliverEvent(pluginUri, WEBSOCKET_TEXT_MESSAGE, message);
149 }
150
151 // *** WebSocket.OnBinaryMessage ***
152
153 @Override
154 public void onMessage(byte[] data, int offset, int length) {
155 // ### TODO
156 }
157 }
158 }