105 lines
3.1 KiB
Python
105 lines
3.1 KiB
Python
import inspect
|
|
import re
|
|
from functools import reduce
|
|
from operator import ior
|
|
|
|
from tinydb import Query, TinyDB, table
|
|
from tinydb.table import Document
|
|
|
|
from grung.exceptions import UniqueConstraintError
|
|
from grung.types import Record
|
|
|
|
|
|
class RecordTable(table.Table):
    """
    Wrapper around tinydb Tables that handles Records instead of dicts.

    Before a document is written, every field flagged ``unique`` in the
    document's ``_metadata.fields`` is checked against the rows already in
    the table.
    """

    def __init__(self, name: str, db: TinyDB, document_class: Document = Record, **kwargs):
        """
        Bind the table to the storage of ``db``.

        :param name: table name, passed through to the tinydb ``Table``.
        :param db: owning database; its ``storage`` attribute backs the table.
        :param document_class: stored on the table as ``document_class``.
        :param kwargs: forwarded unchanged to ``Table.__init__``.
        """
        # Set before the base initialiser runs so the instance attribute is
        # already in place by the time the base class finishes setting up.
        self.document_class = document_class
        self._db = db
        super().__init__(db.storage, name, **kwargs)

    def insert(self, document):
        """
        Create or update ``document`` and return the row re-read from the table.

        A document with a truthy ``doc_id`` is upserted; any other document is
        inserted as a plain ``dict``.

        :raises UniqueConstraintError: if another row already holds the same
            value in one of the document's unique fields.
        """
        self._satisfy_constraints(document)
        if document.doc_id:
            # upsert() hands back a sequence of ids; the first one is used.
            last_insert_id = super().upsert(document)[0]
        else:
            last_insert_id = super().insert(dict(document))
        return self.get(doc_id=last_insert_id)

    def remove(self, document):
        """
        Remove ``document`` from the table by its ``doc_id``.

        A document without a truthy ``doc_id`` is ignored. Returns ``None``.
        """
        if document.doc_id:
            super().remove(doc_ids=[document.doc_id])

    def _satisfy_constraints(self, document) -> None:
        """
        Run every constraint check for ``document``.

        :raises UniqueConstraintError: propagated from the uniqueness check.
        """
        self._check_unique(document)

    def _check_unique(self, document) -> None:
        """
        Ensure no *other* row shares a unique-field value with ``document``.

        Strings are compared case-insensitively, literally and in full; other
        values are compared with ``==``. Fields whose value is ``None`` are
        skipped. Rows with the same ``doc_id`` as ``document`` are ignored so
        that re-saving a record does not collide with itself.

        :raises UniqueConstraintError: with the document and the colliding rows.
        """
        conditions = []
        for field in document._metadata.fields.values():
            if not field.unique:
                continue
            value = document[field.name]
            if value is None:
                # An unset value cannot collide with anything.
                continue
            conditions.append(self._unique_condition(field.name, value))

        if not conditions:
            # reduce() without an initial value raises TypeError on an empty
            # sequence, so stop here when there is nothing to check.
            return

        matches = [
            row
            for row in self.search(reduce(ior, conditions))
            if row.doc_id != document.doc_id
        ]
        if matches:
            raise UniqueConstraintError(document, matches)

    @staticmethod
    def _unique_condition(field_name, value):
        """Build the query selecting rows whose ``field_name`` collides with ``value``."""
        if isinstance(value, str):
            # The value is data, not a pattern: escape it so regex
            # metacharacters are literal, and anchor the end because the
            # underlying re.match only anchors the start (otherwise "foo"
            # would collide with a stored "foobar").
            pattern = re.escape(value) + r"\Z"
            return Query()[field_name].matches(pattern, flags=re.IGNORECASE)
        # A non-string value is not a usable regex; compare by equality.
        return Query()[field_name] == value
|
|
|
|
|
|
class GrungDB(TinyDB):
    """
    A TinyDB database instance that uses RecordTable instances for each table
    and Record instances for each document in the table.

    Tables are registered explicitly with ``create_table`` and are named after
    the class they were created for; a ``Record`` table is created on
    construction.
    """

    default_table_name = "Record"
    # Class-level fallback so that ``__getattr__`` can always resolve
    # ``self._tables``, even on an instance whose ``__init__`` has not run.
    # Each initialised instance works on its own dict (see ``__init__``).
    _tables = {}

    def __init__(self, *args, **kwargs):
        """
        Open the database and register the base ``Record`` table.

        All arguments are forwarded unchanged to ``TinyDB.__init__``.
        """
        super().__init__(*args, **kwargs)
        if "_tables" not in vars(self):
            # Never register tables in the dict shared by the whole class:
            # a second database would otherwise be handed tables that are
            # bound to the first database's storage.
            self._tables = {}
        self.create_table(Record)

    def table(self, name: str) -> RecordTable:
        """
        Return the registered table called ``name``.

        :raises RuntimeError: if no table with that name has been created.
        """
        if name not in self._tables:
            raise RuntimeError(f"No such table: {name}")
        return self._tables[name]

    def create_table(self, table_class):
        """
        Register a table for ``table_class`` (if not registered yet) and return it.

        The table is named ``table_class.__name__`` and uses ``table_class`` as
        its document class. Calling this again for the same name returns the
        existing table.
        """
        name = table_class.__name__
        if name not in self._tables:
            self._tables[name] = RecordTable(name, db=self, document_class=table_class)
        return self.table(name)

    def save(self, record):
        """
        Create or update a record in its table.

        The table is looked up by ``record._metadata.table``; the result of the
        table's ``insert`` is returned.
        """
        return self.table(record._metadata.table).insert(record)

    def delete(self, record):
        """
        Remove a record from its table.

        The table is looked up by ``record._metadata.table``; the result of the
        table's ``remove`` is returned.
        """
        return self.table(record._metadata.table).remove(record)

    def __getattr__(self, attr_name):
        """
        Make tables attributes of the instance.

        Names that are not registered tables are delegated to the parent
        class's ``__getattr__``.
        """
        if attr_name in self._tables:
            return self.table(attr_name)
        return super().__getattr__(attr_name)

    @classmethod
    def with_schema(cls, schema_module, *args, **kwargs):
        """
        Build a database with one table per Record class found in ``schema_module``.

        Every member of ``schema_module`` that is a class and a subclass of
        ``Record`` is passed to ``create_table``. Remaining arguments are
        forwarded to the constructor.

        :returns: the new database, an instance of the class this was called on.
        """
        # Instantiate ``cls`` rather than GrungDB so subclasses get their own type.
        db = cls(*args, **kwargs)
        for _, obj in inspect.getmembers(schema_module):
            # inspect.isclass also accepts classes built with a metaclass,
            # which a ``type(obj) == type`` comparison would skip.
            if inspect.isclass(obj) and issubclass(obj, Record):
                db.create_table(obj)
        return db
|