"""The SHV RPC server."""
from __future__ import annotations
import collections.abc
import logging
import shv
import shv.broker
from .config import ServerConfig
from .database import ServerDatabase
logger = logging.getLogger(__name__)
[docs]
class Server(shv.broker.RpcBroker):
"""Elldev server that is also a SHV Broker."""
[docs]
class Client(shv.broker.RpcBroker.Client):
"""The client exposed in Broker."""
broker: Server
def _ls(self, path: str) -> collections.abc.Iterator[str]:
yield from super()._ls(path)
match path:
case "":
yield "clockin"
case "clockin":
yield "workers"
yield "projects"
case "clockin/workers":
yield "self"
yield from self.broker.database.workers()
case "clockin/projects":
yield from self.broker.database.projects()
def _dir(self, path: str) -> collections.abc.Iterator[shv.RpcMethodDesc]:
yield from super()._dir(path)
match path.split("/"):
case ("clockin", "projects"):
yield shv.RpcMethodDesc(
"new", param="String", access=shv.RpcMethodAccess.COMMAND
)
yield shv.RpcMethodDesc(
"clockout",
flags=shv.RpcMethodFlags.USER_ID_REQUIRED,
access=shv.RpcMethodAccess.WRITE,
)
yield shv.RpcMethodDesc(
"retroactive",
param="Float",
flags=shv.RpcMethodFlags.USER_ID_REQUIRED,
access=shv.RpcMethodAccess.WRITE,
)
case ("clockin", "projects", project):
yield shv.RpcMethodDesc(
"worktime",
flags=shv.RpcMethodFlags.GETTER,
result="Int",
access=shv.RpcMethodAccess.READ,
)
yield shv.RpcMethodDesc(
"workers",
flags=shv.RpcMethodFlags.GETTER,
result="List[String]",
access=shv.RpcMethodAccess.READ,
)
if not self.broker.database.project_complete(project):
yield shv.RpcMethodDesc(
"work",
flags=shv.RpcMethodFlags.USER_ID_REQUIRED,
access=shv.RpcMethodAccess.WRITE,
)
yield shv.RpcMethodDesc(
"complete",
access=shv.RpcMethodAccess.COMMAND,
)
case ("clockin", "workers"):
yield shv.RpcMethodDesc(
"new", param="String", access=shv.RpcMethodAccess.COMMAND
)
case ("clockin", "workers", _):
yield shv.RpcMethodDesc(
"currentProject",
flags=shv.RpcMethodFlags.GETTER,
result="OptionalString",
access=shv.RpcMethodAccess.READ,
)
yield shv.RpcMethodDesc(
"currentSeconds",
flags=shv.RpcMethodFlags.GETTER,
result="OptionalInt",
access=shv.RpcMethodAccess.READ,
)
async def _method_call(
self,
path: str,
method: str,
param: shv.SHVType,
access: shv.RpcMethodAccess,
user_id: str | None,
) -> shv.SHVType:
match path.split("/"), method:
case ("clockin", "projects"), "new":
if not isinstance(param, str):
raise shv.RpcInvalidParamsError("Expected String")
self.broker.database.new_project(param)
return None
case ("clockin", "projects"), "clockout":
if param is not None:
raise shv.RpcInvalidParamsError("No parameter is expected")
if user_id is None:
raise shv.RpcUserIDRequiredError
self.broker.database.work(user_id, None)
return None
case ("clockin", "projects"), "retroactive":
if not isinstance(param, float):
raise shv.RpcInvalidParamsError("Expected Float")
if user_id is None:
raise shv.RpcUserIDRequiredError
self.broker.database.retroactive_clockout(user_id, param)
return None
case ("clockin", "projects", project), "worktime":
if param is not None:
raise shv.RpcInvalidParamsError("No parameter is expected")
return self.broker.database.project_seconds(project)
case ("clockin", "projects", project), "workers":
if param is not None:
raise shv.RpcInvalidParamsError("No parameter is expected")
return list(self.broker.database.project_workers(project))
case (
"clockin",
"projects",
project,
), "work" if not self.broker.database.project_complete(project):
if param is not None:
raise shv.RpcInvalidParamsError("No parameter is expected")
if user_id is None:
raise shv.RpcUserIDRequiredError
if user_id not in set(self.broker.database.workers()):
self.broker.database.new_worker(user_id)
self.broker.database.work(user_id, project)
return None
case (
"clockin",
"projects",
project,
), "checkComplete":
if param is not None:
raise shv.RpcInvalidParamsError("No parameter is expected")
return self.broker.database.project_complete(project)
case (
"clockin",
"projects",
project,
), "contributions":
if not isinstance(param, str):
raise shv.RpcInvalidParamsError("Expected String")
return self.broker.database.project_worker_seconds(project, param)
case (
"clockin",
"projects",
project,
), "complete" if not self.broker.database.project_complete(project):
if param is not None:
raise shv.RpcInvalidParamsError("No parameter is expected")
self.broker.database.complete_project(project)
return None
case ("clockin", "workers"), "new":
if not isinstance(param, str):
raise shv.RpcInvalidParamsError("Expected String")
self.broker.database.new_worker(param)
return None
case ("clockin", "workers", worker), "currentProject":
if worker == "self":
if user_id is None:
raise shv.RpcUserIDRequiredError
worker = user_id
return self.broker.database.current_project(worker)
case ("clockin", "workers", worker), "currentSeconds":
if worker == "self":
if user_id is None:
raise shv.RpcUserIDRequiredError
worker = user_id
return self.broker.database.current_seconds(worker)
return await super()._method_call(path, method, param, access, user_id)
[docs]
class LoginClient(Client, shv.broker.RpcBroker.LoginClient):
"""Broker's client that expects login from client."""
APP_NAME = "clockin-broker"
[docs]
class ConnectClient(Client, shv.broker.RpcBroker.ConnectClient):
"""Broker client that actively connects to some other peer."""
APP_NAME = "clockin-broker-client"
def __init__(self, config: ServerConfig) -> None:
super().__init__(config.shvbroker_config())
self.server_config = config
self.database = ServerDatabase(config)