import enum
import decimal
from datetime import date
from typing import Any, Union

from sqlalchemy import func, select
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.sql.expression import ClauseElement

NoneType = type(None)
"""type: the type of None :D"""

[docs]def get_or_create(session, model, defaults=None, **kwargs): """ Gets an instance of an object or if it does not exist then create it. Args: session: the database session model: the model / table to ger or create from defaults: any default parameters for creating **kwargs: the criteria to get or create Returns: a tuple(p,q) p: an instance of the object and q: if it is new or old """ instance = session.query(model).filter_by(**kwargs).first() if instance: return instance, False else: params = dict((k, v) for k, v in kwargs.items() if not isinstance(v, ClauseElement)) params.update(defaults or {}) instance = model(**params) session.add(instance) session.flush() return instance, True
[docs]def encode_model(model: DeclarativeMeta, parent_class: Any = None, expand: bool = False) -> dict: """ Encodes a DB instance object of a given model into json Args: model: The DB model instance to serialize to json parent_class: A DB model might inherit from another DB model. Pass the parent class in order to be serialized correctly expand: to include relationships or not Returns: a dictionary with basic data types that are all serializable """ if parent_class is None: parent_class = (NoneType,) columns = model.__table__.columns.keys() + model.__mapper__.relationships.keys() return dict((c, _alchemy_encoder(getattr(model, c), parent_class + (model.__class__,))) for c in columns if not c.endswith('_id') and _check_instance(getattr(model, c), parent_class, expand))
[docs]def encode_query(query: list, expand: bool = False) -> list: """ Loops through all of the objects in a query and encodes every object with the help of encode_model() function. All of the individual encoded models are added into a list and then returned. Args: query: the query that you would like to encode expand: if you should expand relationships in the models Returns: a list of dictionaries which are the encoded objects """ return [encode_model(q, expand=expand) for q in query]
[docs]def sanitize_like_query(query_str: str) -> str: """ Sanitizes a string to be used as a query in the DB. It will replace a * with % only when the star is not escaped. Args: query_str: original string to sanitize Returns: sanitized converted string """ import re if query_str.startswith('*'): query_str = "%" + query_str[1:] query_str = re.sub(r"([^\\])\*", r"\1%", query_str) query_str = query_str.replace("\\*", "*") return query_str
[docs]def get_table_row_count(db_session, model_table) -> int: """ Gets the number of rows in a given database in the given database session Args: db_session: the database session to use for db actions model_table: the model / table Returns: integer represent the number of row in the table """ row_query = select([func.count()]).select_from(model_table) row_count = db_session.execute(row_query).scalar() return row_count
def _alchemy_encoder(obj: Any, obj_class: Any) -> Union[str, float, int, dict, list]: """ JSON encoder function for SQLAlchemy special classes. Args: obj: Object to check obj_class: The class of the object passed Returns: Object formatted to acceptable json type """ if isinstance(obj, list): return [_alchemy_encoder(i, obj_class) for i in obj] if not isinstance(obj[0], obj_class) else [] elif isinstance(obj.__class__, DeclarativeMeta): return encode_model(obj, obj_class, expand=True) elif isinstance(obj, enum.Enum): return getattr(obj, 'name') elif isinstance(obj, date): return obj.isoformat() elif isinstance(obj, decimal.Decimal): return float(obj) else: return obj def _check_instance(model, parent, expand): if isinstance(model.__class__, DeclarativeMeta) and not expand: return False if isinstance(model, list): if len(model) == 0: return False if isinstance(model[0].__class__, DeclarativeMeta) and not expand: return False return not isinstance(model[0], parent) return not isinstance(model, parent)