grung-db/src/grung/db.py
2025-09-28 12:00:19 -07:00

118 lines
3.6 KiB
Python

import inspect
import re
from functools import reduce
from operator import ior
from typing import List
from tinydb import Query, TinyDB, table
from tinydb.table import Document
from grung.exceptions import UniqueConstraintError
from grung.types import Record
class RecordTable(table.Table):
    """
    Wrapper around tinydb Tables that handles Records instead of dicts.

    Records are serialized before they are written and deserialized after
    they are read, and unique-field constraints are checked on every insert.
    """

    def __init__(self, name: str, db: TinyDB, document_class: Document = Record, **kwargs):
        """
        Create a table named ``name`` backed by the storage of ``db``.

        ``document_class`` is the class used for documents of this table;
        any extra keyword arguments are forwarded to the tinydb Table
        initializer.
        """
        self.document_class = document_class
        # Kept so records can be (de)serialized against the owning database.
        self._db = db
        super().__init__(db.storage, name, **kwargs)

    def insert(self, document):
        """
        Create or update ``document`` in this table and return the stored record.

        The record's ``before_insert()`` hook runs first, then the record is
        serialized and checked against the table's constraints. A serialized
        document with a truthy ``doc_id`` is upserted; otherwise a plain dict
        copy of it is inserted as a new document. The return value is the
        record as read back from the table.

        Raises UniqueConstraintError if another stored record already holds
        one of this record's unique field values.
        """
        document.before_insert()
        doc = document.serialize(self._db)
        self._check_constraints(doc)
        if doc.doc_id:
            # upsert() returns a list of affected ids; only one document is written here.
            last_insert_id = super().upsert(doc)[0]
        else:
            last_insert_id = super().insert(dict(doc))
        return self.get(doc_id=last_insert_id)

    def get(self, doc_id: int):
        """
        Return the deserialized record stored under ``doc_id``, or None when
        the lookup yields nothing (or a falsy document).
        """
        document = super().get(doc_id=doc_id)
        if document:
            return document.deserialize(self._db)
        return None

    def search(self, *args, recurse: bool = True, **kwargs) -> List[Record]:
        """
        Run a tinydb search and return the results as deserialized records.

        ``recurse`` is handed to each result's ``deserialize()``; all other
        arguments are forwarded to the tinydb search unchanged.
        """
        results = super().search(*args, **kwargs)
        return [r.deserialize(self._db, recurse=recurse) for r in results]

    def remove(self, document):
        """
        Remove ``document`` from the table; records without a truthy
        ``doc_id`` are ignored.
        """
        if document.doc_id:
            super().remove(doc_ids=[document.doc_id])

    def _check_constraints(self, document) -> None:
        """
        Validate ``document`` against every constraint of this table.

        Raises UniqueConstraintError when a unique field value is already taken.
        """
        self._check_unique(document)

    def _check_unique(self, document) -> None:
        """
        Ensure no other stored record shares a unique field value with ``document``.

        String values are compared literally, case-insensitively and over the
        whole value. Records with the same ``doc_id`` as ``document`` are not
        counted as conflicts, so updating a record in place is allowed.

        Raises UniqueConstraintError carrying the conflicting records.
        """
        conditions = [
            self._unique_condition(field.name, document[field.name])
            for field in document._metadata.fields.values()
            if field.unique
        ]
        # reduce() without an initial value raises TypeError on an empty
        # sequence; a record type without unique fields has nothing to check.
        if not conditions:
            return

        conflicts = [
            match
            for match in self.search(reduce(ior, conditions))
            if match.doc_id != document.doc_id
        ]
        if conflicts:
            raise UniqueConstraintError(document, conflicts)

    @staticmethod
    def _unique_condition(name, value):
        """
        Build the query that finds stored documents whose ``name`` field
        collides with ``value``.
        """
        if isinstance(value, str):
            # The value is data, not a pattern: escape regex metacharacters and
            # anchor the end so "foo" does not collide with a stored "foobar".
            value = re.escape(value) + r"\Z"
        # NOTE(review): non-string values are passed through unchanged, exactly
        # as before; confirm whether serialized unique fields can be non-strings.
        return Query()[name].matches(value, flags=re.IGNORECASE)
class GrungDB(TinyDB):
    """
    A TinyDB database instance that uses RecordTable instances for each table
    and Record instances for each document in the table.

    Tables must be registered with ``create_table()`` (or ``with_schema()``)
    before they can be used; asking for an unknown table is an error rather
    than an implicit creation.
    """

    default_table_name = "Record"

    # NOTE(review): class-level fallback so that __getattr__ can consult
    # ``_tables`` on an instance that has not finished initializing.
    # Presumably TinyDB.__init__ assigns a per-instance dict that shadows
    # this one; verify before relying on it being unshared.
    _tables = {}

    def __init__(self, *args, **kwargs):
        """
        Open the database (arguments are forwarded to TinyDB) and register the
        table for the base Record class.
        """
        super().__init__(*args, **kwargs)
        self.create_table(Record)

    def table(self, name: str) -> RecordTable:
        """
        Return the registered table called ``name``.

        Raises RuntimeError if no such table has been created.
        """
        if name not in self._tables:
            raise RuntimeError(f"No such table: {name}")
        return self._tables[name]

    def create_table(self, table_class):
        """
        Register a table for ``table_class`` and return it.

        The table is named after the class (``table_class.__name__``) and uses
        the class for its documents. Calling this again for an already
        registered name returns the existing table.
        """
        name = table_class.__name__
        if name not in self._tables:
            self._tables[name] = RecordTable(name, db=self, document_class=table_class)
        return self.table(name)

    def save(self, record):
        """
        Create or update a record in its table.

        The table is chosen by ``record._metadata.table``; the stored record is
        returned.
        """
        return self.table(record._metadata.table).insert(record)

    def delete(self, record):
        """
        Remove a record from the table named by ``record._metadata.table``.
        """
        return self.table(record._metadata.table).remove(record)

    def __getattr__(self, attr_name):
        """
        Make tables attributes of the instance.

        Names that are not registered tables are resolved by TinyDB's own
        attribute fallback.
        """
        if attr_name in self._tables:
            return self.table(attr_name)
        return super().__getattr__(attr_name)

    @classmethod
    def with_schema(cls, schema_module, *args, **kwargs):
        """
        Build a database with one table per Record subclass found in
        ``schema_module``.

        Remaining arguments are forwarded to the constructor. The instance is
        created through ``cls`` so that subclasses get an instance of their
        own type.
        """
        db = cls(*args, **kwargs)
        # inspect.isclass also accepts classes built with a custom metaclass,
        # which an exact ``type(obj) == type`` comparison would skip.
        for _name, obj in inspect.getmembers(schema_module, inspect.isclass):
            if issubclass(obj, Record):
                db.create_table(obj)
        return db