Skip to content
Snippets Groups Projects
Verified Commit df755acb authored by Tom Teichler's avatar Tom Teichler :beers:
Browse files

[Reformat] flake8, black

parent 1228765f
No related branches found
No related tags found
No related merge requests found
Showing
with 717 additions and 249 deletions
...@@ -29,7 +29,9 @@ class GroupSubjectForm(forms.ModelForm): ...@@ -29,7 +29,9 @@ class GroupSubjectForm(forms.ModelForm):
self.fields["short_name"].widget = forms.HiddenInput() self.fields["short_name"].widget = forms.HiddenInput()
GroupSubjectFormset = forms.modelformset_factory(Group, form=GroupSubjectForm, max_num=0, extra=0) GroupSubjectFormset = forms.modelformset_factory(
Group, form=GroupSubjectForm, max_num=0, extra=0
)
if get_site_preferences()["untis_mysql__use_course_groups"]: if get_site_preferences()["untis_mysql__use_course_groups"]:
EditGroupForm.add_node_to_layout(Fieldset(_("UNTIS import"), "untis_subject")) EditGroupForm.add_node_to_layout(Fieldset(_("UNTIS import"), "untis_subject"))
...@@ -6,7 +6,10 @@ MENUS = { ...@@ -6,7 +6,10 @@ MENUS = {
"name": _("Untis XML import"), "name": _("Untis XML import"),
"url": "untis_xml_import", "url": "untis_xml_import",
"validators": [ "validators": [
("aleksis.core.util.predicates.permission_validator", "untis.do_xml_import"), (
"aleksis.core.util.predicates.permission_validator",
"untis.do_xml_import",
),
], ],
}, },
{ {
......
...@@ -5,40 +5,51 @@ from jsonstore import CharField, IntegerField ...@@ -5,40 +5,51 @@ from jsonstore import CharField, IntegerField
from aleksis.apps.chronos import models as chronos_models from aleksis.apps.chronos import models as chronos_models
from aleksis.core import models as core_models from aleksis.core import models as core_models
# Core models
from aleksis.core.util.core_helpers import get_site_preferences from aleksis.core.util.core_helpers import get_site_preferences
core_models.Person.field( core_models.Person.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
core_models.Group.field( core_models.Group.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
if get_site_preferences()["untis_mysql__use_course_groups"]: if get_site_preferences()["untis_mysql__use_course_groups"]:
core_models.Group.field( core_models.Group.field(
untis_subject=CharField( untis_subject=CharField(
verbose_name=_("UNTIS subject"), verbose_name=_("UNTIS subject"),
help_text=_( help_text=_(
"The UNTIS import will use this for matching course groups (along with parent groups)." "The UNTIS import will use this for matching course groups"
"(along with parent groups)."
), ),
blank=True, blank=True,
null=True,
max_length=255, max_length=255,
) )
) )
# Chronos models # Chronos models
chronos_models.Subject.field( chronos_models.Subject.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.Room.field( chronos_models.Room.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.SupervisionArea.field( chronos_models.SupervisionArea.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.Lesson.field( chronos_models.Lesson.field(
lesson_id_untis=IntegerField(verbose_name=_("Lesson id in UNTIS"), null=True, blank=True) lesson_id_untis=IntegerField(
verbose_name=_("Lesson id in UNTIS"), null=True, blank=True
)
) )
chronos_models.Lesson.field( chronos_models.Lesson.field(
element_id_untis=IntegerField( element_id_untis=IntegerField(
...@@ -54,23 +65,37 @@ chronos_models.LessonPeriod.field( ...@@ -54,23 +65,37 @@ chronos_models.LessonPeriod.field(
) )
) )
chronos_models.LessonSubstitution.field( chronos_models.LessonSubstitution.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.SupervisionSubstitution.field( chronos_models.SupervisionSubstitution.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.AbsenceReason.field( chronos_models.AbsenceReason.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.Absence.field( chronos_models.Absence.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.Event.field( chronos_models.Event.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.Holiday.field( chronos_models.Holiday.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
chronos_models.ExtraLesson.field( chronos_models.ExtraLesson.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True) import_ref_untis=IntegerField(
verbose_name=_("UNTIS import reference"), null=True, blank=True
)
) )
This diff is collapsed.
...@@ -79,5 +79,6 @@ class UseCourseGroups(BooleanPreference): ...@@ -79,5 +79,6 @@ class UseCourseGroups(BooleanPreference):
default = True default = True
verbose_name = _("Use course groups") verbose_name = _("Use course groups")
help_text = _( help_text = _(
"Build or search course groups for every course" " instead of setting classes as groups." "Build or search course groups for every course"
" instead of setting classes as groups."
) )
...@@ -5,5 +5,5 @@ from .util.mysql.main import untis_import_mysql as _untis_import_mysql ...@@ -5,5 +5,5 @@ from .util.mysql.main import untis_import_mysql as _untis_import_mysql
@celery_optional @celery_optional
def untis_import_mysql(): def untis_import_mysql():
""" Celery task for import of UNTIS data from MySQL """ """Celery task for import of UNTIS data from MySQL."""
_untis_import_mysql() _untis_import_mysql()
...@@ -26,7 +26,9 @@ class AbsenceType(Enum): ...@@ -26,7 +26,9 @@ class AbsenceType(Enum):
ROOM = 102 ROOM = 102
def import_absences(absence_reasons_ref, time_periods_ref, teachers_ref, classes_ref, rooms_ref): def import_absences(
absence_reasons_ref, time_periods_ref, teachers_ref, classes_ref, rooms_ref
):
ref = {} ref = {}
# Get term # Get term
......
import logging import logging
from datetime import time from datetime import time
from enum import Enum from enum import Enum
from typing import Dict, List from typing import Dict
from tqdm import tqdm from tqdm import tqdm
...@@ -26,7 +26,7 @@ class CommonDataId(Enum): ...@@ -26,7 +26,7 @@ class CommonDataId(Enum):
def import_subjects() -> Dict[int, chronos_models.Subject]: def import_subjects() -> Dict[int, chronos_models.Subject]:
""" Import subjects """ """Import subjects."""
subjects_ref = {} subjects_ref = {}
...@@ -94,7 +94,7 @@ def import_subjects() -> Dict[int, chronos_models.Subject]: ...@@ -94,7 +94,7 @@ def import_subjects() -> Dict[int, chronos_models.Subject]:
def import_teachers() -> Dict[int, core_models.Person]: def import_teachers() -> Dict[int, core_models.Person]:
""" Import teachers """ """Import teachers."""
teachers_ref = {} teachers_ref = {}
...@@ -159,8 +159,10 @@ def import_teachers() -> Dict[int, core_models.Person]: ...@@ -159,8 +159,10 @@ def import_teachers() -> Dict[int, core_models.Person]:
return teachers_ref return teachers_ref
def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, core_models.Group]: def import_classes(
""" Import classes """ teachers_ref: Dict[int, core_models.Person]
) -> Dict[int, core_models.Group]:
"""Import classes."""
classes_ref = {} classes_ref = {}
...@@ -171,7 +173,9 @@ def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, cor ...@@ -171,7 +173,9 @@ def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, cor
# Check if needed data are provided # Check if needed data are provided
if not class_.name: if not class_.name:
raise RuntimeException( raise RuntimeException(
"Class ID {}: Cannot import class without short name.".format(class_.teacher_id) "Class ID {}: Cannot import class without short name.".format(
class_.teacher_id
)
) )
# Build values # Build values
...@@ -201,7 +205,10 @@ def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, cor ...@@ -201,7 +205,10 @@ def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, cor
changed = True changed = True
logger.info(" Short name updated") logger.info(" Short name updated")
if get_site_preferences()["untis_mysql__update_groups_name"] and new_group.name != name: if (
get_site_preferences()["untis_mysql__update_groups_name"]
and new_group.name != name
):
new_group.name = name new_group.name = name
changed = True changed = True
logger.info(" Name updated") logger.info(" Name updated")
...@@ -227,7 +234,7 @@ def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, cor ...@@ -227,7 +234,7 @@ def import_classes(teachers_ref: Dict[int, core_models.Person]) -> Dict[int, cor
def import_rooms() -> Dict[int, chronos_models.Room]: def import_rooms() -> Dict[int, chronos_models.Room]:
""" Import rooms """ """Import rooms."""
ref = {} ref = {}
...@@ -237,7 +244,9 @@ def import_rooms() -> Dict[int, chronos_models.Room]: ...@@ -237,7 +244,9 @@ def import_rooms() -> Dict[int, chronos_models.Room]:
for room in tqdm(rooms, desc="Import rooms", **TQDM_DEFAULTS): for room in tqdm(rooms, desc="Import rooms", **TQDM_DEFAULTS):
if not room.name: if not room.name:
raise RuntimeException( raise RuntimeException(
"Room ID {}: Cannot import room without short name.".format(room.room_id) "Room ID {}: Cannot import room without short name.".format(
room.room_id
)
) )
# Build values # Build values
...@@ -248,7 +257,8 @@ def import_rooms() -> Dict[int, chronos_models.Room]: ...@@ -248,7 +257,8 @@ def import_rooms() -> Dict[int, chronos_models.Room]:
logger.info("Import room {} …".format(short_name)) logger.info("Import room {} …".format(short_name))
new_room, created = chronos_models.Room.objects.get_or_create( new_room, created = chronos_models.Room.objects.get_or_create(
short_name=short_name, defaults={"name": name, "import_ref_untis": import_ref}, short_name=short_name,
defaults={"name": name, "import_ref_untis": import_ref},
) )
if created: if created:
...@@ -256,7 +266,10 @@ def import_rooms() -> Dict[int, chronos_models.Room]: ...@@ -256,7 +266,10 @@ def import_rooms() -> Dict[int, chronos_models.Room]:
changed = False changed = False
if get_site_preferences()["untis_mysql__update_rooms_name"] and new_room.name != name: if (
get_site_preferences()["untis_mysql__update_rooms_name"]
and new_room.name != name
):
new_room.name = name new_room.name = name
changed = True changed = True
logger.info(" Name updated") logger.info(" Name updated")
...@@ -274,8 +287,10 @@ def import_rooms() -> Dict[int, chronos_models.Room]: ...@@ -274,8 +287,10 @@ def import_rooms() -> Dict[int, chronos_models.Room]:
return ref return ref
def import_supervision_areas(breaks_ref, teachers_ref) -> Dict[int, chronos_models.SupervisionArea]: def import_supervision_areas(
""" Import supervision areas """ breaks_ref, teachers_ref
) -> Dict[int, chronos_models.SupervisionArea]:
"""Import supervision areas."""
ref = {} ref = {}
...@@ -351,7 +366,7 @@ def import_supervision_areas(breaks_ref, teachers_ref) -> Dict[int, chronos_mode ...@@ -351,7 +366,7 @@ def import_supervision_areas(breaks_ref, teachers_ref) -> Dict[int, chronos_mode
teacher = teachers_ref[teacher_id] teacher = teachers_ref[teacher_id]
logger.info( logger.info(
" Import supervision on weekday {} before the {}. period (teacher {})".format( "Import supervision on weekday {} before the {}. period (teacher {})".format(
weekday, period_after_break, teacher weekday, period_after_break, teacher
) )
) )
...@@ -396,7 +411,7 @@ def import_supervision_areas(breaks_ref, teachers_ref) -> Dict[int, chronos_mode ...@@ -396,7 +411,7 @@ def import_supervision_areas(breaks_ref, teachers_ref) -> Dict[int, chronos_mode
def import_time_periods() -> Dict[int, Dict[int, chronos_models.TimePeriod]]: def import_time_periods() -> Dict[int, Dict[int, chronos_models.TimePeriod]]:
""" Import time periods an breaks """ """Import time periods an breaks."""
times = ( times = (
run_default_filter(mysql_models.Commondata.objects, filter_term=False) run_default_filter(mysql_models.Commondata.objects, filter_term=False)
...@@ -427,7 +442,9 @@ def import_time_periods() -> Dict[int, Dict[int, chronos_models.TimePeriod]]: ...@@ -427,7 +442,9 @@ def import_time_periods() -> Dict[int, Dict[int, chronos_models.TimePeriod]]:
start_time = times_ref[period][0] start_time = times_ref[period][0]
end_time = times_ref[period][1] end_time = times_ref[period][1]
logger.info("Import time period on weekday {} in the {}. period".format(weekday, period)) logger.info(
"Import time period on weekday {} in the {}. period".format(weekday, period)
)
new_time_period, created = chronos_models.TimePeriod.objects.get_or_create( new_time_period, created = chronos_models.TimePeriod.objects.get_or_create(
weekday=weekday, weekday=weekday,
...@@ -438,7 +455,10 @@ def import_time_periods() -> Dict[int, Dict[int, chronos_models.TimePeriod]]: ...@@ -438,7 +455,10 @@ def import_time_periods() -> Dict[int, Dict[int, chronos_models.TimePeriod]]:
if created: if created:
logger.info(" New time period created") logger.info(" New time period created")
if new_time_period.time_start != start_time or new_time_period.time_end != end_time: if (
new_time_period.time_start != start_time
or new_time_period.time_end != end_time
):
new_time_period.time_start = start_time new_time_period.time_start = start_time
new_time_period.time_end = end_time new_time_period.time_end = end_time
new_time_period.save() new_time_period.save()
...@@ -465,7 +485,9 @@ def import_breaks( ...@@ -465,7 +485,9 @@ def import_breaks(
# Add None two times in order to create breaks before first lesson and after last lesson # Add None two times in order to create breaks before first lesson and after last lesson
time_periods_for_breaks = [None] + list(time_periods.values()) + [None] time_periods_for_breaks = [None] + list(time_periods.values()) + [None]
for i, time_period in tqdm( for i, time_period in tqdm(
enumerate(time_periods_for_breaks), desc="Import breaks (period)", **TQDM_DEFAULTS enumerate(time_periods_for_breaks),
desc="Import breaks (period)",
**TQDM_DEFAULTS
): ):
# If last item (None) is reached, no further break must be created # If last item (None) is reached, no further break must be created
if i + 1 == len(time_periods_for_breaks): if i + 1 == len(time_periods_for_breaks):
...@@ -499,7 +521,7 @@ def import_breaks( ...@@ -499,7 +521,7 @@ def import_breaks(
def import_absence_reasons() -> Dict[int, chronos_models.AbsenceReason]: def import_absence_reasons() -> Dict[int, chronos_models.AbsenceReason]:
""" Import absence reasons """ """Import absence reasons."""
ref = {} ref = {}
...@@ -522,7 +544,8 @@ def import_absence_reasons() -> Dict[int, chronos_models.AbsenceReason]: ...@@ -522,7 +544,8 @@ def import_absence_reasons() -> Dict[int, chronos_models.AbsenceReason]:
logger.info("Import absence reason {} …".format(short_name)) logger.info("Import absence reason {} …".format(short_name))
new_reason, created = chronos_models.AbsenceReason.objects.get_or_create( new_reason, created = chronos_models.AbsenceReason.objects.get_or_create(
import_ref_untis=import_ref, defaults={"short_name": short_name, "name": name} import_ref_untis=import_ref,
defaults={"short_name": short_name, "name": name},
) )
if created: if created:
......
...@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) ...@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
def import_holidays() -> Dict[int, chronos_models.Holiday]: def import_holidays() -> Dict[int, chronos_models.Holiday]:
""" Import holidays """ """Import holidays."""
ref = {} ref = {}
# Get holidays # Get holidays
...@@ -24,7 +24,9 @@ def import_holidays() -> Dict[int, chronos_models.Holiday]: ...@@ -24,7 +24,9 @@ def import_holidays() -> Dict[int, chronos_models.Holiday]:
# Check if needed data are provided # Check if needed data are provided
if not holiday.name: if not holiday.name:
raise RuntimeException( raise RuntimeException(
"Holiday ID {}: Cannot import holiday without short name.".format(import_ref) "Holiday ID {}: Cannot import holiday without short name.".format(
import_ref
)
) )
title = holiday.name[:50] title = holiday.name[:50]
......
...@@ -23,8 +23,10 @@ from ..util import ( ...@@ -23,8 +23,10 @@ from ..util import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, classes_ref): def import_lessons(
""" Import lessons """ time_periods_ref, rooms_ref, subjects_ref, teachers_ref, classes_ref
):
"""Import lessons."""
# Get current term # Get current term
term = get_term() term = get_term()
...@@ -32,11 +34,11 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas ...@@ -32,11 +34,11 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas
date_end = untis_date_to_date(term.dateto) date_end = untis_date_to_date(term.dateto)
# Get all existing lessons for this term # Get all existing lessons for this term
lessons_in_term = chronos_models.Lesson.objects.filter(term_untis=term.term_id).values_list( lessons_in_term = chronos_models.Lesson.objects.filter(
"id", flat=True term_untis=term.term_id
) ).values_list("id", flat=True)
# Set the end date of all lessons from other terms ending in this term to the day before this term starts # Set end date of lessons from other terms ending in this term to the day before term starts
chronos_models.Lesson.objects.filter(date_end__gte=date_start).exclude( chronos_models.Lesson.objects.filter(date_end__gte=date_start).exclude(
id__in=lessons_in_term id__in=lessons_in_term
).update(date_end=date_start - timedelta(days=1)) ).update(date_end=date_start - timedelta(days=1))
...@@ -142,10 +144,12 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas ...@@ -142,10 +144,12 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas
# Build names and refs for course groups # Build names and refs for course groups
group_short_name = "{}-{}".format( group_short_name = "{}-{}".format(
"".join([c.short_name for c in course_classes]), subject.short_name "".join([c.short_name for c in course_classes]),
subject.short_name,
) )
group_name = "{}: {}".format( group_name = "{}: {}".format(
", ".join([c.short_name for c in course_classes]), subject.short_name ", ".join([c.short_name for c in course_classes]),
subject.short_name,
) )
# Get or create course group # Get or create course group
...@@ -229,7 +233,9 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas ...@@ -229,7 +233,9 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas
lesson.teachers.set(teachers) lesson.teachers.set(teachers)
# All times for this course # All times for this course
old_lesson_periods_qs = chronos_models.LessonPeriod.objects.filter(lesson=lesson) old_lesson_periods_qs = chronos_models.LessonPeriod.objects.filter(
lesson=lesson
)
# If length has changed, delete all lesson periods # If length has changed, delete all lesson periods
if old_lesson_periods_qs.count() != len(time_periods): if old_lesson_periods_qs.count() != len(time_periods):
...@@ -253,7 +259,10 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas ...@@ -253,7 +259,10 @@ def import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, clas
# Update old lesson period # Update old lesson period
old_lesson_period = old_lesson_period_qs[0] old_lesson_period = old_lesson_period_qs[0]
if old_lesson_period.period != time_period or old_lesson_period.room != room: if (
old_lesson_period.period != time_period
or old_lesson_period.room != room
):
old_lesson_period.period = time_period old_lesson_period.period = time_period
old_lesson_period.room = room old_lesson_period.room = room
old_lesson_period.save() old_lesson_period.save()
......
...@@ -26,9 +26,14 @@ class SubstitutionFlag(Enum): ...@@ -26,9 +26,14 @@ class SubstitutionFlag(Enum):
def import_substitutions( def import_substitutions(
teachers_ref, subjects_ref, rooms_ref, classes_ref, supervision_areas_ref, time_periods_ref, teachers_ref,
subjects_ref,
rooms_ref,
classes_ref,
supervision_areas_ref,
time_periods_ref,
): ):
""" Import substitutions """ """Import substitutions."""
term = get_term() term = get_term()
date_start = untis_date_to_date(term.datefrom) date_start = untis_date_to_date(term.datefrom)
...@@ -133,7 +138,10 @@ def import_substitutions( ...@@ -133,7 +138,10 @@ def import_substitutions(
classes.append(classes_ref[id]) classes.append(classes_ref[id])
if lesson_period: if lesson_period:
(substitution, created,) = chronos_models.LessonSubstitution.objects.get_or_create( (
substitution,
created,
) = chronos_models.LessonSubstitution.objects.get_or_create(
lesson_period=lesson_period, week=week.week lesson_period=lesson_period, week=week.week
) )
...@@ -166,13 +174,18 @@ def import_substitutions( ...@@ -166,13 +174,18 @@ def import_substitutions(
logger.info(" Extra lesson detected") logger.info(" Extra lesson detected")
time_period = time_periods_ref[date.weekday()][period] time_period = time_periods_ref[date.weekday()][period]
groups = [classes_ref[pk] for pk in untis_split_first(sub.classids, int)] groups = [
classes_ref[pk] for pk in untis_split_first(sub.classids, int)
]
room = room_old if not room_new and room_old else room_new room = room_old if not room_new and room_old else room_new
subject = subject_old if not subject_new else subject_new subject = subject_old if not subject_new else subject_new
teachers = [teacher_old] if not teacher_new else [teacher_new] teachers = [teacher_old] if not teacher_new else [teacher_new]
(extra_lesson, created,) = chronos_models.ExtraLesson.objects.update_or_create( (
extra_lesson,
created,
) = chronos_models.ExtraLesson.objects.update_or_create(
import_ref_untis=sub_id, import_ref_untis=sub_id,
defaults={ defaults={
"week": week.week, "week": week.week,
...@@ -206,7 +219,9 @@ def import_substitutions( ...@@ -206,7 +219,9 @@ def import_substitutions(
substitution, substitution,
created, created,
) = chronos_models.SupervisionSubstitution.objects.get_or_create( ) = chronos_models.SupervisionSubstitution.objects.get_or_create(
supervision=supervision, date=date, defaults={"teacher": teacher_new}, supervision=supervision,
date=date,
defaults={"teacher": teacher_new},
) )
if created: if created:
...@@ -222,7 +237,9 @@ def import_substitutions( ...@@ -222,7 +237,9 @@ def import_substitutions(
logger.info(" Supervision substitution updated") logger.info(" Supervision substitution updated")
# Delete all no longer existing substitutions # Delete all no longer existing substitutions
for s in chronos_models.LessonSubstitution.objects.within_dates(date_start, date_end): for s in chronos_models.LessonSubstitution.objects.within_dates(
date_start, date_end
):
if s.import_ref_untis and s.import_ref_untis not in existing_subs: if s.import_ref_untis and s.import_ref_untis not in existing_subs:
logger.info("Substitution {} deleted".format(s.id)) logger.info("Substitution {} deleted".format(s.id))
s.delete() s.delete()
......
...@@ -42,9 +42,16 @@ def untis_import_mysql(): ...@@ -42,9 +42,16 @@ def untis_import_mysql():
import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, classes_ref) import_lessons(time_periods_ref, rooms_ref, subjects_ref, teachers_ref, classes_ref)
# Substitutions # Substitutions
import_absences(absence_reasons_ref, time_periods_ref, teachers_ref, classes_ref, rooms_ref) import_absences(
absence_reasons_ref, time_periods_ref, teachers_ref, classes_ref, rooms_ref
)
import_substitutions( import_substitutions(
teachers_ref, subjects_ref, rooms_ref, classes_ref, supervision_areas_ref, time_periods_ref teachers_ref,
subjects_ref,
rooms_ref,
classes_ref,
supervision_areas_ref,
time_periods_ref,
) )
# Events # Events
......
...@@ -20,18 +20,19 @@ logger = logging.getLogger(__name__) ...@@ -20,18 +20,19 @@ logger = logging.getLogger(__name__)
def run_using(obj: QuerySet) -> QuerySet: def run_using(obj: QuerySet) -> QuerySet:
""" Seed QuerySet with using() database from global DB_NAME """ """Seed QuerySet with using() database from global DB_NAME."""
return obj.using(DB_NAME) return obj.using(DB_NAME)
def get_term(for_date: Optional[date] = None) -> mysql_models.Terms: def get_term(for_date: Optional[date] = None) -> mysql_models.Terms:
""" Get term valid for the provided date """ """Get term valid for the provided date."""
if not for_date: if not for_date:
for_date = timezone.now().date() for_date = timezone.now().date()
term = run_using(mysql_models.Terms.objects).get( term = run_using(mysql_models.Terms.objects).get(
datefrom__lte=date_to_untis_date(for_date), dateto__gte=date_to_untis_date(for_date) datefrom__lte=date_to_untis_date(for_date),
dateto__gte=date_to_untis_date(for_date),
) )
return term return term
...@@ -43,7 +44,7 @@ def run_default_filter( ...@@ -43,7 +44,7 @@ def run_default_filter(
filter_term: bool = True, filter_term: bool = True,
filter_deleted: bool = True, filter_deleted: bool = True,
) -> QuerySet: ) -> QuerySet:
""" Add a default filter in order to select the correct term """ """Add a default filter in order to select the correct term."""
term = get_term(for_date) term = get_term(for_date)
term_id, schoolyear_id, school_id, version_id = ( term_id, schoolyear_id, school_id, version_id = (
...@@ -67,7 +68,8 @@ def run_default_filter( ...@@ -67,7 +68,8 @@ def run_default_filter(
def clean_array(seq: Sequence, conv: Callable[[Any], Any] = lambda el: el) -> Sequence: def clean_array(seq: Sequence, conv: Callable[[Any], Any] = lambda el: el) -> Sequence:
""" Convert a sequence using a converter function, stripping all """
Convert a sequence using a converter function, stripping all
elements that are boolean False after conversion. elements that are boolean False after conversion.
>>> clean_array(["a", "", "b"]) >>> clean_array(["a", "", "b"])
...@@ -77,7 +79,9 @@ def clean_array(seq: Sequence, conv: Callable[[Any], Any] = lambda el: el) -> Se ...@@ -77,7 +79,9 @@ def clean_array(seq: Sequence, conv: Callable[[Any], Any] = lambda el: el) -> Se
[8, 12] [8, 12]
""" """
filtered = filter(lambda el: bool(el), map(lambda el: conv(el) if el else None, seq)) filtered = filter(
lambda el: bool(el), map(lambda el: conv(el) if el else None, seq)
)
return type(seq)(filtered) return type(seq)(filtered)
...@@ -94,17 +98,17 @@ def untis_split_third(s: str, conv: Callable[[Any], Any] = lambda el: el) -> Seq ...@@ -94,17 +98,17 @@ def untis_split_third(s: str, conv: Callable[[Any], Any] = lambda el: el) -> Seq
def untis_date_to_date(untis: int) -> date: def untis_date_to_date(untis: int) -> date:
""" Converts a UNTIS date to a python date """ """Converts a UNTIS date to a python date."""
return datetime.strptime(str(untis), UNTIS_DATE_FORMAT).date() return datetime.strptime(str(untis), UNTIS_DATE_FORMAT).date()
def date_to_untis_date(from_date: date) -> int: def date_to_untis_date(from_date: date) -> int:
""" Converts a python date to a UNTIS date """ """Converts a python date to a UNTIS date."""
return int(from_date.strftime(UNTIS_DATE_FORMAT)) return int(from_date.strftime(UNTIS_DATE_FORMAT))
def untis_colour_to_hex(colour: int) -> str: def untis_colour_to_hex(colour: int) -> str:
""" Convert a numerical colour in BGR order to a standard hex RGB string """ """Convert a numerical colour in BGR order to a standard hex RGB string."""
# Convert UNTIS number to HEX # Convert UNTIS number to HEX
hex_bgr = str(hex(colour))[2:].zfill(6) hex_bgr = str(hex(colour))[2:].zfill(6)
...@@ -116,14 +120,16 @@ def untis_colour_to_hex(colour: int) -> str: ...@@ -116,14 +120,16 @@ def untis_colour_to_hex(colour: int) -> str:
return "#" + hex_rgb return "#" + hex_rgb
def compare_m2m(a: Union[Sequence[Model], QuerySet], b: Union[Sequence[Model], QuerySet]) -> bool: def compare_m2m(
""" Compare if content of two m2m fields is equal """ a: Union[Sequence[Model], QuerySet], b: Union[Sequence[Model], QuerySet]
) -> bool:
"""Compare if content of two m2m fields is equal."""
return set(a) == set(b) return set(a) == set(b)
def connect_untis_fields(obj: Model, attr: str, limit: int) -> Sequence[str]: def connect_untis_fields(obj: Model, attr: str, limit: int) -> Sequence[str]:
""" Connects data from multiple DB fields """Connects data from multiple DB fields.
Untis splits structured data, like lists, as comma-separated string into Untis splits structured data, like lists, as comma-separated string into
multiple, numbered database fields, like: multiple, numbered database fields, like:
...@@ -147,27 +153,27 @@ def connect_untis_fields(obj: Model, attr: str, limit: int) -> Sequence[str]: ...@@ -147,27 +153,27 @@ def connect_untis_fields(obj: Model, attr: str, limit: int) -> Sequence[str]:
def get_first_weekday(time_periods_ref: dict) -> int: def get_first_weekday(time_periods_ref: dict) -> int:
""" Get first weekday from time periods reference """ """Get first weekday from time periods reference."""
return sorted(time_periods_ref.keys())[0] return sorted(time_periods_ref.keys())[0]
def get_last_weekday(time_periods_ref: dict) -> int: def get_last_weekday(time_periods_ref: dict) -> int:
""" Get last weekday from time periods reference """ """Get last weekday from time periods reference."""
return sorted(time_periods_ref.keys())[-1] return sorted(time_periods_ref.keys())[-1]
def get_first_period(time_periods_ref: dict, weekday: int) -> int: def get_first_period(time_periods_ref: dict, weekday: int) -> int:
""" Get first period on a weekday from time periods reference """ """Get first period on a weekday from time periods reference."""
return sorted(time_periods_ref[weekday].keys())[0] return sorted(time_periods_ref[weekday].keys())[0]
def get_last_period(time_periods_ref: dict, weekday: int) -> int: def get_last_period(time_periods_ref: dict, weekday: int) -> int:
""" Get last period an a weekday from time periods reference """ """Get last period an a weekday from time periods reference."""
return sorted(time_periods_ref[weekday].keys())[-1] return sorted(time_periods_ref[weekday].keys())[-1]
def move_weekday_to_range(time_periods_ref: dict, weekday: int) -> int: def move_weekday_to_range(time_periods_ref: dict, weekday: int) -> int:
""" Move weekday values into school week (e. g. saturday to friday) """ """Move weekday values into school week (e. g. saturday to friday)."""
first_weekday = get_first_weekday(time_periods_ref) first_weekday = get_first_weekday(time_periods_ref)
last_weekday = get_last_weekday(time_periods_ref) last_weekday = get_last_weekday(time_periods_ref)
......
...@@ -132,7 +132,11 @@ def untis_import_xml(request: HttpRequest, untis_xml: Union[BinaryIO, str]) -> N ...@@ -132,7 +132,11 @@ def untis_import_xml(request: HttpRequest, untis_xml: Union[BinaryIO, str]) -> N
else None else None
) )
date_end = ( date_end = (
date(int(effectiveenddate[:4]), int(effectiveenddate[4:6]), int(effectiveenddate[6:]),) date(
int(effectiveenddate[:4]),
int(effectiveenddate[4:6]),
int(effectiveenddate[6:]),
)
if effectiveenddate if effectiveenddate
else None else None
) )
...@@ -144,7 +148,9 @@ def untis_import_xml(request: HttpRequest, untis_xml: Union[BinaryIO, str]) -> N ...@@ -144,7 +148,9 @@ def untis_import_xml(request: HttpRequest, untis_xml: Union[BinaryIO, str]) -> N
try: try:
groups = [Group.objects.get(short_name=v) for v in group_short_names] groups = [Group.objects.get(short_name=v) for v in group_short_names]
except Group.DoesNotExist: except Group.DoesNotExist:
messages.error(request, _("Invalid list of classes: %s") % ", ".join(group_short_names)) messages.error(
request, _("Invalid list of classes: %s") % ", ".join(group_short_names)
)
continue continue
try: try:
...@@ -152,11 +158,14 @@ def untis_import_xml(request: HttpRequest, untis_xml: Union[BinaryIO, str]) -> N ...@@ -152,11 +158,14 @@ def untis_import_xml(request: HttpRequest, untis_xml: Union[BinaryIO, str]) -> N
except Person.DoesNotExist: except Person.DoesNotExist:
messages.error( messages.error(
request, request,
_("Failed to import lesson: Teacher %s does not exist.") % teacher_short_name, _("Failed to import lesson: Teacher %s does not exist.")
% teacher_short_name,
) )
continue continue
lesson = Lesson.objects.create(subject=subject, date_start=date_start, date_end=date_end) lesson = Lesson.objects.create(
subject=subject, date_start=date_start, date_end=date_end
)
lesson.groups.set(groups) lesson.groups.set(groups)
lesson.teachers.set(teachers) lesson.teachers.set(teachers)
......
...@@ -31,7 +31,7 @@ def xml_import(request: HttpRequest) -> HttpResponse: ...@@ -31,7 +31,7 @@ def xml_import(request: HttpRequest) -> HttpResponse:
@permission_required("untis.assign_subjects_to_groups") @permission_required("untis.assign_subjects_to_groups")
def groups_subjects(request: HttpRequest) -> HttpResponse: def groups_subjects(request: HttpRequest) -> HttpResponse:
""" Assign subjects to groups (for matching by MySQL importer) """ """Assign subjects to groups (for matching by MySQL importer)."""
context = {} context = {}
groups_qs = Group.objects.all() groups_qs = Group.objects.all()
...@@ -43,7 +43,9 @@ def groups_subjects(request: HttpRequest) -> HttpResponse: ...@@ -43,7 +43,9 @@ def groups_subjects(request: HttpRequest) -> HttpResponse:
groups_paged = groups_qs.filter(id__in=[g.id for g in page]) groups_paged = groups_qs.filter(id__in=[g.id for g in page])
# Create filtered queryset # Create filtered queryset
group_subject_formset = GroupSubjectFormset(request.POST or None, queryset=groups_paged) group_subject_formset = GroupSubjectFormset(
request.POST or None, queryset=groups_paged
)
# Check if form is submitted and valid, then save # Check if form is submitted and valid, then save
if request.method == "POST": if request.method == "POST":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment