001package de.deepamehta.plugins.websockets;
002
003import de.deepamehta.plugins.websockets.event.WebsocketTextMessageListener;
004import de.deepamehta.core.service.DeepaMehtaEvent;
005import de.deepamehta.core.service.DeepaMehtaService;
006import de.deepamehta.core.service.EventListener;
007
008import org.eclipse.jetty.server.Server;
009import org.eclipse.jetty.server.nio.SelectChannelConnector;
010import org.eclipse.jetty.websocket.WebSocket;
011import org.eclipse.jetty.websocket.WebSocket.Connection;
012import org.eclipse.jetty.websocket.WebSocketHandler;
013
014import javax.servlet.http.HttpServletRequest;
015
016import java.util.Map;
017import java.util.Queue;
018import java.util.concurrent.ConcurrentHashMap;
019import java.util.concurrent.ConcurrentLinkedQueue;
020import java.util.logging.Level;
021import java.util.logging.Logger;
022
023
024
025class WebSocketsServer extends Server {
026
027    // ------------------------------------------------------------------------------------------------------- Constants
028
029    // Events
030    private static DeepaMehtaEvent WEBSOCKET_TEXT_MESSAGE = new DeepaMehtaEvent(WebsocketTextMessageListener.class) {
031        @Override
032        public void deliver(EventListener listener, Object... params) {
033            ((WebsocketTextMessageListener) listener).websocketTextMessage(
034                (String) params[0]
035            );
036        }
037    };
038    // ### TODO: define further events, OPEN, CLOSE, BINARY_MESSAGE, ...
039
040    // ---------------------------------------------------------------------------------------------- Instance Variables
041
042    private Map<String, Queue<Connection>> pluginConnections = new ConcurrentHashMap();
043
044    private DeepaMehtaService dms;
045
046    private Logger logger = Logger.getLogger(getClass().getName());
047
048    // ----------------------------------------------------------------------------------------------------- Constructor
049
050    WebSocketsServer(int port, DeepaMehtaService dms) {
051        this.dms = dms;
052        //
053        // add connector
054        SelectChannelConnector connector = new SelectChannelConnector();
055        connector.setPort(port);
056        addConnector(connector);
057        //
058        // set WebSocket handler
059        setHandler(new WebSocketHandler() {
060            @Override
061            public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
062                return new PluginWebSocket(protocol);
063            }
064        });
065    }
066
067    // ----------------------------------------------------------------------------------------- Package Private Methods
068
069    void broadcast(String pluginUri, String message) {
070        Queue<Connection> connections = getConnections(pluginUri);
071        if (connections != null) {
072            for (Connection connection : connections) {
073                try {
074                    connection.sendMessage(message);
075                } catch (Exception e) {
076                    removeConnection(pluginUri, connection);
077                    logger.log(Level.SEVERE, "Sending message via " + connection + " failed -- connection removed", e);
078                }
079            }
080        }
081    }
082
083    // ------------------------------------------------------------------------------------------------- Private Methods
084
085    /**
086     * Returns the open WebSocket connections associated to the given plugin, or <code>null</code> if there are none.
087     */
088    private Queue<Connection> getConnections(String pluginUri) {
089        return pluginConnections.get(pluginUri);
090    }
091
092    private void addConnection(String pluginUri, Connection connection) {
093        Queue<Connection> connections = getConnections(pluginUri);
094        if (connections == null) {
095            connections = new ConcurrentLinkedQueue<Connection>();
096            pluginConnections.put(pluginUri, connections);
097        }
098        connections.add(connection);
099    }
100
101    private void removeConnection(String pluginUri, Connection connection) {
102        boolean removed = getConnections(pluginUri).remove(connection);
103        if (!removed) {
104            throw new RuntimeException("Removing a connection of plugin \"" + pluginUri + "\" failed");
105        }
106    }
107
108    // ------------------------------------------------------------------------------------------------- Private Classes
109
110    private class PluginWebSocket implements WebSocket, WebSocket.OnTextMessage, WebSocket.OnBinaryMessage {
111
112        private String pluginUri;
113        private Connection connection;
114
115        private PluginWebSocket(String pluginUri) {
116            this.pluginUri = pluginUri;
117            try {
118                if (pluginUri == null) {
119                    throw new RuntimeException("Missing plugin URI -- Add your plugin's URI " +
120                        "as the 2nd argument to the JavaScript WebSocket constructor");
121                } else {
122                    dms.getPlugin(pluginUri);   // check plugin URI, throws if invalid
123                }
124            } catch (Exception e) {
125                throw new RuntimeException("Opening WebSocket connection failed", e);
126            }
127        }
128
129        // *** WebSocket ***
130
131        @Override
132        public void onOpen(Connection connection) {
133            logger.info("### Opening a WebSocket connection for plugin \"" + pluginUri + "\"");
134            this.connection = connection;
135            addConnection(pluginUri, connection);
136        }
137
138        @Override
139        public void onClose(int code, String message) {
140            logger.info("### Closing a WebSocket connection of plugin \"" + pluginUri + "\"");
141            removeConnection(pluginUri, connection);
142        }
143
144        // *** WebSocket.OnTextMessage ***
145
146        @Override
147        public void onMessage(String message) {
148            dms.deliverEvent(pluginUri, WEBSOCKET_TEXT_MESSAGE, message);
149        }
150
151        // *** WebSocket.OnBinaryMessage ***
152
153        @Override
154        public void onMessage(byte[] data, int offset, int length) {
155            // ### TODO
156        }
157    }
158}