import inspect
import re
from functools import reduce
from operator import ior

from tinydb import Query, TinyDB, table
from tinydb.table import Document

from grung.exceptions import UniqueConstraintError
from grung.types import Record


class RecordTable(table.Table):
    """
    Wrapper around tinydb Tables that handles Records instead of dicts.
    """

    def __init__(self, name: str, db: TinyDB, document_class: Document = Record, **kwargs):
        """
        Bind the table to ``db``'s storage under ``name``.

        ``document_class`` is stored on the instance before the parent
        initializer runs; ``db`` is kept so that records can be serialized
        and deserialized against it. Remaining keyword arguments are passed
        through to ``tinydb.table.Table``.
        """
        self.document_class = document_class
        self._db = db
        super().__init__(db.storage, name, **kwargs)

    def insert(self, document):
        """
        Create or update ``document`` and return the stored record.

        The document is serialized, checked against the table's constraints,
        then upserted when it already carries a ``doc_id`` or inserted as a
        new row otherwise. The result is re-read through ``get()``.

        Raises UniqueConstraintError if a unique field collides with another
        record.
        """
        doc = document.serialize(self._db)
        self._check_constraints(doc)
        if doc.doc_id:
            last_insert_id = super().upsert(doc)[0]
        else:
            # A plain dict is inserted here, so the serialized document's
            # (falsy) doc_id is not carried into the new row.
            last_insert_id = super().insert(dict(doc))
        return self.get(doc_id=last_insert_id)

    def get(self, doc_id: int):
        """
        Return the deserialized record stored under ``doc_id``, or None if
        the lookup returns nothing.
        """
        document = super().get(doc_id=doc_id)
        if document:
            return document.deserialize(self._db)
        return None

    def remove(self, document):
        """
        Remove ``document`` from the table. Documents without a ``doc_id``
        are ignored.
        """
        if document.doc_id:
            super().remove(doc_ids=[document.doc_id])

    def _check_constraints(self, document) -> None:
        """
        Run every constraint check against ``document``; a failed check
        raises.
        """
        self._check_unique(document)

    def _check_unique(self, document) -> None:
        """
        Raise UniqueConstraintError if another record already holds the same
        value (compared case-insensitively) in any field marked unique.

        Records sharing ``document``'s own doc_id are not counted, so that
        re-saving an existing record does not conflict with itself.
        """
        conditions = []
        for field in document._metadata.fields.values():
            if not field.unique:
                continue
            value = document[field.name]
            if not isinstance(value, str):
                # A regex query needs a string pattern; a non-string value
                # cannot be matched this way, so there is nothing to compare.
                continue
            # Escape the value and anchor the end so the whole stored value
            # must equal it: without this, "foo" would collide with "foobar"
            # and characters such as "." or "(" would be read as regex syntax.
            pattern = re.escape(value) + r"\Z"
            conditions.append(Query()[field.name].matches(pattern, flags=re.IGNORECASE))

        if not conditions:
            # reduce() on an empty sequence raises TypeError; with no unique
            # fields to compare there is nothing to enforce.
            return

        matches = [
            match
            for match in self.search(reduce(ior, conditions))
            if match.doc_id != document.doc_id
        ]
        if matches:
            raise UniqueConstraintError(document, matches)


class GrungDB(TinyDB):
    """
    A TinyDB database instance that uses RecordTable instances for each table and
    Record instances for each document in the table.
    """

    default_table_name = "Record"

    # Class-level fallback so that __getattr__ can always resolve `_tables`;
    # __init__ rebinds it per instance.
    _tables = {}

    def __init__(self, *args, **kwargs):
        """
        Open the database and register the base ``Record`` table. All
        arguments are forwarded to ``TinyDB``.
        """
        # Give each database its own registry so tables are never shared
        # between instances through the class attribute.
        self._tables = {}
        super().__init__(*args, **kwargs)
        self.create_table(Record)

    def table(self, name: str) -> RecordTable:
        """
        Return the registered table called ``name``.

        Raises RuntimeError if no such table has been created.
        """
        if name not in self._tables:
            raise RuntimeError(f"No such table: {name}")
        return self._tables[name]

    def create_table(self, table_class):
        """
        Register a RecordTable for ``table_class``, named after the class,
        and return it. An already-registered table is returned unchanged.
        """
        name = table_class.__name__
        if name not in self._tables:
            self._tables[name] = RecordTable(name, db=self, document_class=table_class)
        return self.table(name)

    def save(self, record):
        """
        Create or update a record in its table.
        """
        return self.table(record._metadata.table).insert(record)

    def delete(self, record):
        """
        Remove a record from its table.
        """
        return self.table(record._metadata.table).remove(record)

    def __getattr__(self, attr_name):
        """
        Make tables attributes of the instance.
        """
        if attr_name in self._tables:
            return self.table(attr_name)
        return super().__getattr__(attr_name)

    @classmethod
    def with_schema(cls, schema_module, *args, **kwargs):
        """
        Build a database with one table per Record subclass found in
        ``schema_module``. Remaining arguments are forwarded to the
        constructor.
        """
        # Instantiate cls rather than GrungDB so subclasses get their own type.
        db = cls(*args, **kwargs)
        # inspect.isclass also accepts classes built with a custom metaclass,
        # which a `type(obj) == type` comparison would skip.
        for _name, obj in inspect.getmembers(schema_module, inspect.isclass):
            if issubclass(obj, Record):
                db.create_table(obj)
        return db