From b66b8217ea4daeae36914fdaa052deec75b746c9 Mon Sep 17 00:00:00 2001 From: cesmec Date: Mon, 8 Jul 2019 21:07:50 +0200 Subject: [PATCH 1/3] Added realtime updates --- src/main/scala/ScalatraBootstrap.scala | 4 + .../framework/manager/PluginManagerImpl.scala | 10 ++- .../instance/PluginInstance.scala | 7 ++ .../ui/web/rest/events/EventsController.scala | 74 +++++++++++++++++++ .../events/EventsControllerDefinition.scala | 21 ++++++ .../ui/web/rest/events/EventsDispatcher.scala | 20 +++++ 6 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala create mode 100644 src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala create mode 100644 src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala diff --git a/src/main/scala/ScalatraBootstrap.scala b/src/main/scala/ScalatraBootstrap.scala index b0e4d37a..098487d2 100644 --- a/src/main/scala/ScalatraBootstrap.scala +++ b/src/main/scala/ScalatraBootstrap.scala @@ -1,6 +1,7 @@ import javax.servlet.ServletContext import org.codeoverflow.chatoverflow.ui.web.rest.config.ConfigController import org.codeoverflow.chatoverflow.ui.web.rest.connector.ConnectorController +import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventsController, EventsDispatcher} import org.codeoverflow.chatoverflow.ui.web.rest.plugin.PluginInstanceController import org.codeoverflow.chatoverflow.ui.web.rest.types.TypeController import org.codeoverflow.chatoverflow.ui.web.{CodeOverflowSwagger, OpenAPIServlet} @@ -21,6 +22,9 @@ class ScalatraBootstrap extends LifeCycle { context.initParameters("org.scalatra.cors.allowedMethods") = "*" // Add all servlets and controller + val eventsController = new EventsController() + EventsDispatcher.init(eventsController) + context.mount(eventsController, "/events/*", "events") context.mount(new TypeController(), "/types/*", "types") context.mount(new ConfigController(), "/config/*", "config") context.mount(new PluginInstanceController(), "/instances/*", "instances") diff --git a/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala b/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala index 85a91c9b..188c539e 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala @@ -4,6 +4,9 @@ import java.util import org.codeoverflow.chatoverflow.WithLogger import org.codeoverflow.chatoverflow.api.plugin.{PluginLogMessage, PluginManager} +import org.codeoverflow.chatoverflow.ui.web.rest.events.EventsDispatcher +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -23,11 +26,16 @@ class PluginManagerImpl(pluginInstanceName: String, logOutputOnConsole: Boolean) * @param message the message to show */ override def log(message: String): Unit = { - logMessages += new PluginLogMessage(message) + val logMessage = new PluginLogMessage(message) + logMessages += logMessage if (logOutputOnConsole) { logger info s"[$pluginInstanceName] $message" } + + implicit val formats: DefaultFormats.type = DefaultFormats + val data = Map(("message", message), ("timestamp", logMessage.getTimestamp.toString)) + EventsDispatcher.broadcast("instance", Serialization.write(Map(("name", pluginInstanceName), ("action", "log"), ("data", data)))) } /** diff --git a/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala b/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala index 86aac0a8..95459a2c 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala @@ -8,6 +8,9 @@ import org.codeoverflow.chatoverflow.api.plugin.{Plugin, PluginManager} import org.codeoverflow.chatoverflow.framework.PluginCompatibilityState.PluginCompatibilityState import org.codeoverflow.chatoverflow.framework.manager.{PluginManagerImpl, PluginManagerStub} import org.codeoverflow.chatoverflow.framework.{PluginCompatibilityState, PluginType} +import org.codeoverflow.chatoverflow.ui.web.rest.events.EventsDispatcher +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization /** * A plugin instance holds all the general information of the plugin type and specific information of @@ -149,6 +152,9 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W logger info s"Starting plugin '$instanceName' in new thread!" try { instanceThread = new Thread(() => { + implicit val formats: DefaultFormats.type = DefaultFormats + EventsDispatcher.broadcast("instance", Serialization.write(Map(("name", instanceName), ("action", "start")))) + try { // Execute plugin setup @@ -191,6 +197,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W requirement.asInstanceOf[Requirement[Output]].get().shutdown() }) + EventsDispatcher.broadcast("instance", Serialization.write(Map(("name", instanceName), ("action", "stop")))) } }) instanceThread.start() diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala new file mode 100644 index 00000000..fcad0b44 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala @@ -0,0 +1,74 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +import java.io.PrintWriter +import java.util.concurrent.ConcurrentHashMap + +import javax.servlet.AsyncContext +import javax.servlet.http.HttpServletRequest +import org.codeoverflow.chatoverflow.ui.web.JsonServlet +import org.scalatra.servlet.ScalatraAsyncSupport +import org.scalatra.{BadRequest, Unauthorized} +import org.scalatra.swagger.Swagger + +class EventsController(implicit val swagger: Swagger) extends JsonServlet with ScalatraAsyncSupport with EventsControllerDefinition { + private val connectionWriters = new ConcurrentHashMap[AsyncContext, PrintWriter]() + + def broadcast(messageType: String, message: String = null): Unit = { + connectionWriters.forEach((_, writer) => { + try { + sendMessage(writer, messageType, message) + } catch { + //probably lost or closed connection, remove from the list of connected clients + case _: Throwable => connectionWriters.remove(writer) + } + }) + } + + def closeConnections(): Unit = { + connectionWriters.forEach((_, writer) => { + try { + sendMessage(writer, "close", null) + writer.close() + } finally { + connectionWriters.remove(writer) + } + }) + } + + private def sendMessage(writer: PrintWriter, messageType: String, message: String): Unit = { + var msg = "event: " + messageType.replace("\n", "") + "\n" + if (message != null) + msg += "data: " + message.replace("\n", "\ndata: ") + "\n\n" + writer.write(msg) + writer.flush() + } + + get("/", operation(getEvents)) { + val accept = request.getHeader("Accept") + if (accept == null || !accept.replace(" ", "").split(",").contains("text/event-stream")) { + status = 406 + } else { + authParamRequired { + contentType = "text/event-stream" + + val asyncContext = request.startAsync() + asyncContext.setTimeout(0) + + val writer = asyncContext.getResponse.getWriter + connectionWriters.put(asyncContext, writer) + } + } + } + + private def authParamRequired(func: => Any)(implicit request: HttpServletRequest): Any = { + val authKeyKey = "authKey" + + if (!request.parameters.contains(authKeyKey) || request.getParameter(authKeyKey).isEmpty) { + BadRequest() + } else if (request.getParameter(authKeyKey) != chatOverflow.credentialsService.generateAuthKey()) { + Unauthorized() + } else { + func + } + } +} diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala new file mode 100644 index 00000000..b89b3c90 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala @@ -0,0 +1,21 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +import org.codeoverflow.chatoverflow.ui.web.rest.{AuthSupport, TagSupport} +import org.scalatra.swagger.{SwaggerSupport, SwaggerSupportSyntax} +import org.scalatra.swagger.SwaggerSupportSyntax.OperationBuilder + +trait EventsControllerDefinition extends SwaggerSupport with TagSupport with AuthSupport { + val getEvents: OperationBuilder = + (apiOperation[Object]("getEvents") + summary "Get events" + description "Get events from chatoverflow using the EventSource API. Requires the authKey as a cookie and an Accept-header with the value text/event-stream." + parameter authQuery + tags controllerTag) + + protected def authQuery: SwaggerSupportSyntax.ParameterBuilder[String] = + queryParam[String]("authKey").description("connection auth key required") + + override def controllerTag: String = "events" + + override protected def applicationDescription: String = "Handles chatoverflow events." +} diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala new file mode 100644 index 00000000..654b0062 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala @@ -0,0 +1,20 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +object EventsDispatcher { + private var controller: EventsController = _ + + def init(eventsController: EventsController): Unit = { + if (controller == null) + controller = eventsController + } + + def broadcast(messageType: String, message: String = null): Unit = { + if (controller != null) + controller.broadcast(messageType, message) + } + + def close(): Unit = { + if (controller != null) + controller.closeConnections() + } +} From 20af9216ac5a8ae5f905d764b8f11343a0d2c72e Mon Sep 17 00:00:00 2001 From: cesmec Date: Tue, 9 Jul 2019 21:24:13 +0200 Subject: [PATCH 2/3] Fixed obsolete comment in events api definition --- .../ui/web/rest/events/EventsControllerDefinition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala index b89b3c90..c303b3c4 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala @@ -8,7 +8,7 @@ trait EventsControllerDefinition extends SwaggerSupport with TagSupport with Aut val getEvents: OperationBuilder = (apiOperation[Object]("getEvents") summary "Get events" - description "Get events from chatoverflow using the EventSource API. Requires the authKey as a cookie and an Accept-header with the value text/event-stream." + description "Get events from chatoverflow using the EventSource API. Requires an Accept-header with the value text/event-stream." parameter authQuery tags controllerTag) From fa77b69d4124de94a82b10d3e4495b506a8915cd Mon Sep 17 00:00:00 2001 From: cesmec Date: Fri, 12 Jul 2019 22:21:31 +0200 Subject: [PATCH 3/3] Added EventMessage overload to event broadcast and some documentation --- .../framework/manager/PluginManagerImpl.scala | 12 ++++---- .../instance/PluginInstance.scala | 9 ++---- .../ui/web/rest/events/EventMessage.scala | 3 ++ .../ui/web/rest/events/EventsController.scala | 10 +++++++ .../ui/web/rest/events/EventsDispatcher.scala | 30 +++++++++++++++++++ 5 files changed, 52 insertions(+), 12 deletions(-) create mode 100644 src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala diff --git a/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala b/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala index 188c539e..10e5b4d8 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala @@ -4,9 +4,7 @@ import java.util import org.codeoverflow.chatoverflow.WithLogger import org.codeoverflow.chatoverflow.api.plugin.{PluginLogMessage, PluginManager} -import org.codeoverflow.chatoverflow.ui.web.rest.events.EventsDispatcher -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization +import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -33,9 +31,11 @@ class PluginManagerImpl(pluginInstanceName: String, logOutputOnConsole: Boolean) logger info s"[$pluginInstanceName] $message" } - implicit val formats: DefaultFormats.type = DefaultFormats - val data = Map(("message", message), ("timestamp", logMessage.getTimestamp.toString)) - EventsDispatcher.broadcast("instance", Serialization.write(Map(("name", pluginInstanceName), ("action", "log"), ("data", data)))) + EventsDispatcher.broadcast("instance", EventMessage("log", Map( + ("name", pluginInstanceName), + ("message", message), + ("timestamp", logMessage.getTimestamp.toString) + ))) } /** diff --git a/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala b/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala index 95459a2c..a7adca24 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala @@ -8,9 +8,7 @@ import org.codeoverflow.chatoverflow.api.plugin.{Plugin, PluginManager} import org.codeoverflow.chatoverflow.framework.PluginCompatibilityState.PluginCompatibilityState import org.codeoverflow.chatoverflow.framework.manager.{PluginManagerImpl, PluginManagerStub} import org.codeoverflow.chatoverflow.framework.{PluginCompatibilityState, PluginType} -import org.codeoverflow.chatoverflow.ui.web.rest.events.EventsDispatcher -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization +import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher} /** * A plugin instance holds all the general information of the plugin type and specific information of @@ -152,8 +150,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W logger info s"Starting plugin '$instanceName' in new thread!" try { instanceThread = new Thread(() => { - implicit val formats: DefaultFormats.type = DefaultFormats - EventsDispatcher.broadcast("instance", Serialization.write(Map(("name", instanceName), ("action", "start")))) + EventsDispatcher.broadcast("instance", EventMessage("start", Map(("name", instanceName)))) try { @@ -197,7 +194,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W requirement.asInstanceOf[Requirement[Output]].get().shutdown() }) - EventsDispatcher.broadcast("instance", Serialization.write(Map(("name", instanceName), ("action", "stop")))) + EventsDispatcher.broadcast("instance", EventMessage("stop", Map(("name", instanceName)))) } }) instanceThread.start() diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala new file mode 100644 index 00000000..704ae346 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala @@ -0,0 +1,3 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +case class EventMessage[T](action: String, data: T) diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala index fcad0b44..3b2600e4 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala @@ -36,6 +36,16 @@ class EventsController(implicit val swagger: Swagger) extends JsonServlet with S } private def sendMessage(writer: PrintWriter, messageType: String, message: String): Unit = { + /* + Every message has the following format and ends with two line feeds (\n): + event: [name of event] + data: [first line] + data: [second line] + ... + + See also: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Examples + */ + var msg = "event: " + messageType.replace("\n", "") + "\n" if (message != null) msg += "data: " + message.replace("\n", "\ndata: ") + "\n\n" diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala index 654b0062..fa3a044b 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala @@ -1,18 +1,48 @@ package org.codeoverflow.chatoverflow.ui.web.rest.events +import org.json4s.{DefaultFormats, Formats} +import org.json4s.jackson.Serialization + +/** + * The EventsDispatcher is the central point for realtime communication to the clients + */ object EventsDispatcher { private var controller: EventsController = _ + implicit val formats: Formats = DefaultFormats + /** + * Initializes the EventsDispatcher with the registered controller + * Only to be used from the bootstrap + * @param eventsController registered controller that accepts the incoming connections + */ def init(eventsController: EventsController): Unit = { if (controller == null) controller = eventsController } + /** + * Sends the message to all connected clients + * @param messageType type of the message / event + * @param message the message to send + * @tparam T type of the message data + */ + def broadcast[T](messageType: String, message: EventMessage[T]): Unit = { + broadcast(messageType, Serialization.write(message)) + } + + /** + * Sends the message to all connected clients + * @param messageType type of the message / event + * @param message the message to send + */ def broadcast(messageType: String, message: String = null): Unit = { if (controller != null) controller.broadcast(messageType, message) } + /** + * Sends a close message to all connected clients and closes the connections + */ def close(): Unit = { if (controller != null) controller.closeConnections()