001 package org.deepamehta.plugins.subscriptions; 002 003 import de.deepamehta.core.Association; 004 import de.deepamehta.core.DeepaMehtaObject; 005 import de.deepamehta.core.RelatedTopic; 006 import de.deepamehta.core.Topic; 007 import de.deepamehta.core.model.*; 008 import de.deepamehta.core.osgi.PluginActivator; 009 import de.deepamehta.core.service.ClientState; 010 import de.deepamehta.core.service.Directives; 011 import de.deepamehta.core.service.PluginService; 012 import de.deepamehta.core.service.ResultList; 013 import de.deepamehta.core.service.annotation.ConsumesService; 014 import de.deepamehta.core.storage.spi.DeepaMehtaTransaction; 015 import de.deepamehta.plugins.accesscontrol.service.AccessControlService; 016 import de.deepamehta.plugins.websockets.event.WebsocketTextMessageListener; 017 import de.deepamehta.plugins.websockets.service.WebSocketsService; 018 import java.util.ArrayList; 019 import java.util.Iterator; 020 import java.util.List; 021 import java.util.logging.Logger; 022 import javax.ws.rs.*; 023 import javax.ws.rs.core.Response; 024 import org.deepamehta.plugins.subscriptions.service.SubscriptionService; 025 026 /** 027 * 028 * A DeepaMehta 4 Plugin introducing notifications on subscribed topics based on dm4-websockets. 029 * 030 * @author Malte Reißig (<malte@mikromedia.de>) 031 * @website https://github.com/mukil/dm4-subscriptions 032 * @version 1.0.1-SNAPSHOT 033 * 034 */ 035 036 @Path("/subscriptions") 037 public class SubscriptionsPlugin extends PluginActivator implements SubscriptionService, 038 WebsocketTextMessageListener { 039 040 private static Logger log = Logger.getLogger(SubscriptionsPlugin.class.getName()); 041 042 private static final String NOTIFICATION_TYPE = "org.deepamehta.subscriptions.notification"; 043 private static final String NOTIFICATION_TITLE_TYPE = "org.deepamehta.subscriptions.notification_title"; 044 private static final String NOTIFICATION_BODY_TYPE = "org.deepamehta.subscriptions.notification_body"; 045 private static final String NOTIFICATION_INVOLVED_ITEM_ID_TYPE = "org.deepamehta.subscriptions.involved_item_id"; 046 private static final String NOTIFICATION_SUB_ITEM_ID_TYPE = "org.deepamehta.subscriptions.subscribed_item_id"; 047 private static final String NOTIFICATION_RECIPIENT_EDGE_TYPE = 048 "org.deepamehta.subscriptions.notification_recipient_edge"; 049 private static final String SUBSCRIPTION_EDGE_TYPE = "org.deepamehta.subscriptions.subscription_edge"; 050 private static final String NOTIFICATION_SEEN_TYPE = "org.deepamehta.subscriptions.notification_seen"; 051 052 // These two types of information can currently be subscribed (with their special semantics) 053 private static final String USER_ACCOUNT_TYPE = "dm4.accesscontrol.user_account"; 054 private static final String DEEPAMEHTA_TAG_TYPE = "dm4.tags.tag"; 055 056 private static final String DEFAULT_ROLE_TYPE = "dm4.core.default"; 057 058 private AccessControlService aclService = null; 059 private WebSocketsService webSocketsService = null; 060 061 062 // --- Hook Implementations 063 064 065 @Override 066 @ConsumesService({ 067 "de.deepamehta.plugins.accesscontrol.service.AccessControlService", 068 "de.deepamehta.plugins.websockets.service.WebSocketsService" 069 }) 070 public void serviceArrived(PluginService service) { 071 if (service instanceof AccessControlService) { 072 aclService = (AccessControlService) service; 073 } else if (service instanceof WebSocketsService) { 074 webSocketsService = (WebSocketsService) service; 075 } 076 } 077 078 @ConsumesService({ 079 "de.deepamehta.plugins.accesscontrol.service.AccessControlService", 080 "de.deepamehta.plugins.websockets.service.WebSocketsService" 081 }) 082 public void serviceGone(PluginService service) { 083 if (service instanceof AccessControlService) { 084 aclService = null; 085 } else if (service instanceof WebSocketsService) { 086 webSocketsService = null; 087 } 088 } 089 090 @GET 091 @Path("/subscribe/{itemId}") 092 public Response subscribeUser(@PathParam("itemId") long itemId, @HeaderParam("Cookie") ClientState clientState) { 093 // 0) Check for any session 094 String logged_in_username = aclService.getUsername(); 095 if (!logged_in_username.isEmpty()) { 096 Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false); 097 Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", 098 "dm4.core.parent", "dm4.accesscontrol.user_account", true, false); 099 // 1) Users can just manage their own subscriptions 100 subscribe(account.getId(), itemId, clientState); 101 return Response.ok().build(); 102 } 103 return Response.noContent().build(); 104 } 105 106 @GET 107 @Path("/unsubscribe/{itemId}") 108 public Response unsubscribeUser(@PathParam("itemId") long itemId) { 109 // 0) Check for any session 110 String logged_in_username = aclService.getUsername(); 111 if (!logged_in_username.isEmpty()) { 112 Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false); 113 Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", 114 "dm4.core.parent", "dm4.accesscontrol.user_account", true, false); 115 // 1) Users can just manage their own subscriptions 116 unsubscribe(account.getId(), itemId); 117 return Response.ok().build(); 118 } 119 return Response.noContent().build(); 120 } 121 122 @GET 123 @Path("/list") 124 public ResultList<RelatedTopic> getSubscriptions() { 125 // 0) Check for any session 126 String logged_in_username = aclService.getUsername(); 127 if (logged_in_username == null || logged_in_username.isEmpty()) return null; 128 Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false); 129 Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", "dm4.core.parent", 130 "dm4.accesscontrol.user_account", false, false); 131 // 1) Return results 132 log.info("Listing all subscriptions of user " + account.getSimpleValue()); 133 return account.getRelatedTopics(SUBSCRIPTION_EDGE_TYPE, 0); 134 } 135 136 @GET 137 @Path("/notification/seen/{newsId}") 138 public boolean setNotificationSeen(@PathParam("newsId") long newsId) { 139 DeepaMehtaTransaction tx = dms.beginTx(); 140 try { 141 // 0) Check for any session 142 String logged_in_username = aclService.getUsername(); 143 if (logged_in_username == null || logged_in_username.isEmpty()) throw new RuntimeException(); 144 Topic notification = dms.getTopic(newsId, true); 145 notification.getCompositeValue().set(NOTIFICATION_SEEN_TYPE, true, null, new Directives()); 146 // 1) Do operation 147 log.info("Set notification " + newsId + " as seen!"); 148 tx.success(); 149 return true; 150 } catch (Exception e) { 151 tx.failure(); 152 log.warning("Could NOT set notification " + newsId + " as seen! Caused by: " + e.getMessage()); 153 return false; 154 } finally { 155 tx.finish(); 156 } 157 } 158 159 @GET 160 @Path("/notifications/all") 161 public ResultList<RelatedTopic> getAllNotificationsForUser() { 162 return getAllNotifications(); 163 } 164 165 @GET 166 @Path("/notifications/unseen") 167 public ArrayList<RelatedTopic> getAllUnseenNotificationsForUser() { 168 return getAllUnseenNotifications(); 169 } 170 171 @Override 172 public void subscribe(long accountId, long itemId, ClientState clientState) { 173 DeepaMehtaTransaction tx = dms.beginTx(); 174 try { 175 // 1) 176 Topic itemToSubscribe = dms.getTopic(itemId, false); 177 if (!itemToSubscribe.getTypeUri().equals(DEEPAMEHTA_TAG_TYPE) 178 && !itemToSubscribe.getTypeUri().equals(USER_ACCOUNT_TYPE)) { 179 throw new RuntimeException("Subscription are only supported for topics of type " 180 + "\"User Account\" or \"Tag\" - Skipping creation of subscription"); 181 } 182 // 2) Create subscriptions (if not alreay existent) 183 if (!associationExists(SUBSCRIPTION_EDGE_TYPE, itemId, accountId)) { 184 AssociationModel model = new AssociationModel(SUBSCRIPTION_EDGE_TYPE, 185 new TopicRoleModel(accountId, DEFAULT_ROLE_TYPE), 186 new TopicRoleModel(itemId, DEFAULT_ROLE_TYPE), 187 new CompositeValueModel().addRef("org.deepamehta.subscriptions.subscription_type", 188 "org.deepamehta.subscriptions.in_app_subscription")); 189 dms.createAssociation(model, clientState); 190 log.info("New subscription for user:" + accountId + " to item:" + itemId); 191 } else { 192 log.info("Subscription already exists between " + accountId + " and " + itemId); 193 } 194 tx.success(); 195 } catch (Exception e) { 196 log.warning("Exception " + e.getMessage()); 197 tx.failure(); 198 } finally { 199 tx.finish(); 200 } 201 } 202 203 @Override 204 public void unsubscribe(long accountId, long itemId) { 205 List<Association> assocs = dms.getAssociations(accountId, itemId, SUBSCRIPTION_EDGE_TYPE); 206 Iterator<Association> iterator = assocs.iterator(); 207 while (iterator.hasNext()) { 208 Association assoc = iterator.next(); 209 dms.deleteAssociation(assoc.getId()); 210 } 211 } 212 213 @Override 214 public void createNotifications(String title, String message, long actionAccountId, DeepaMehtaObject item) { 215 216 if (item.getTypeUri().equals(USER_ACCOUNT_TYPE)) { 217 // 1) create notifications for all directl subscribers of creates|edits of this user topic 218 log.info("Notifying subscribers of user account \"" + item.getSimpleValue() + "\""); 219 createNotifications(title, "", actionAccountId, item); 220 } else { 221 // 1) create notifications for all subscribers of all the tags this (created|edited) topic is tagged with 222 if (item.getModel().getCompositeValueModel().has(DEEPAMEHTA_TAG_TYPE)) { 223 // 2) check all tags 224 List<TopicModel> tags = item.getModel().getCompositeValueModel().getTopics(DEEPAMEHTA_TAG_TYPE); 225 for (TopicModel tag : tags) { 226 Topic tag_node = dms.getTopic(tag.getId(), true); 227 log.info("Notifying subscribers of tag \"" + tag_node.getSimpleValue() + "\""); 228 // for all subscribers of this tag 229 createNotificationTopics(title, "", actionAccountId, item, tag_node); 230 } 231 } 232 webSocketsService.broadcast("org.deepamehta.subscriptions", "Check notifications for the logged-in user."); 233 } 234 235 } 236 237 @Override 238 public ResultList<RelatedTopic> getAllNotifications() { 239 String logged_in_username = aclService.getUsername(); 240 if (logged_in_username == null || logged_in_username.isEmpty()) return null; 241 Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false); 242 Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", "dm4.core.parent", 243 "dm4.accesscontrol.user_account", false, false); 244 // 245 ResultList<RelatedTopic> results = account.getRelatedTopics(NOTIFICATION_RECIPIENT_EDGE_TYPE, "dm4.core.default", "dm4.core.default", 246 NOTIFICATION_TYPE, true, false, 0); 247 log.info("Fetching " +results.getSize()+ " notifications for user " + account.getSimpleValue()); 248 return results; 249 } 250 251 @Override 252 public ArrayList<RelatedTopic> getAllUnseenNotifications() { 253 String logged_in_username = aclService.getUsername(); 254 if (logged_in_username == null || logged_in_username.isEmpty()) return null; 255 Topic user = dms.getTopic("dm4.accesscontrol.username", new SimpleValue(logged_in_username), false); 256 Topic account = user.getRelatedTopic("dm4.core.composition", "dm4.core.child", "dm4.core.parent", 257 "dm4.accesscontrol.user_account", false, false); 258 // 259 ArrayList<RelatedTopic> unseen = new ArrayList<RelatedTopic>(); 260 ResultList<RelatedTopic> results = account.getRelatedTopics(NOTIFICATION_RECIPIENT_EDGE_TYPE, "dm4.core.default", "dm4.core.default", 261 NOTIFICATION_TYPE, true, false, 0); 262 for (RelatedTopic notification : results.getItems()) { 263 boolean seen_child = notification.getCompositeValue().getBoolean(NOTIFICATION_SEEN_TYPE); 264 if (!seen_child) { 265 unseen.add(notification); 266 } 267 } 268 log.info("Fetching " +unseen.size() + " unseen notifications for user " + account.getSimpleValue()); 269 return unseen; 270 } 271 272 @Override 273 public void websocketTextMessage(String message) { 274 log.info("### Receiving message from WebSocket client: \"" + message + "\""); 275 276 } 277 278 private void createNotificationTopics(String title, String text, long accountId, DeepaMehtaObject involvedItem) { 279 createNotificationTopics(title, text, accountId, involvedItem, null); 280 } 281 282 private void createNotificationTopics(String title, String text, long accountId, DeepaMehtaObject involvedItem, 283 DeepaMehtaObject subscribedItem) { 284 // 0) Fetch all subscribers of item X 285 ResultList<RelatedTopic> subscribers = null; 286 long subscribedItemId = 0; 287 if (subscribedItem != null) { // fetch subscribers of subscribedItem 288 subscribers = subscribedItem.getRelatedTopics(SUBSCRIPTION_EDGE_TYPE, 289 DEFAULT_ROLE_TYPE, DEFAULT_ROLE_TYPE, "dm4.accesscontrol.user_account", true, false, 0); 290 subscribedItemId = subscribedItem.getId(); 291 } else { // fetch subscribers of involvedItem 292 subscribers = involvedItem.getRelatedTopics(SUBSCRIPTION_EDGE_TYPE, 293 DEFAULT_ROLE_TYPE, DEFAULT_ROLE_TYPE, "dm4.accesscontrol.user_account", true, false, 0); 294 } 295 for (RelatedTopic subscriber : subscribers) { 296 if (subscriber.getId() != accountId) { 297 log.fine("> subscription is valid, notifying user " + subscriber.getSimpleValue()); 298 // 1) Create notification instance 299 CompositeValueModel message = new CompositeValueModel() 300 .put(NOTIFICATION_SEEN_TYPE, false) 301 .put(NOTIFICATION_TITLE_TYPE, title) 302 .put(NOTIFICATION_BODY_TYPE, text) 303 .put(NOTIFICATION_SUB_ITEM_ID_TYPE, subscribedItemId) 304 .putRef(USER_ACCOUNT_TYPE, accountId) 305 .put(NOTIFICATION_INVOLVED_ITEM_ID_TYPE, involvedItem.getId()); 306 TopicModel model = new TopicModel(NOTIFICATION_TYPE, message); 307 dms.createTopic(model, null); // check: is system the creator? 308 // 2) Hook up notification with subscriber 309 AssociationModel recipient_model = new AssociationModel(NOTIFICATION_RECIPIENT_EDGE_TYPE, 310 model.createRoleModel(DEFAULT_ROLE_TYPE), 311 new TopicRoleModel(subscriber.getId(), DEFAULT_ROLE_TYPE)); 312 dms.createAssociation(recipient_model, null); // check: is system the creator? 313 } 314 } 315 } 316 317 private boolean associationExists(String edge_type, long itemId, long accountId) { 318 List<Association> results = dms.getAssociations(itemId, accountId, edge_type); 319 return (results.size() > 0) ? true : false; 320 } 321 322 }