grung-db/src/grung/db.py

104 lines
3.1 KiB
Python
Raw Normal View History

2025-09-24 19:51:39 -07:00
import inspect
2025-09-27 12:09:43 -07:00
from functools import reduce
from operator import ior
2025-09-24 19:51:39 -07:00
2025-09-27 12:09:43 -07:00
from tinydb import Query, TinyDB, table
2025-09-24 19:51:39 -07:00
from tinydb.table import Document
2025-09-27 12:09:43 -07:00
from grung.exceptions import UniqueConstraintError
2025-09-24 19:51:39 -07:00
from grung.types import Record
class RecordTable(table.Table):
    """
    Wrapper around tinydb Tables that handles Records instead of dicts.

    Before a document is written, its constraints are checked; currently the
    only constraint is uniqueness of every field flagged ``unique`` in the
    document's ``_metadata.fields``.
    """

    def __init__(self, name: str, db: TinyDB, document_class: Document = Record, **kwargs):
        """
        Bind the table to ``db``'s storage.

        :param name: Table name, forwarded to the tinydb ``Table``.
        :param db: Owning database; its ``storage`` backs this table.
        :param document_class: Class stored on the table as ``document_class``
            (defaults to ``Record``).
        :param kwargs: Forwarded unchanged to the tinydb ``Table`` constructor.
        """
        # Assigned before the parent constructor runs, matching the original
        # statement order.
        self.document_class = document_class
        self._db = db
        super().__init__(db.storage, name, **kwargs)

    def insert(self, document):
        """
        Create or update ``document`` and return the stored copy.

        A document with a truthy ``doc_id`` is upserted; otherwise a plain
        ``dict`` copy of it is inserted as a new row.

        :raises UniqueConstraintError: If another stored document already
            holds the same value in one of this document's unique fields.
        """
        self._satisfy_constraints(document)
        if document.doc_id:
            # NOTE(review): if tinydb's upsert falls back to self.insert() for
            # a doc_id that is not stored yet, it would re-enter this override
            # -- confirm against the tinydb version in use.
            last_insert_id = super().upsert(document)[0]
        else:
            last_insert_id = super().insert(dict(document))
        return self.get(doc_id=last_insert_id)

    def remove(self, document):
        """
        Remove ``document`` from the table by its ``doc_id``.

        A document whose ``doc_id`` is falsy is silently ignored.
        """
        if document.doc_id:
            super().remove(doc_ids=[document.doc_id])

    def _satisfy_constraints(self, document) -> None:
        """Run every constraint check; a violated constraint raises."""
        self._check_unique(document)

    def _check_unique(self, document) -> None:
        """
        Ensure no other stored document shares a unique field value.

        :raises UniqueConstraintError: With the offending matches, when any
            document other than ``document`` itself (compared by ``doc_id``)
            matches at least one unique field value.
        """
        unique_queries = [
            Query().fragment({field.name: document[field.name]})
            for field in document._metadata.fields.values()
            if field.unique
        ]
        if not unique_queries:
            # reduce() without an initial value raises TypeError on an empty
            # sequence; with no unique fields there is nothing to violate.
            return

        # OR the per-field queries together: a clash on any one unique field
        # is enough to reject the document.
        conflicts = [
            match
            for match in self.search(reduce(ior, unique_queries))
            if match.doc_id != document.doc_id
        ]
        if conflicts:
            raise UniqueConstraintError(document, conflicts)
2025-09-24 19:51:39 -07:00
class GrungDB(TinyDB):
    """
    A TinyDB database instance that uses RecordTable instances for each table
    and Record instances for each document in the table.

    Tables must be registered with ``create_table`` (or ``with_schema``)
    before use; ``table`` raises for unknown names.
    """

    default_table_name = "Record"
    # Class-level fallback so __getattr__ can always read ``_tables``.
    # NOTE(review): TinyDB.__init__ appears to rebind ``self._tables`` per
    # instance -- confirm; otherwise this dict is shared by all instances.
    _tables = {}

    def __init__(self, *args, **kwargs):
        """Open the database and register the base ``Record`` table."""
        super().__init__(*args, **kwargs)
        self.create_table(Record)

    def table(self, name: str) -> RecordTable:
        """
        Return the registered table called ``name``.

        :raises RuntimeError: If no table with that name has been created.
        """
        if name not in self._tables:
            raise RuntimeError(f"No such table: {name}")
        return self._tables[name]

    def create_table(self, table_class):
        """
        Register a table for ``table_class`` and return it.

        The table is named after ``table_class.__name__``. Calling this again
        for an already-registered name returns the existing table.
        """
        name = table_class.__name__
        if name not in self._tables:
            self._tables[name] = RecordTable(name, db=self, document_class=table_class)
        return self.table(name)

    def save(self, record):
        """
        Create or update a record in its table.

        The table is looked up by ``record._metadata.table``; the result of
        that table's ``insert`` is returned.
        """
        return self.table(record._metadata.table).insert(record)

    def delete(self, record):
        """
        Remove a record from the table named by ``record._metadata.table``.
        """
        return self.table(record._metadata.table).remove(record)

    def __getattr__(self, attr_name):
        """
        Make tables attributes of the instance.

        Names that are not registered tables fall through to the parent
        class's ``__getattr__``.
        """
        if attr_name in self._tables:
            return self.table(attr_name)
        return super().__getattr__(attr_name)

    @classmethod
    def with_schema(cls, schema_module, *args, **kwargs):
        """
        Build a database with one table per ``Record`` subclass in a module.

        :param schema_module: Module whose members are scanned for classes
            that subclass ``Record``.
        :param args: Positional arguments forwarded to the constructor.
        :param kwargs: Keyword arguments forwarded to the constructor.
        :return: The new database, an instance of ``cls``.
        """
        # Instantiate ``cls`` rather than GrungDB so subclasses get their own
        # type back from this alternate constructor.
        db = cls(*args, **kwargs)
        # inspect.isclass also accepts classes built with a custom metaclass,
        # which an exact ``type(obj) == type`` comparison would skip.
        for _name, obj in inspect.getmembers(schema_module, inspect.isclass):
            if issubclass(obj, Record):
                db.create_table(obj)
        return db