001    package de.deepamehta.plugins.websockets;
002    
003    import de.deepamehta.plugins.websockets.event.WebsocketTextMessageListener;
004    import de.deepamehta.core.service.DeepaMehtaEvent;
005    import de.deepamehta.core.service.DeepaMehtaService;
006    import de.deepamehta.core.service.EventListener;
007    
008    import org.eclipse.jetty.server.Server;
009    import org.eclipse.jetty.server.nio.SelectChannelConnector;
010    import org.eclipse.jetty.websocket.WebSocket;
011    import org.eclipse.jetty.websocket.WebSocket.Connection;
012    import org.eclipse.jetty.websocket.WebSocketHandler;
013    
014    import javax.servlet.http.HttpServletRequest;
015    
016    import java.util.Map;
017    import java.util.Queue;
018    import java.util.concurrent.ConcurrentHashMap;
019    import java.util.concurrent.ConcurrentLinkedQueue;
020    import java.util.logging.Level;
021    import java.util.logging.Logger;
022    
023    
024    
025    class WebSocketsServer extends Server {
026    
027        // ------------------------------------------------------------------------------------------------------- Constants
028    
029        // Events
030        private static DeepaMehtaEvent WEBSOCKET_TEXT_MESSAGE = new DeepaMehtaEvent(WebsocketTextMessageListener.class) {
031            @Override
032            public void deliver(EventListener listener, Object... params) {
033                ((WebsocketTextMessageListener) listener).websocketTextMessage(
034                    (String) params[0]
035                );
036            }
037        };
038        // ### TODO: define further events, OPEN, CLOSE, BINARY_MESSAGE, ...
039    
040        // ---------------------------------------------------------------------------------------------- Instance Variables
041    
042        private Map<String, Queue<Connection>> pluginConnections = new ConcurrentHashMap();
043    
044        private DeepaMehtaService dms;
045    
046        private Logger logger = Logger.getLogger(getClass().getName());
047    
048        // ----------------------------------------------------------------------------------------------------- Constructor
049    
050        WebSocketsServer(int port, DeepaMehtaService dms) {
051            this.dms = dms;
052            //
053            // add connector
054            SelectChannelConnector connector = new SelectChannelConnector();
055            connector.setPort(port);
056            addConnector(connector);
057            //
058            // set WebSocket handler
059            setHandler(new WebSocketHandler() {
060                @Override
061                public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
062                    return new PluginWebSocket(protocol);
063                }
064            });
065        }
066    
067        // ----------------------------------------------------------------------------------------- Package Private Methods
068    
069        void broadcast(String pluginUri, String message) {
070            Queue<Connection> connections = getConnections(pluginUri);
071            if (connections != null) {
072                for (Connection connection : connections) {
073                    try {
074                        connection.sendMessage(message);
075                    } catch (Exception e) {
076                        removeConnection(pluginUri, connection);
077                        logger.log(Level.SEVERE, "Sending message via " + connection + " failed -- connection removed", e);
078                    }
079                }
080            }
081        }
082    
083        // ------------------------------------------------------------------------------------------------- Private Methods
084    
085        /**
086         * Returns the open WebSocket connections associated to the given plugin, or <code>null</code> if there are none.
087         */
088        private Queue<Connection> getConnections(String pluginUri) {
089            return pluginConnections.get(pluginUri);
090        }
091    
092        private void addConnection(String pluginUri, Connection connection) {
093            Queue<Connection> connections = getConnections(pluginUri);
094            if (connections == null) {
095                connections = new ConcurrentLinkedQueue<Connection>();
096                pluginConnections.put(pluginUri, connections);
097            }
098            connections.add(connection);
099        }
100    
101        private void removeConnection(String pluginUri, Connection connection) {
102            boolean removed = getConnections(pluginUri).remove(connection);
103            if (!removed) {
104                throw new RuntimeException("Removing a connection of plugin \"" + pluginUri + "\" failed");
105            }
106        }
107    
108        // ------------------------------------------------------------------------------------------------- Private Classes
109    
110        private class PluginWebSocket implements WebSocket, WebSocket.OnTextMessage, WebSocket.OnBinaryMessage {
111    
112            private String pluginUri;
113            private Connection connection;
114    
115            private PluginWebSocket(String pluginUri) {
116                this.pluginUri = pluginUri;
117                try {
118                    if (pluginUri == null) {
119                        throw new RuntimeException("Missing plugin URI -- Add your plugin's URI " +
120                            "as the 2nd argument to the JavaScript WebSocket constructor");
121                    } else {
122                        dms.getPlugin(pluginUri);   // check plugin URI, throws if invalid
123                    }
124                } catch (Exception e) {
125                    throw new RuntimeException("Opening WebSocket connection failed", e);
126                }
127            }
128    
129            // *** WebSocket ***
130    
131            @Override
132            public void onOpen(Connection connection) {
133                logger.info("### Opening a WebSocket connection for plugin \"" + pluginUri + "\"");
134                this.connection = connection;
135                addConnection(pluginUri, connection);
136            }
137    
138            @Override
139            public void onClose(int code, String message) {
140                logger.info("### Closing a WebSocket connection of plugin \"" + pluginUri + "\"");
141                removeConnection(pluginUri, connection);
142            }
143    
144            // *** WebSocket.OnTextMessage ***
145    
146            @Override
147            public void onMessage(String message) {
148                dms.deliverEvent(pluginUri, WEBSOCKET_TEXT_MESSAGE, message);
149            }
150    
151            // *** WebSocket.OnBinaryMessage ***
152    
153            @Override
154            public void onMessage(byte[] data, int offset, int length) {
155                // ### TODO
156            }
157        }
158    }