Source code for redirectory.libs_int.database.database_rule_actions

from typing import Union

from kubi_ecs_logger import Logger, Severity
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError

from redirectory.models import RedirectRule, DomainRule, PathRule, DestinationRule
from . import db_get_or_create

MODEL_PROPERTY_ID_MAP = {
    DomainRule: RedirectRule.domain_rule_id,
    PathRule: RedirectRule.path_rule_id,
    DestinationRule: RedirectRule.destination_rule_id
}


[docs]def add_redirect_rule(db_session, domain: str, domain_is_regex: bool, path: str, path_is_regex: bool, destination: str, destination_is_rewrite: bool, weight: int, commit: bool = True) \ -> Union[RedirectRule, int]: """ Creates a new Redirect Rule from all of the given arguments. If a domain, path or destination is already used it is just going to be re-used in the new rule. Before all that it validates rules which are rewrites to see if they are configured correctly. Depending on where the check failed different integers will be returned. Args: db_session: the database session to use for the DB actions domain: the domain of the new rule domain_is_regex: is the domain a regex or not path: the path of the new rule path_is_regex: is the path a regex or not destination: the destination of the new rule destination_is_rewrite: is the destination a rewrite or not weight: the weight of the new rule commit: should the function commit the new rule or just flush for ids Returns: Redirect Rule - if all went well 1 (int) - if the check failed for rewrite rule 2 (int) - if the check for already existing rule failed """ if destination_is_rewrite and not validate_rewrite_rule(path, path_is_regex, destination): Logger() \ .event(category="database", action="rule added") \ .log(original=f"Rewrite rule failed validation check") \ .out(severity=Severity.DEBUG) return 1 domain_instance, is_new_domain = db_get_or_create(db_session, DomainRule, rule=domain, is_regex=domain_is_regex) path_instance, is_new_path = db_get_or_create(db_session, PathRule, rule=path, is_regex=path_is_regex) dest_instance, is_new_dest = db_get_or_create(db_session, DestinationRule, destination_url=destination, is_rewrite=destination_is_rewrite) new_redirect_rule = RedirectRule() new_redirect_rule.domain_rule_id = domain_instance.id new_redirect_rule.path_rule_id = path_instance.id new_redirect_rule.destination_rule_id = dest_instance.id new_redirect_rule.weight = weight db_session.add(new_redirect_rule) try: if commit: db_session.commit() else: db_session.flush() Logger() \ .event(category="database", action="rule added") \ .log(original=f"Added new redirect rule with id: {new_redirect_rule.id}") \ .out(severity=Severity.DEBUG) return new_redirect_rule except IntegrityError: Logger() \ .event(category="database", action="rule added") \ .log(original=f"Rule already exists") \ .out(severity=Severity.DEBUG) db_session.rollback() return 2
[docs]def update_redirect_rule(db_session, redirect_rule_id: int, domain: str, domain_is_regex: bool, path: str, path_is_regex: bool, destination: str, destination_is_rewrite: bool, weight: int) \ -> Union[RedirectRule, int]: """ Updates the rule with the given ID and with the given arguments. Finds the rule specified with the redirect_rule_id and updates it's values correspondingly. If everything goes correctly then the new version of the rule returned. If no rule with that ID is found an integer is returned If the new rule fails the rewrite validation an integer is returned Args: db_session: the database session to use for db actions redirect_rule_id: the ID of the rule to update domain: the new domain of the rule domain_is_regex: the new status of the domain rule path: the new path of the rule path_is_regex: the new status of the path rule destination: the new destination of the rule destination_is_rewrite: the new status of the destination rule weight: the new weight of the rule Returns: Redirect Rule - which is the updated version if all went well 1 (int) - rule exists but fails validation check for rewrite rule 2 (int) - rule with this id does not exist """ if destination_is_rewrite and not validate_rewrite_rule(path, path_is_regex, destination): Logger() \ .event(category="database", action="rule update") \ .log(original=f"The new rule updates make the rewrite rule incorrect") \ .out(severity=Severity.DEBUG) return 1 redirect_rule: RedirectRule = get_model_by_id(db_session, RedirectRule, redirect_rule_id) if not redirect_rule: Logger() \ .event(category="database", action="rule update") \ .log(original=f"A rule with ID: {redirect_rule_id} does not exist") \ .out(severity=Severity.DEBUG) return 2 domain_instance, _ = db_get_or_create(db_session, DomainRule, rule=domain, is_regex=domain_is_regex) path_instance, _ = db_get_or_create(db_session, PathRule, rule=path, is_regex=path_is_regex) dest_instance, _ = db_get_or_create(db_session, DestinationRule, destination_url=destination, is_rewrite=destination_is_rewrite) redirect_rule.domain_rule_id = domain_instance.id redirect_rule.path_rule_id = path_instance.id redirect_rule.destination_rule_id = dest_instance.id redirect_rule.weight = weight # redirect_rule.save() db_session.commit() Logger() \ .event(category="database", action="rule update") \ .log(original=f"A rule with ID: {redirect_rule_id} has been updated") \ .out(severity=Severity.DEBUG) return redirect_rule
[docs]def delete_redirect_rule(db_session, redirect_rule_id: int) -> bool: """ Tries to delete a redirect rule with a given id If the rule doesn't exist then false will be returned Args: db_session: the database session to use for db actions redirect_rule_id: the id of the rule to delete Returns: true if rule deleted successfully else false if rule not found """ result: RedirectRule = get_model_by_id(db_session, RedirectRule, redirect_rule_id) if result: result.delete(db_session) return True return False
[docs]def get_model_by_id(db_session, model, model_id): """ Queries a specific model / table in a given database session for a row with a given ID Args: db_session: the database session to use for db actions model: the model / table to query model_id: the id of the given model Returns: an instance of the model or None if not found """ return db_session.query(model).get(model_id)
[docs]def get_usage_count(db_session, model, model_instance_id) -> int: """ Creates a query that counts the usage of a given model with model_instance_id in the RedirectRule model / table. After that executes the query and returns the result Args: db_session: the database session to use for db actions model: the model to count the usages for model_instance_id: the id of the model instance Returns: an integer representing how many times a certain model with that id is used """ # Base query usage_query = select([func.count()]).select_from(RedirectRule) # Append correct where statement to query if model in MODEL_PROPERTY_ID_MAP: usage_query = usage_query.where(MODEL_PROPERTY_ID_MAP[model] == model_instance_id) # Execute query and get number usage = db_session.execute(usage_query).scalar() return usage
[docs]def validate_rewrite_rule(path: str, path_is_regex: bool, destination: str) -> bool: """ Checks if all of the needed variables/placeholders in the destination rule are also appearing in the path when compiled to a regex pattern. Args: path: the path rule to check for path_is_regex: if the path rule is a regex (hint in order to pass this check it always has to be) destination: the destination rule with placeholders in it Returns: True if the rule is valid else False """ import re import string if path_is_regex is False: return False try: # Get the groups from the regex pattern path_re_pattern = re.compile(path) groups = list(path_re_pattern.groupindex) # Get all the variables that need to be replaced variables_to_replace = [tup[1] for tup in string.Formatter().parse(destination) if tup[1] is not None] if len(groups) != len(variables_to_replace): return False for var in variables_to_replace: if var not in groups: return False return True except Exception: # All return False