Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/main/scala/ru/org/codingteam/horta/core/Core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import org.joda.time.DateTime
import ru.org.codingteam.horta.database.{DAO, PersistentStore}
import ru.org.codingteam.horta.database.{RepositoryFactory, PersistentStore}
import ru.org.codingteam.horta.messages._
import ru.org.codingteam.horta.plugins.HelperPlugin.HelperPlugin
import ru.org.codingteam.horta.plugins._
Expand Down Expand Up @@ -212,8 +212,7 @@ class Core extends Actor with ActorLogging {

object Core {

private def getCommands(pluginDefinitions: List[(ActorRef, PluginDefinition)]
): Map[String, List[(ActorRef, CommandDefinition)]] = {
private def getCommands(pluginDefinitions: List[(ActorRef, PluginDefinition)]): Map[String, List[(ActorRef, CommandDefinition)]] = {
val commands = for ((actor, pluginDefinition) <- pluginDefinitions) yield {
for (command <- pluginDefinition.commands) yield (command.name, actor, command)
}
Expand All @@ -228,8 +227,10 @@ object Core {
private def getCommandsDescription(pluginDefinitions: List[(ActorRef, PluginDefinition)]) =
pluginDefinitions.map(t => t._2.name -> t._2.commands.map(cd => cd.name -> cd.level)).toMap

private def getStorages(pluginDefinitions: List[(ActorRef, PluginDefinition)]): Map[String, DAO] = {
pluginDefinitions.map(_._2).filter(_.dao.isDefined).map(definition => (definition.name, definition.dao.get)).toMap
private def getStorages(pluginDefinitions: Seq[(ActorRef, PluginDefinition)]): Map[String, RepositoryFactory] = {
pluginDefinitions.toStream.flatMap { case (_, definition) =>
definition.repositoryFactory.map(factory => (definition.name, factory))
}.toMap
}

}
103 changes: 28 additions & 75 deletions src/main/scala/ru/org/codingteam/horta/database/PersistentStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,20 @@ package ru.org.codingteam.horta.database

import javax.sql.DataSource

import akka.actor.{Actor, ActorLogging}
import akka.actor.{Actor, ActorLogging, ActorSelection}
import akka.pattern.ask
import akka.util.Timeout
import com.googlecode.flyway.core.Flyway
import org.h2.jdbcx.JdbcConnectionPool
import ru.org.codingteam.horta.configuration.Configuration
import scalikejdbc.{ConnectionPool, DB, DBSession, DataSourceConnectionPool}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag

case class StoreObject(plugin: String, id: Option[Any], obj: Any)

case class ReadObject(plugin: String, id: Any)

case class DeleteObject(plugin: String, id: Any)

trait DAO {

/**
* Schema name for current DAO.
* @return schema name.
*/
def schema: String

/**
* Store an object in the database.
* @param session session to access the database.
* @param id object id (if null then it should be generated).
* @param obj stored object.
* @return stored object id (or None if object was not stored).
*/
def store(implicit session: DBSession, id: Option[Any], obj: Any): Option[Any]

/**
* Read an object from the database.
* @param session session to access the database.
* @param id object id.
* @return stored object or None if object not found.
*/
def read(implicit session: DBSession, id: Any): Option[Any]

/**
* Delete an object from the database.
* @param session session to access the database.
* @param id object id.
* @return true if object was successfully deleted, false otherwise.
*/
def delete(implicit session: DBSession, id: Any): Boolean

}

class PersistentStore(storages: Map[String, DAO]) extends Actor with ActorLogging {
class PersistentStore(repositories: Map[String, RepositoryFactory]) extends Actor with ActorLogging {

val Url = Configuration.storageUrl
val User = Configuration.storageUser
Expand All @@ -70,45 +32,22 @@ class PersistentStore(storages: Map[String, DAO]) extends Actor with ActorLoggin
}

override def receive = {
case StoreObject(plugin, id, obj) =>
storages.get(plugin) match {
case Some(dao) =>
initializeDatabase(dao)
withTransaction { session =>
sender ! dao.store(session, id, obj)
}

case None =>
log.info(s"Cannot store object $obj for plugin $plugin")
}

case ReadObject(plugin, id) =>
storages.get(plugin) match {
case Some(dao) =>
initializeDatabase(dao)
withTransaction { session =>
sender ! dao.read(session, id)
}

case None =>
log.info(s"Cannot read object $id for plugin $plugin")
}

case DeleteObject(plugin, id) =>
storages.get(plugin) match {
case Some(dao) =>
initializeDatabase(dao)
case PersistentStore.Execute(plugin, action) =>
repositories.get(plugin) match {
case Some(factory) =>
initializeDatabase(factory)
withTransaction { session =>
sender ! dao.delete(session, id)
val repository = factory.create(session)
sender ! action(repository)
}

case None =>
log.info(s"Cannot delete object $id for plugin $plugin")
log.info(s"Cannot execute action $action for plugin $plugin: repository not found")
}
}

private def initializeDatabase(dao: DAO) {
val schema = dao.schema
private def initializeDatabase(factory: RepositoryFactory) {
val schema = factory.schema
if (!initializedSchemas.contains(schema)) {
initializeScript(schema)
initializedSchemas += schema
Expand All @@ -133,4 +72,18 @@ class PersistentStore(storages: Map[String, DAO]) extends Actor with ActorLoggin
flyway.repair()
flyway.migrate()
}

}

object PersistentStore {

private case class Execute(plugin: String, action: (Any) => Any)

def execute[Repository, T: ClassTag](plugin: String, store: ActorSelection)
(action: (Repository) => T)
(implicit timeout: Timeout): Future[T] = {
val message = Execute(plugin, (r) => action(r.asInstanceOf[Repository]))
(store ? message).mapTo[T]
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ru.org.codingteam.horta.database

import scalikejdbc.DBSession

/**
* A repository factory. Will be used to create the repository.
*
* @param schema schema name for current repository.
* @param create a repository creation function.
*/
case class RepositoryFactory(schema: String, create: (DBSession => Any))
14 changes: 4 additions & 10 deletions src/main/scala/ru/org/codingteam/horta/plugins/BasePlugin.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package ru.org.codingteam.horta.plugins

import akka.actor.{Actor, ActorLogging}
import ru.org.codingteam.horta.database.DAO

/**
* CommandPlugin class used as base for all command plugins.
* Common plugin functionality. Every plugin should be inherited from this class.
*/
abstract class BasePlugin extends Actor with ActorLogging {

def receive = {
case GetPluginDefinition => sender ! pluginDefinition
}

protected val core = context.actorSelection("/user/core")
protected val store = context.actorSelection("/user/core/store")

/**
* Plugin name.
Expand All @@ -32,15 +31,10 @@ abstract class BasePlugin extends Actor with ActorLogging {
*/
protected def commands: List[CommandDefinition] = List()

/**
* Plugin data access object. May be None if not present.
* @return data access object.
*/
protected def dao: Option[DAO] = None

/**
* A full plugin definition.
* @return plugin definition.
*/
private def pluginDefinition = PluginDefinition(name, notifications, commands, dao)
protected def pluginDefinition = PluginDefinition(name, notifications, commands, None)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ru.org.codingteam.horta.plugins

import akka.util.Timeout
import ru.org.codingteam.horta.database.{RepositoryFactory, PersistentStore}
import scalikejdbc.DBSession

import scala.concurrent.Future
import scala.reflect.ClassTag

/**
* Trait for the plugins that can access the database.
*/
trait DataAccessingPlugin[Repository] extends BasePlugin {

/**
* A full plugin definition.
* @return plugin definition.
*/
override protected def pluginDefinition: PluginDefinition =
super.pluginDefinition.copy(repositoryFactory = Some(RepositoryFactory(schema, createRepository)))

/**
* Schema name for database storage.
*/
protected val schema: String

/**
* A function creating the repository for database access.
*/
protected val createRepository: (DBSession) => Repository

/**
* Execute repository action. Use with caution - do not mess with plugin data members within action; that is not
* context-safe.
*
* @param action an action.
* @tparam T type of action result.
* @return future that will be resolved after action execution.
*/
protected def withDatabase[T: ClassTag](action: (Repository) => T)
(implicit timeout: Timeout): Future[T] = {
PersistentStore.execute[Repository, T](name, store)(action)
}

private val store = context.actorSelection("/user/core/store")

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ru.org.codingteam.horta.plugins

import ru.org.codingteam.horta.database.DAO
import ru.org.codingteam.horta.database.RepositoryFactory

/**
* Description of events.
Expand All @@ -17,9 +17,10 @@ case class Notifications(messages: Boolean,
* @param name plugin name.
* @param notifications description of events plugin want to be notified of.
* @param commands a list of commands supported by the plugin.
* @param dao plugin data access object if present.
* @param repositoryFactory a factory of repositories used for data access. Data access is disabled if factory isn't
* defined.
*/
case class PluginDefinition(name: String,
notifications: Notifications,
commands: List[CommandDefinition],
dao: Option[DAO])
repositoryFactory: Option[RepositoryFactory])
106 changes: 0 additions & 106 deletions src/main/scala/ru/org/codingteam/horta/plugins/karma/KarmaDAO.scala

This file was deleted.

Loading