001    package org.deepamehta.plugins.subscriptions;
002    
003    import de.deepamehta.core.Association;
004    import de.deepamehta.core.DeepaMehtaObject;
005    import de.deepamehta.core.RelatedTopic;
006    import de.deepamehta.core.Topic;
007    import de.deepamehta.core.model.*;
008    import de.deepamehta.core.osgi.PluginActivator;
009    import de.deepamehta.core.service.ClientState;
010    import de.deepamehta.core.service.Directives;
011    import de.deepamehta.core.service.PluginService;
012    import de.deepamehta.core.service.ResultList;
013    import de.deepamehta.core.service.annotation.ConsumesService;
014    import de.deepamehta.core.storage.spi.DeepaMehtaTransaction;
015    import de.deepamehta.plugins.accesscontrol.service.AccessControlService;
016    import de.deepamehta.plugins.websockets.event.WebsocketTextMessageListener;
017    import de.deepamehta.plugins.websockets.service.WebSocketsService;
018    import java.util.ArrayList;
019    import java.util.Iterator;
020    import java.util.List;
021    import java.util.logging.Logger;
022    import javax.ws.rs.*;
023    import javax.ws.rs.core.Response;
024    import org.deepamehta.plugins.subscriptions.service.SubscriptionService;
025    
026    /**
027     *
028     * A DeepaMehta 4 Plugin introducing notifications on subscribed topics based on dm4-websockets.
029     *
030     * @author Malte Reißig (<malte@mikromedia.de>)
031     * @website https://github.com/mukil/dm4-subscriptions
032     * @version 1.0.1-SNAPSHOT
033     *
034     */
035    
036    @Path("/subscriptions")
037    public class SubscriptionsPlugin extends PluginActivator implements SubscriptionService,
038                                                                        WebsocketTextMessageListener {
039    
040        private static Logger log = Logger.getLogger(SubscriptionsPlugin.class.getName());
041    
042        private static final String NOTIFICATION_TYPE = "org.deepamehta.subscriptions.notification";
043        private static final String NOTIFICATION_TITLE_TYPE = "org.deepamehta.subscriptions.notification_title";
044        private static final String NOTIFICATION_BODY_TYPE = "org.deepamehta.subscriptions.notification_body";
045        private static final String NOTIFICATION_INVOLVED_ITEM_ID_TYPE = "org.deepamehta.subscriptions.involved_item_id";
046        private static final String NOTIFICATION_SUB_ITEM_ID_TYPE = "org.deepamehta.subscriptions.subscribed_item_id";
047        private static final String NOTIFICATION_RECIPIENT_EDGE_TYPE =
048                "org.deepamehta.subscriptions.notification_recipient_edge";
049        private static final String SUBSCRIPTION_EDGE_TYPE = "org.deepamehta.subscriptions.subscription_edge";
050        private static final String NOTIFICATION_SEEN_TYPE = "org.deepamehta.subscriptions.notification_seen";
051    
052        // These two types of information can currently be subscribed (with their special semantics)
053        private static final String USER_ACCOUNT_TYPE = "dm4.accesscontrol.user_account";
054        private static final String DEEPAMEHTA_TAG_TYPE = "dm4.tags.tag";
055    
056        private static final String DEFAULT_ROLE_TYPE = "dm4.core.default";
057    
058        private AccessControlService aclService = null;
059        private WebSocketsService webSocketsService = null;
060    
061    
062        // --- Hook Implementations
063    
064    
065        @Override
066        @ConsumesService({
067            "de.deepamehta.plugins.accesscontrol.service.AccessControlService",
068            "de.deepamehta.plugins.websockets.service.WebSocketsService"
069        })
070        public void serviceArrived(PluginService service) {
071            if (service instanceof AccessControlService) {
072                aclService = (AccessControlService) service;
073            } else if (service instanceof WebSocketsService) {
074                webSocketsService = (WebSocketsService) service;
075            }
076        }
077    
078        @ConsumesService({
079            "de.deepamehta.plugins.accesscontrol.service.AccessControlService",
080            "de.deepamehta.plugins.websockets.service.WebSocketsService"
081        })
082        public void serviceGone(PluginService service) {
083            if (service instanceof AccessControlService) {
084                aclService = null;
085            } else if (service instanceof WebSocketsService) {
086                webSocketsService = null;
087            }
088        }
089    
090        @GET
091        @Path("/subscribe/{itemId}")
092        public Response subscribeUser(@PathParam("itemId") long itemId, @HeaderParam("Cookie") ClientState clientState) {
093            // 0) Check for any session
094            String logged_in_username = aclService.getUsername();
095            if (!logged_in_username.isEmpty()) {
096                Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false);
097                Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child",
098                        "dm4.core.parent", "dm4.accesscontrol.user_account", true, false);
099                // 1) Users can just manage their own subscriptions
100                subscribe(account.getId(), itemId, clientState);
101                return Response.ok().build();
102            }
103            return Response.noContent().build();
104        }
105    
106        @GET
107        @Path("/unsubscribe/{itemId}")
108        public Response unsubscribeUser(@PathParam("itemId") long itemId) {
109            // 0) Check for any session
110            String logged_in_username = aclService.getUsername();
111            if (!logged_in_username.isEmpty()) {
112                Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false);
113                Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child",
114                        "dm4.core.parent", "dm4.accesscontrol.user_account", true, false);
115                // 1) Users can just manage their own subscriptions
116                unsubscribe(account.getId(), itemId);
117                return Response.ok().build();
118            }
119            return Response.noContent().build();
120        }
121    
122        @GET
123        @Path("/list")
124        public ResultList<RelatedTopic> getSubscriptions() {
125            // 0) Check for any session
126            String logged_in_username = aclService.getUsername();
127            if (logged_in_username == null || logged_in_username.isEmpty()) return null;
128            Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false);
129            Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", "dm4.core.parent",
130                    "dm4.accesscontrol.user_account", false, false);
131            // 1) Return results
132            log.info("Listing all subscriptions of user " + account.getSimpleValue());
133            return account.getRelatedTopics(SUBSCRIPTION_EDGE_TYPE, 0);
134        }
135    
136        @GET
137        @Path("/notification/seen/{newsId}")
138        public boolean setNotificationSeen(@PathParam("newsId") long newsId) {
139            DeepaMehtaTransaction tx = dms.beginTx();
140            try {
141                // 0) Check for any session
142                String logged_in_username = aclService.getUsername();
143                if (logged_in_username == null || logged_in_username.isEmpty()) throw new RuntimeException();
144                Topic notification = dms.getTopic(newsId, true);
145                notification.getCompositeValue().set(NOTIFICATION_SEEN_TYPE, true, null, new Directives());
146                // 1) Do operation
147                log.info("Set notification " + newsId + " as seen!");
148                tx.success();
149                return true;
150            } catch (Exception e) {
151                tx.failure();
152                log.warning("Could NOT set notification " + newsId + " as seen! Caused by: " + e.getMessage());
153                return false;
154            } finally {
155                tx.finish();
156            }
157        }
158    
159        @GET
160        @Path("/notifications/all")
161        public ResultList<RelatedTopic> getAllNotificationsForUser() {
162            return getAllNotifications();
163        }
164    
165        @GET
166        @Path("/notifications/unseen")
167        public ArrayList<RelatedTopic> getAllUnseenNotificationsForUser() {
168            return getAllUnseenNotifications();
169        }
170    
171        @Override
172        public void subscribe(long accountId, long itemId, ClientState clientState) {
173            DeepaMehtaTransaction tx = dms.beginTx();
174            try {
175                // 1)
176                Topic itemToSubscribe = dms.getTopic(itemId, false);
177                if (!itemToSubscribe.getTypeUri().equals(DEEPAMEHTA_TAG_TYPE)
178                        && !itemToSubscribe.getTypeUri().equals(USER_ACCOUNT_TYPE)) {
179                    throw new RuntimeException("Subscription are only supported for topics of type "
180                            + "\"User Account\" or \"Tag\" - Skipping creation of subscription");
181                }
182                // 2) Create subscriptions (if not alreay existent)
183                if (!associationExists(SUBSCRIPTION_EDGE_TYPE, itemId, accountId)) {
184                    AssociationModel model = new AssociationModel(SUBSCRIPTION_EDGE_TYPE,
185                        new TopicRoleModel(accountId, DEFAULT_ROLE_TYPE),
186                        new TopicRoleModel(itemId, DEFAULT_ROLE_TYPE),
187                        new CompositeValueModel().addRef("org.deepamehta.subscriptions.subscription_type",
188                        "org.deepamehta.subscriptions.in_app_subscription"));
189                    dms.createAssociation(model, clientState);
190                    log.info("New subscription for user:" + accountId + " to item:" + itemId);
191                } else {
192                    log.info("Subscription already exists between " + accountId + " and " + itemId);
193                }
194                tx.success();
195            } catch (Exception e) {
196                log.warning("Exception " + e.getMessage());
197                tx.failure();
198            } finally {
199                tx.finish();
200            }
201        }
202    
203        @Override
204        public void unsubscribe(long accountId, long itemId) {
205            List<Association> assocs = dms.getAssociations(accountId, itemId, SUBSCRIPTION_EDGE_TYPE);
206            Iterator<Association> iterator = assocs.iterator();
207            while (iterator.hasNext()) {
208                Association assoc = iterator.next();
209                dms.deleteAssociation(assoc.getId());
210            }
211        }
212    
213        @Override
214        public void createNotifications(String title, String message, long actionAccountId, DeepaMehtaObject item) {
215    
216            if (item.getTypeUri().equals(USER_ACCOUNT_TYPE)) {
217                // 1) create notifications for all directl subscribers of creates|edits of this user topic
218                log.info("Notifying subscribers of user account \"" + item.getSimpleValue() + "\"");
219                createNotifications(title, "", actionAccountId, item);
220            } else {
221                // 1) create notifications for all subscribers of all the tags this (created|edited) topic is tagged with
222                if (item.getModel().getCompositeValueModel().has(DEEPAMEHTA_TAG_TYPE)) {
223                    // 2) check all tags
224                    List<TopicModel> tags = item.getModel().getCompositeValueModel().getTopics(DEEPAMEHTA_TAG_TYPE);
225                    for (TopicModel tag : tags) {
226                        Topic tag_node = dms.getTopic(tag.getId(), true);
227                        log.info("Notifying subscribers of tag \"" + tag_node.getSimpleValue() + "\"");
228                        // for all subscribers of this tag
229                        createNotificationTopics(title, "", actionAccountId, item, tag_node);
230                    }
231                }
232                webSocketsService.broadcast("org.deepamehta.subscriptions", "Check notifications for the logged-in user.");
233            }
234    
235        }
236    
237        @Override
238        public ResultList<RelatedTopic> getAllNotifications() {
239            String logged_in_username = aclService.getUsername();
240            if (logged_in_username == null || logged_in_username.isEmpty()) return null;
241            Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false);
242            Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", "dm4.core.parent",
243                    "dm4.accesscontrol.user_account", false, false);
244            //
245            ResultList<RelatedTopic> results = account.getRelatedTopics(NOTIFICATION_RECIPIENT_EDGE_TYPE, "dm4.core.default", "dm4.core.default",
246                    NOTIFICATION_TYPE, true, false, 0);
247            log.info("Fetching " +results.getSize()+ " notifications for user " + account.getSimpleValue());
248            return results;
249        }
250    
251        @Override
252        public ArrayList<RelatedTopic> getAllUnseenNotifications() {
253            String logged_in_username = aclService.getUsername();
254            if (logged_in_username == null || logged_in_username.isEmpty()) return null;
255            Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false);
256            Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", "dm4.core.parent",
257                    "dm4.accesscontrol.user_account", false, false);
258            //
259            ArrayList<RelatedTopic> unseen = new ArrayList<RelatedTopic>();
260            ResultList<RelatedTopic> results = account.getRelatedTopics(NOTIFICATION_RECIPIENT_EDGE_TYPE, "dm4.core.default", "dm4.core.default",
261                    NOTIFICATION_TYPE, true, false, 0);
262            for (RelatedTopic notification : results.getItems()) {
263                boolean seen_child = notification.getCompositeValue().getBoolean(NOTIFICATION_SEEN_TYPE);
264                if (!seen_child) {
265                    unseen.add(notification);
266                }
267            }
268            log.info("Fetching " +unseen.size() + " unseen notifications for user " + account.getSimpleValue());
269            return unseen;
270        }
271    
272        @Override
273        public void websocketTextMessage(String message) {
274            log.info("### Receiving message from WebSocket client: \"" + message + "\"");
275    
276        }
277    
278        private void createNotificationTopics(String title, String text, long accountId, DeepaMehtaObject involvedItem) {
279            createNotificationTopics(title, text, accountId, involvedItem, null);
280        }
281    
282        private void createNotificationTopics(String title, String text, long accountId, DeepaMehtaObject involvedItem,
283                DeepaMehtaObject subscribedItem) {
284            // 0) Fetch all subscribers of item X
285            ResultList<RelatedTopic> subscribers = null;
286            long subscribedItemId = 0;
287            if (subscribedItem != null) { // fetch subscribers of subscribedItem
288                subscribers = subscribedItem.getRelatedTopics(SUBSCRIPTION_EDGE_TYPE,
289                    DEFAULT_ROLE_TYPE,  DEFAULT_ROLE_TYPE,  "dm4.accesscontrol.user_account", true, false, 0);
290                subscribedItemId = subscribedItem.getId();
291            } else { // fetch subscribers of involvedItem
292                subscribers = involvedItem.getRelatedTopics(SUBSCRIPTION_EDGE_TYPE,
293                    DEFAULT_ROLE_TYPE,  DEFAULT_ROLE_TYPE,  "dm4.accesscontrol.user_account", true, false, 0);
294            }
295            for (RelatedTopic subscriber : subscribers) {
296                if (subscriber.getId() != accountId) {
297                    log.fine("> subscription is valid, notifying user " + subscriber.getSimpleValue());
298                    // 1) Create notification instance
299                    CompositeValueModel message = new CompositeValueModel()
300                            .put(NOTIFICATION_SEEN_TYPE, false)
301                            .put(NOTIFICATION_TITLE_TYPE, title)
302                            .put(NOTIFICATION_BODY_TYPE, text)
303                            .put(NOTIFICATION_SUB_ITEM_ID_TYPE, subscribedItemId)
304                            .putRef(USER_ACCOUNT_TYPE, accountId)
305                            .put(NOTIFICATION_INVOLVED_ITEM_ID_TYPE, involvedItem.getId());
306                    TopicModel model = new TopicModel(NOTIFICATION_TYPE, message);
307                    dms.createTopic(model, null); // check: is system the creator?
308                    // 2) Hook up notification with subscriber
309                    AssociationModel recipient_model = new AssociationModel(NOTIFICATION_RECIPIENT_EDGE_TYPE,
310                            model.createRoleModel(DEFAULT_ROLE_TYPE),
311                            new TopicRoleModel(subscriber.getId(), DEFAULT_ROLE_TYPE));
312                    dms.createAssociation(recipient_model, null); // check: is system the creator?
313                }
314            }
315        }
316    
317        private boolean associationExists(String edge_type, long itemId, long accountId) {
318            List<Association> results = dms.getAssociations(itemId, accountId, edge_type);
319            return (results.size() > 0) ? true : false;
320        }
321    
322    }