"""The database backend for server."""
import collections.abc
import datetime
import itertools
import sqlite3
from .config import ServerConfig
[docs]
class ServerDatabase:
"""Abstraction on top of the database to provide data retention functionality."""
def __init__(self, config: ServerConfig) -> None:
self.db = sqlite3.Connection(str(config.dbfile))
with self.db as con:
con.execute(
"CREATE TABLE IF NOT EXISTS projects("
+ "id INTEGER PRIMARY KEY AUTOINCREMENT,"
+ "name TEXT NOT NULL UNIQUE,"
+ "complete BOOL NOT NULL"
+ ")"
)
con.execute(
"CREATE TABLE IF NOT EXISTS workers("
+ "id INTEGER PRIMARY KEY AUTOINCREMENT,"
+ "name TEXT NOT NULL UNIQUE"
+ ")"
)
con.execute(
"CREATE TABLE IF NOT EXISTS work("
+ "projectid INTEGER NOT NULL,"
+ "workerid INTEGER NOT NULL,"
+ "seconds INTEGER NOT NULL,"
+ "FOREIGN KEY (projectid) REFERENCES projects (id),"
+ "FOREIGN KEY (workerid) REFERENCES workers (id)"
+ ")"
)
con.execute(
"CREATE TABLE IF NOT EXISTS work_in_progress("
+ "projectid INTEGER NOT NULL,"
+ "workerid INTEGER NOT NULL UNIQUE,"
+ "starttime TEXT NOT NULL,"
+ "FOREIGN KEY (projectid) REFERENCES projects (id),"
+ "FOREIGN KEY (workerid) REFERENCES workers (id)"
+ ")"
)
[docs]
def projects(self) -> collections.abc.Iterator[str]:
"""Iterate over project names."""
for p in self.db.execute("SELECT name FROM projects ORDER BY name"):
yield p[0]
[docs]
def projects_in_progress(self) -> collections.abc.Iterator[str]:
"""Iterate over project names for those not yet completed."""
for p in self.db.execute("SELECT name FROM projects WHERE NOT complete"):
yield p[0]
[docs]
def new_project(self, name: str) -> None:
"""Create new project.
:param name: name of the new project
"""
try:
with self.db as con:
con.execute("INSERT INTO projects VALUES(NULL, ?, FALSE)", (name,))
except sqlite3.IntegrityError as exc:
raise ValueError(f"Project already exists: {name}") from exc
[docs]
def complete_project(self, name: str) -> None:
"""Make project completed.
:param name: name of the project
"""
if name not in set(self.projects_in_progress()):
raise ValueError(f"No such project in progress: {name}")
with self.db as con:
con.execute("UPDATE projects SET complete = TRUE WHERE name == ?", (name,))
[docs]
def project_complete(self, name: str) -> bool:
"""Check if given project is complete or not.
:param name: name of the project
"""
return bool(
self.db.execute(
"SELECT complete FROM projects WHERE projects.name == ?",
(name,),
).fetchone()[0]
)
[docs]
def project_workers(self, name: str) -> set[str]:
"""Set of all workers participating in this project.
:param name: name of the project
"""
workers = self.db.execute(
"SELECT DISTINCT workers.name FROM work, workers, projects WHERE projects.name == ? AND work.projectid == projects.id AND work.workerid == workers.id",
(name,),
).fetchone()
workers_in_progress = self.db.execute(
"SELECT DISTINCT workers.name FROM work_in_progress, workers, projects WHERE projects.name == ? AND work_in_progress.projectid == projects.id AND work_in_progress.workerid == workers.id",
(name,),
).fetchone()
return set(itertools.chain(workers or (), workers_in_progress or ()))
[docs]
def project_worker_seconds(self, project: str, worker: str) -> int:
"""Provide number of seconds invested in this project by this worker so far.
:param project: name of the project
:param worker: name of the worker
:return: number of seconds invested
"""
seconds = self.db.execute(
"SELECT SUM(seconds) FROM work, workers, projects WHERE "
"projects.name == ? AND workers.name == ? AND "
"work.projectid == projects.id AND work.workerid == workers.id",
(project, worker),
).fetchone()
seconds_in_progress = self.db.execute(
"SELECT SUM(strftime('%s',?) - strftime('%s',starttime)) "
"FROM work_in_progress, workers, projects WHERE "
"projects.name == ? AND workers.name == ? AND "
"work_in_progress.projectid == projects.id AND work_in_progress.workerid == workers.id",
(datetime.datetime.now(), project, worker),
).fetchone()
return (seconds[0] or 0) + (seconds_in_progress[0] or 0)
[docs]
def project_seconds(self, name: str) -> int:
"""Provide number of seconds invested in this project so far.
:param name: name of the project
"""
seconds = self.db.execute(
"SELECT SUM(seconds) FROM work, projects WHERE projects.name == ? AND work.projectid == projects.id",
(name,),
).fetchone()
seconds_in_progress = self.db.execute(
"SELECT SUM(strftime('%s',?) - strftime('%s',starttime)) FROM work_in_progress, projects WHERE projects.name == ? AND work_in_progress.projectid == projects.id",
(datetime.datetime.now(), name),
).fetchone()
return (seconds[0] or 0) + (seconds_in_progress[0] or 0)
[docs]
def workers(self) -> collections.abc.Iterator[str]:
"""Iterate over all worker names."""
for p in self.db.execute("SELECT name FROM workers ORDER BY name"):
yield p[0]
[docs]
def new_worker(self, name: str) -> None:
"""Register new worker.
:param name: name of the new worker
"""
try:
with self.db as con:
con.execute("INSERT INTO workers VALUES(NULL, ?)", (name,))
except sqlite3.IntegrityError as exc:
raise ValueError(f"Worker already exists: {name}") from exc
[docs]
def work(self, name: str, project: str | None) -> None:
"""Start or end worker's work on project.
:param name: name of the worker
:param project: name of the project, `None` when ending the work
"""
with self.db as con:
cur = con.cursor()
# Finish previous work if any
if prev := cur.execute(
"DELETE FROM work_in_progress WHERE workerid == (SELECT id FROM workers WHERE name == ?) RETURNING *",
(name,),
).fetchone():
cur.execute(
"INSERT INTO work VALUES(?,?,?)",
(
prev[0],
prev[1],
int(
(
datetime.datetime.now()
- datetime.datetime.fromisoformat(prev[2])
).total_seconds()
),
),
)
# Add current work
cur.execute(
"INSERT INTO work_in_progress SELECT projects.id, workers.id, ? FROM projects, workers WHERE projects.name == ? AND workers.name == ?",
(datetime.datetime.now(), project, name),
)
[docs]
def retroactive_clockout(self, name: str, retro_date: float) -> None:
"""Retroactively end worker's work on their current project.
:param name: name of the worker
:param retro_date: clockout time timestamp
"""
with self.db as con:
cur = con.cursor()
if prev := cur.execute(
"DELETE FROM work_in_progress WHERE workerid == (SELECT id FROM workers WHERE name == ?) RETURNING *",
(name,),
).fetchone():
start_time = datetime.datetime.fromisoformat(prev[2])
end_time = datetime.datetime.fromtimestamp(retro_date)
if end_time < start_time:
raise ValueError(
f"You cannot end the work before it started! Start time: {start_time}, end time: {end_time}."
)
cur.execute(
"INSERT INTO work VALUES(?,?,?)",
(
prev[0],
prev[1],
int((end_time - start_time).total_seconds()),
),
)
[docs]
def add_work(self, name: str, project: str, seconds: int) -> None:
"""Add record about work in the past.
:param name: name of the worker
:param project: name of the project
:param seconds: number of seconds invested
"""
self.db.execute(
"INSERT INTO work SELECT projects.id, workers.id, ? FROM projects, workers WHERE projects.name == ? AND workers.name == ?",
(seconds, project, name),
)
[docs]
def current_project(self, name: str) -> str | None:
"""Get current project user is working on.
:param name: name of the worker
"""
res = self.db.execute(
"SELECT projects.name FROM projects, work_in_progress, workers WHERE workers.name == ? AND work_in_progress.workerid == workers.id AND work_in_progress.projectid == projects.id",
(name,),
).fetchone()
return res[0] if res else None
[docs]
def current_seconds(self, name: str) -> int | None:
"""Get current number of seconds user already invested into the current project.
:param name: name of the project
:return: number of seconds invested
"""
res = self.db.execute(
"SELECT strftime('%s',?) - strftime('%s',starttime) FROM work_in_progress, workers WHERE workers.name == ? AND work_in_progress.workerid == workers.id",
(datetime.datetime.now(), name),
).fetchone()
return res[0] if res else None