Compare commits
2 Commits
99dbfe7e2d
...
7e649ee6e0
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7e649ee6e0 | ||
![]() |
7e7d61efe9 |
|
@ -1,8 +1,12 @@
|
||||||
import inspect
|
import inspect
|
||||||
|
import re
|
||||||
|
from functools import reduce
|
||||||
|
from operator import ior
|
||||||
|
|
||||||
from tinydb import TinyDB, table
|
from tinydb import Query, TinyDB, table
|
||||||
from tinydb.table import Document
|
from tinydb.table import Document
|
||||||
|
|
||||||
|
from grung.exceptions import UniqueConstraintError
|
||||||
from grung.types import Record
|
from grung.types import Record
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,9 +15,10 @@ class RecordTable(table.Table):
|
||||||
Wrapper around tinydb Tables that handles Records instead of dicts.
|
Wrapper around tinydb Tables that handles Records instead of dicts.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, storage, name, document_class: Document = Record, **kwargs):
|
def __init__(self, name: str, db: TinyDB, document_class: Document = Record, **kwargs):
|
||||||
self.document_class = document_class
|
self.document_class = document_class
|
||||||
super().__init__(storage, name, **kwargs)
|
self._db = db
|
||||||
|
super().__init__(db.storage, name, **kwargs)
|
||||||
|
|
||||||
def insert(self, document):
|
def insert(self, document):
|
||||||
self._satisfy_constraints(document)
|
self._satisfy_constraints(document)
|
||||||
|
@ -27,9 +32,26 @@ class RecordTable(table.Table):
|
||||||
if document.doc_id:
|
if document.doc_id:
|
||||||
super().remove(doc_ids=[document.doc_id])
|
super().remove(doc_ids=[document.doc_id])
|
||||||
|
|
||||||
def _satisfy_constraints(self, document):
|
def _satisfy_constraints(self, document) -> bool:
|
||||||
# check for uniqueness, etc.
|
self._check_unique(document)
|
||||||
pass
|
|
||||||
|
def _check_unique(self, document) -> bool:
|
||||||
|
matches = [
|
||||||
|
match
|
||||||
|
for match in self.search(
|
||||||
|
reduce(
|
||||||
|
ior,
|
||||||
|
[
|
||||||
|
Query()[field.name].matches(document[field.name], flags=re.IGNORECASE)
|
||||||
|
for field in document._metadata.fields.values()
|
||||||
|
if field.unique
|
||||||
|
],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if match.doc_id != document.doc_id
|
||||||
|
]
|
||||||
|
if matches != []:
|
||||||
|
raise UniqueConstraintError(document, matches)
|
||||||
|
|
||||||
|
|
||||||
class GrungDB(TinyDB):
|
class GrungDB(TinyDB):
|
||||||
|
@ -53,7 +75,7 @@ class GrungDB(TinyDB):
|
||||||
def create_table(self, table_class):
|
def create_table(self, table_class):
|
||||||
name = table_class.__name__
|
name = table_class.__name__
|
||||||
if name not in self._tables:
|
if name not in self._tables:
|
||||||
self._tables[name] = RecordTable(self.storage, name, document_class=table_class)
|
self._tables[name] = RecordTable(name, db=self, document_class=table_class)
|
||||||
return self.table(name)
|
return self.table(name)
|
||||||
|
|
||||||
def save(self, record):
|
def save(self, record):
|
||||||
|
|
12
src/grung/exceptions.py
Normal file
12
src/grung/exceptions.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
class UniqueConstraintError(Exception):
|
||||||
|
"""
|
||||||
|
Thrown when a db write operation cannot complete due to a field's unique constraint.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, document, collisions):
|
||||||
|
super().__init__(
|
||||||
|
"\n"
|
||||||
|
f" * Record: {dict(document)}\n"
|
||||||
|
f" * Error: Unique constraint failure\n"
|
||||||
|
" * The record matches the following existing records:\n\n" + "\n".join(str(c) for c in collisions)
|
||||||
|
)
|
|
@ -3,6 +3,7 @@ from tinydb.storages import MemoryStorage
|
||||||
|
|
||||||
from grung import examples
|
from grung import examples
|
||||||
from grung.db import GrungDB
|
from grung.db import GrungDB
|
||||||
|
from grung.exceptions import UniqueConstraintError
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -49,3 +50,13 @@ def test_crud(db):
|
||||||
# delete
|
# delete
|
||||||
db.delete(players)
|
db.delete(players)
|
||||||
assert len(db.Group) == 0
|
assert len(db.Group) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_unique(db):
|
||||||
|
user1 = examples.User(name="john", email="john@foo")
|
||||||
|
user2 = examples.User(name="john", email="john@foo")
|
||||||
|
|
||||||
|
user1 = db.save(user1)
|
||||||
|
with pytest.raises(UniqueConstraintError):
|
||||||
|
user2 = db.save(user2)
|
||||||
|
db.save(user1)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user