# Source code for ellclockin.server.server

"""The SHV RPC server."""

from __future__ import annotations

import collections.abc
import logging

import shv
import shv.broker

from .config import ServerConfig
from .database import ServerDatabase

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


[docs] class Server(shv.broker.RpcBroker): """Elldev server that is also a SHV Broker."""
[docs] class Client(shv.broker.RpcBroker.Client): """The client exposed in Broker.""" broker: Server def _ls(self, path: str) -> collections.abc.Iterator[str]: yield from super()._ls(path) match path: case "": yield "clockin" case "clockin": yield "workers" yield "projects" case "clockin/workers": yield "self" yield from self.broker.database.workers() case "clockin/projects": yield from self.broker.database.projects() def _dir(self, path: str) -> collections.abc.Iterator[shv.RpcMethodDesc]: yield from super()._dir(path) match path.split("/"): case ("clockin", "projects"): yield shv.RpcMethodDesc( "new", param="String", access=shv.RpcMethodAccess.COMMAND ) yield shv.RpcMethodDesc( "clockout", flags=shv.RpcMethodFlags.USER_ID_REQUIRED, access=shv.RpcMethodAccess.WRITE, ) yield shv.RpcMethodDesc( "retroactive", param="Float", flags=shv.RpcMethodFlags.USER_ID_REQUIRED, access=shv.RpcMethodAccess.WRITE, ) case ("clockin", "projects", project): yield shv.RpcMethodDesc( "worktime", flags=shv.RpcMethodFlags.GETTER, result="Int", access=shv.RpcMethodAccess.READ, ) yield shv.RpcMethodDesc( "workers", flags=shv.RpcMethodFlags.GETTER, result="List[String]", access=shv.RpcMethodAccess.READ, ) if not self.broker.database.project_complete(project): yield shv.RpcMethodDesc( "work", flags=shv.RpcMethodFlags.USER_ID_REQUIRED, access=shv.RpcMethodAccess.WRITE, ) yield shv.RpcMethodDesc( "complete", access=shv.RpcMethodAccess.COMMAND, ) case ("clockin", "workers"): yield shv.RpcMethodDesc( "new", param="String", access=shv.RpcMethodAccess.COMMAND ) case ("clockin", "workers", _): yield shv.RpcMethodDesc( "currentProject", flags=shv.RpcMethodFlags.GETTER, result="OptionalString", access=shv.RpcMethodAccess.READ, ) yield shv.RpcMethodDesc( "currentSeconds", flags=shv.RpcMethodFlags.GETTER, result="OptionalInt", access=shv.RpcMethodAccess.READ, ) async def _method_call( self, path: str, method: str, param: shv.SHVType, access: shv.RpcMethodAccess, user_id: str | None, ) -> shv.SHVType: 
match path.split("/"), method: case ("clockin", "projects"), "new": if not isinstance(param, str): raise shv.RpcInvalidParamsError("Expected String") self.broker.database.new_project(param) return None case ("clockin", "projects"), "clockout": if param is not None: raise shv.RpcInvalidParamsError("No parameter is expected") if user_id is None: raise shv.RpcUserIDRequiredError self.broker.database.work(user_id, None) return None case ("clockin", "projects"), "retroactive": if not isinstance(param, float): raise shv.RpcInvalidParamsError("Expected Float") if user_id is None: raise shv.RpcUserIDRequiredError self.broker.database.retroactive_clockout(user_id, param) return None case ("clockin", "projects", project), "worktime": if param is not None: raise shv.RpcInvalidParamsError("No parameter is expected") return self.broker.database.project_seconds(project) case ("clockin", "projects", project), "workers": if param is not None: raise shv.RpcInvalidParamsError("No parameter is expected") return list(self.broker.database.project_workers(project)) case ( "clockin", "projects", project, ), "work" if not self.broker.database.project_complete(project): if param is not None: raise shv.RpcInvalidParamsError("No parameter is expected") if user_id is None: raise shv.RpcUserIDRequiredError if user_id not in set(self.broker.database.workers()): self.broker.database.new_worker(user_id) self.broker.database.work(user_id, project) return None case ( "clockin", "projects", project, ), "checkComplete": if param is not None: raise shv.RpcInvalidParamsError("No parameter is expected") return self.broker.database.project_complete(project) case ( "clockin", "projects", project, ), "contributions": if not isinstance(param, str): raise shv.RpcInvalidParamsError("Expected String") return self.broker.database.project_worker_seconds(project, param) case ( "clockin", "projects", project, ), "complete" if not self.broker.database.project_complete(project): if param is not None: raise 
shv.RpcInvalidParamsError("No parameter is expected") self.broker.database.complete_project(project) return None case ("clockin", "workers"), "new": if not isinstance(param, str): raise shv.RpcInvalidParamsError("Expected String") self.broker.database.new_worker(param) return None case ("clockin", "workers", worker), "currentProject": if worker == "self": if user_id is None: raise shv.RpcUserIDRequiredError worker = user_id return self.broker.database.current_project(worker) case ("clockin", "workers", worker), "currentSeconds": if worker == "self": if user_id is None: raise shv.RpcUserIDRequiredError worker = user_id return self.broker.database.current_seconds(worker) return await super()._method_call(path, method, param, access, user_id)
[docs] class LoginClient(Client, shv.broker.RpcBroker.LoginClient): """Broker's client that expects login from client.""" APP_NAME = "clockin-broker"
[docs] class ConnectClient(Client, shv.broker.RpcBroker.ConnectClient): """Broker client that actively connects to some other peer.""" APP_NAME = "clockin-broker-client"
def __init__(self, config: ServerConfig) -> None: super().__init__(config.shvbroker_config()) self.server_config = config self.database = ServerDatabase(config)