Source code for redirectory.libs_int.database.database_manager
import os
from typing import Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
from kubi_ecs_logger import Logger, Severity
from redirectory.libs_int.config.configuration import Configuration
[docs]def get_connection_string():
"""
Generates a connection string to be passed to SQLAlchemy.
The string is created from the current loaded configuration with
the help of the Configuration() class.
There are two options for both SQLite and MySQL database connections.
Returns:
a connection string for SQLAlchemy to use for an engine
"""
config = Configuration().values
db = config.database
db_path = os.path.join(config.directories.data, db.path)
if config.deployment == 'test':
return 'sqlite://'
if db.type == "sqlite":
return f"sqlite:///{db_path}"
else:
return f"{db.type}://{db.username}:{db.password}@{db.host}:{db.port}/{db.name}"
[docs]class DatabaseManager:
__instance: Optional['DatabaseManager'] = None
__engine = None
__initialized: bool = False
__base = None
__session = None
__session_maker = None
def __new__(cls):
if cls.__instance is None:
cls.__instance = super(DatabaseManager, cls).__new__(cls)
connection_string = get_connection_string()
cls.__instance.__engine = create_engine(connection_string, echo=False)
cls.__instance.__base = declarative_base(bind=cls.__instance.__engine)
cls.__instance.__session_maker = sessionmaker(expire_on_commit=True, autoflush=True)
cls.__initialized = True
Logger().event(
category="database",
action="database loaded",
dataset=connection_string
).out(severity=Severity.INFO)
return cls.__instance
[docs] def reload(self):
self.__instance = None
# TODO:
pass
[docs] def get_session(self):
"""
Creates a scoped session with with the help of the session maker.
This session is specific to the current thread from where this function is called.
If a session already exists it will be returned but if not a new one will be created.
If the DatabaseManager is not initialized then a ValueError will be raised.
Returns:
a database session for the current thread
"""
if self.__initialized:
if self.__session is None:
self.__session = scoped_session(self.__session_maker)
return self.__session
else:
raise ValueError("DatabaseManager must be initialized!")
[docs] def return_session(self, session):
"""
Closes the given session and removes it from DatabaseManager to prevent from any further use.
Sets the session of the DatabaseManager to None
Args:
session: the session to remove
"""
session.remove()
self.__session = None
[docs] def get_base(self):
"""
Gets the current base that all models should inherit from.
Once a model inherits from this base it will be associated with it.
Returns:
the current base
"""
if self.__initialized:
return self.__base
else:
raise ValueError("DatabaseManager must be initialized!")
[docs] def delete_db_tables(self):
"""
Will drop all tables associated with the current base of the DatabaseManager.
Every model's table that inherits from this base will be dropped.
If the DatabaseManager is not initialized then a ValueError will be raised.
"""
if self.__initialized:
self.__base.metadata.drop_all()
else:
raise ValueError("DatabaseManager must be initialized!")
[docs] def create_db_tables(self):
"""
Will create all model's tables associated with the current DatabaseManager base.
The creation of those tables is safe. If a table already exists it will not be created again.
If the DatabaseManager is not initialized then a ValueError will be raised.
"""
if self.__initialized:
self.__base.metadata.create_all(checkfirst=True)
else:
raise ValueError("DatabaseManager must be initialized!")