Skip to content
Snippets Groups Projects
util.py 4.74 KiB
Newer Older
import logging
from datetime import date, datetime
from typing import Optional, Union, List
from django.db.models import QuerySet, Model
Jonathan Weth's avatar
Jonathan Weth committed
from aleksis.apps.untis import models as mysql_models

DB_NAME = "untis"

logger = logging.getLogger(__name__)

Jonathan Weth's avatar
Jonathan Weth committed

def run_using(obj: QuerySet) -> QuerySet:
    return obj.using(DB_NAME)


def get_term(date: Optional[date] = None) -> mysql_models.Terms:
Jonathan Weth's avatar
Jonathan Weth committed
    """ Get term valid for the provided date """

    if not date:
        date = datetime.now()

    terms = run_using(mysql_models.Terms.objects).filter(
        datefrom__lte=date_to_untis_date(date), dateto__gte=date_to_untis_date(date)
    )

    if not terms.exists():
        raise Exception("Term needed")

    return terms[0]


def run_default_filter(
    qs: QuerySet, date: Optional[date] = None, filter_term: bool = True, filter_deleted: bool = True
Jonathan Weth's avatar
Jonathan Weth committed
) -> QuerySet:
    """ Add a default filter in order to select the correct term """

    term = get_term(date)
    term_id, schoolyear_id, school_id, version_id = (
        term.term_id,
        term.schoolyear_id,
        term.school_id,
        term.version_id,
    )

    qs = run_using(qs).filter(
Jonathan Weth's avatar
Jonathan Weth committed
            school_id=school_id,
            schoolyear_id=schoolyear_id,
            version_id=version_id,
    )

    if filter_term:
        qs = qs.filter(term_id=term_id)

    if filter_deleted:
        qs = qs.filter(deleted=0)

    return qs
Jonathan Weth's avatar
Jonathan Weth committed


def clean_array(a: list, conv=None) -> list:
        if el != "" and el != "0":
            if conv is not None:
                el = conv(el)
            b.append(el)
    return b


Jonathan Weth's avatar
Jonathan Weth committed
def untis_split_first(s: str, conv=None) -> list:
    return clean_array(s.split(","), conv=conv)


Jonathan Weth's avatar
Jonathan Weth committed
def untis_split_second(s: str, conv=None) -> list:
    return clean_array(s.split("~"), conv=conv)


Jonathan Weth's avatar
Jonathan Weth committed
def untis_split_third(s: str, conv=None) -> list:
    return clean_array(s.split(";"), conv=conv)


UNTIS_DATE_FORMAT = "%Y%m%d"
def untis_date_to_date(untis: int) -> date:
    """ Converts a UNTIS date to a python date """
    return datetime.strptime(str(untis), UNTIS_DATE_FORMAT).date()
def date_to_untis_date(date: date) -> int:
    """ Converts a python date to a UNTIS date """
    return int(date.strftime(UNTIS_DATE_FORMAT))


def untis_colour_to_hex(colour: int) -> str:
    # Convert UNTIS number to HEX
    hex_bgr = str(hex(colour)).replace("0x", "")

    # Add beginning zeros if len < 6
    if len(hex_bgr) < 6:
        hex_bgr = "0" * (6 - len(hex_bgr)) + hex_bgr

    # Change BGR to RGB
    hex_rgb = hex_bgr[4:6] + hex_bgr[2:4] + hex_bgr[0:2]

    # Add html #
    return "#" + hex_rgb


def sync_m2m(new_items: Union[List[Model], QuerySet], m2m_qs: QuerySet):
    """ Sync m2m field """

    # Add items
    for item in new_items:
        if item not in m2m_qs.all():
            m2m_qs.add(item)
            logger.info("  Many-to-many sync: item added")

    # Delete items
    for item in m2m_qs.all():
        if item not in new_items:
            m2m_qs.remove(item)
            logger.info("  Many-to-many sync: item removed")


def compare_m2m(
    a: Union[List[Model], QuerySet], b: Union[List[Model], QuerySet]
) -> bool:
    """ Compare if content of two m2m fields is equal """

    ids_a = sorted([i.id for i in a])
    ids_b = sorted([i.id for i in b])
    return ids_a == ids_b


def connect_untis_fields(obj: Model, attr: str, limit: int) -> List[str]:
    """ Connects data from multiple DB fields """

    all_data = []

    for i in range(1, limit + 1):
        attr_name = "{}{}".format(attr, i)
        raw_data = getattr(obj, attr_name, "")
        if raw_data not in ("", None):
            data = untis_split_first(raw_data)
            all_data += data

    return all_data


def get_first_weekday(time_periods_ref: dict) -> int:
    """ Get first weekday from time periods reference """
    return sorted(time_periods_ref.keys())[0]


def get_last_weekday(time_periods_ref: dict) -> int:
    """ Get last weekday from time periods reference """
    return sorted(time_periods_ref.keys())[-1]


def get_first_period(time_periods_ref: dict, weekday: int) -> int:
    """ Get first period on a weekday from time periods reference """
    return sorted(time_periods_ref[weekday].keys())[0]


def get_last_period(time_periods_ref: dict, weekday: int) -> int:
    """ Get last period an a weekday from time periods reference """
    return sorted(time_periods_ref[weekday].keys())[-1]


def move_weekday_to_range(time_periods_ref: dict, weekday: int) -> int:
    """ Move weekday values into school week (e. g. saturday to friday) """
    first_weekday = get_first_weekday(time_periods_ref)
    last_weekday = get_last_weekday(time_periods_ref)

    if weekday < first_weekday:
        weekday = first_weekday
    if weekday > last_weekday:
        weekday = last_weekday

    return weekday