# lessons.py (9.38 KiB) -- last authored by Tom Teichler
import logging
from django.db.models import Q
from django.utils.translation import gettext as _
import reversion
from tqdm import tqdm
from aleksis.apps.chronos import models as chronos_models
from aleksis.apps.chronos.models import ValidityRange
from aleksis.core import models as core_models
from aleksis.core.util.core_helpers import get_site_preferences
from .... import models as mysql_models
from ..util import (
TQDM_DEFAULTS,
compare_m2m,
connect_untis_fields,
run_default_filter,
untis_split_third,
)
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
def _parse_lesson_times(lesson_tt, time_periods_ref, rooms_ref):
    """Resolve a raw Untis timetable string into time periods and rooms.

    ``lesson_tt`` is split on ``,`` into entries and each entry on ``~`` into
    fields. Field 1 is read as the weekday, field 2 as the hour and field 3 as
    the room id list (decoded by ``untis_split_third``).

    Returns a tuple ``(time_periods, rooms_per_periods)`` of two lists of equal
    length; ``rooms_per_periods[n]`` holds the rooms looked up for
    ``time_periods[n]``.

    Raises whatever the lookups in ``rooms_ref`` / ``time_periods_ref`` raise
    for unknown ids, and ``ValueError`` / ``IndexError`` on malformed entries.
    """
    time_periods = []
    rooms_per_periods = []

    for raw_entry in lesson_tt.split(","):
        fields = raw_entry.split("~")

        # Presumably Untis counts weekdays from 1 while the reference is 0-based
        weekday = int(fields[1]) - 1
        hour = int(fields[2])
        room_ids = untis_split_third(fields[3], conv=int)

        # Rooms are resolved before the period, as in the original lookup order
        rooms = [rooms_ref[room_id] for room_id in room_ids]

        time_periods.append(time_periods_ref[weekday][hour])
        rooms_per_periods.append(rooms)

    return time_periods, rooms_per_periods


def _sync_course_group(
    validity_range: ValidityRange, lesson_id, part_index, subject, course_classes, teachers
):
    """Find or create the course group of one lesson part and update it.

    An existing group is searched by parent groups and subject, restricted to
    groups without a school term or with the school term of ``validity_range``.
    If none matches, a group is fetched or created by its generated short name.
    Owners, import reference, subject and school term are then synchronised.

    Returns the course group.
    """
    # Negative import_ref denotes a course group
    group_import_ref = -int("{}{}".format(lesson_id, part_index))

    # Search by parent groups and subject
    candidates = core_models.Group.objects.filter(
        parent_groups__in=[c.id for c in course_classes], subject_id=subject.id,
    ).filter(Q(school_term__isnull=True) | Q(school_term=validity_range.school_term))

    # Check if found groups match; no early exit, so the last match wins
    course_group = None
    for found_group in candidates:
        if compare_m2m(course_classes, found_group.parent_groups.all()):
            course_group = found_group
            logger.info(" Course group found by searching by parent groups and subject")

    changed = False
    if course_group is None:
        # No matching group found: build names and get or create the group
        class_short_names = [c.short_name for c in course_classes]
        group_short_name = "{}-{}".format("".join(class_short_names), subject.short_name)
        group_name = "{}: {}".format(", ".join(class_short_names), subject.short_name)

        course_group, created = core_models.Group.objects.get_or_create(
            short_name=group_short_name, defaults={"name": group_name}
        )
        if created:
            logger.info(" Course group created")

        # Update parent groups
        course_group.parent_groups.set(course_classes)
        logger.info(" Parent groups set")

        # Update name
        if course_group.name != group_name:
            course_group.name = group_name
            logger.info(" Name of course group updated")
            changed = True

    # Update owners
    course_group.owners.set(teachers)

    # Update import ref
    if course_group.import_ref_untis != group_import_ref:
        course_group.import_ref_untis = group_import_ref
        logger.info(" Import reference of course group updated")
        changed = True

    if course_group.subject != subject:
        course_group.subject = subject
        logger.info(" Subject reference of course group updated")
        changed = True

    if course_group.school_term != validity_range.school_term:
        course_group.school_term = validity_range.school_term
        logger.info(" School term reference of course group updated")
        changed = True

    # Only hit the database when a plain field actually changed
    if changed:
        course_group.save()

    return course_group


def _sync_lesson_periods(lesson, part_index, time_periods, rooms_per_periods):
    """Create or update the lesson periods of ``lesson``.

    If the number of stored lesson periods differs from ``len(time_periods)``,
    all stored periods are deleted and recreated. Otherwise each stored period
    is matched by ``element_id_untis`` and updated when period or room differ.
    The room of a period is the ``part_index``-th room of that period, or
    ``None`` when fewer rooms are given.
    """
    # All times for this course
    old_lesson_periods_qs = chronos_models.LessonPeriod.objects.filter(lesson=lesson)

    # If length has changed, delete all lesson periods
    if old_lesson_periods_qs.count() != len(time_periods):
        old_lesson_periods_qs.delete()
        logger.info(" Lesson periods deleted")

    for j, time_period in enumerate(time_periods):
        logger.info(" Import lesson period {}".format(time_period))

        # Get room if provided
        rooms = rooms_per_periods[j]
        room = rooms[part_index] if part_index < len(rooms) else None

        # Check if an old lesson period is provided
        old_lesson_period_qs = old_lesson_periods_qs.filter(element_id_untis=j)
        if old_lesson_period_qs.exists():
            # Update old lesson period
            old_lesson_period = old_lesson_period_qs[0]
            if old_lesson_period.period != time_period or old_lesson_period.room != room:
                old_lesson_period.period = time_period
                old_lesson_period.room = room
                old_lesson_period.save()
                logger.info(" Time period and room updated")
        else:
            # Create new lesson period
            chronos_models.LessonPeriod.objects.create(
                lesson=lesson, period=time_period, room=room, element_id_untis=j
            )
            logger.info(" New time period added")


def import_lessons(
    validity_range: ValidityRange,
    time_periods_ref,
    rooms_ref,
    subjects_ref,
    teachers_ref,
    classes_ref,
) -> None:
    """Import lessons of one validity range from Untis into chronos.

    For every Untis lesson selected by ``run_default_filter`` each lesson
    element ("part") becomes one chronos ``Lesson`` with its groups, teachers
    and lesson periods. Chronos lessons of ``validity_range`` that carry an
    Untis lesson id which was not seen during the import are deleted afterwards.

    Args:
        validity_range: Validity range the lessons belong to.
        time_periods_ref: Lookup indexed as ``[weekday][hour]`` yielding time periods.
        rooms_ref: Lookup from Untis room id to room.
        subjects_ref: Lookup from Untis subject id to subject.
        teachers_ref: Lookup from Untis teacher id to teacher.
        classes_ref: Lookup from Untis class id to class group.

    Lessons without timetable data and lesson parts without a subject are
    skipped with a warning. Unknown ids propagate the lookup's exception.
    """
    # Loop-invariant, so read the preference once instead of per lesson part
    use_course_groups = get_site_preferences()["untis_mysql__use_course_groups"]

    # Lessons
    untis_lessons = run_default_filter(validity_range, mysql_models.Lesson.objects)

    # A set keeps the membership test in the clean-up loop constant-time
    existing_lesson_ids = set()
    for untis_lesson in tqdm(untis_lessons, desc="Import lessons", **TQDM_DEFAULTS):
        lesson_id = untis_lesson.lesson_id

        # Recorded even if the lesson is skipped below, so it is never deleted
        existing_lesson_ids.add(lesson_id)

        logger.info(_("Import lesson {}").format(lesson_id))

        if not untis_lesson.lesson_tt:
            logger.warning(_(" Skip because missing times"))
            continue

        raw_lesson_data = connect_untis_fields(untis_lesson, "lessonelement", 10)

        # Get time periods and rooms
        time_periods, rooms_per_periods = _parse_lesson_times(
            untis_lesson.lesson_tt, time_periods_ref, rooms_ref
        )

        # All part lessons (courses)
        for i, raw_element in enumerate(raw_lesson_data):
            el = raw_element.split("~")
            logger.info(" Lesson part {}".format(i))

            # Get plain ids
            teacher_id = int(el[0])
            subject_id = int(el[2])
            class_ids = untis_split_third(el[17], conv=int)

            # Get teacher; id 0 is treated as "no teacher"
            teacher = teachers_ref[teacher_id] if teacher_id != 0 else None
            teachers = [teacher] if teacher else []

            # Get subject; id 0 is treated as "no subject"
            if subject_id == 0:
                logger.warning(_(" Skip because missing subject"))
                continue
            subject = subjects_ref[subject_id]

            # Get classes
            course_classes = [classes_ref[class_id] for class_id in class_ids]

            if use_course_groups:
                groups = [
                    _sync_course_group(
                        validity_range, lesson_id, i, subject, course_classes, teachers
                    )
                ]
            else:
                groups = course_classes

            # Get old lesson
            old_lesson_qs = chronos_models.Lesson.objects.filter(
                lesson_id_untis=lesson_id, element_id_untis=i, validity=validity_range
            )

            if old_lesson_qs.exists():
                # Update existing lesson
                logger.info(" Existing lesson found")
                lesson = old_lesson_qs[0]
                if lesson.subject != subject:
                    lesson.subject = subject
                    lesson.save()
                    logger.info(" Subject updated")
            else:
                # Create new lesson
                lesson = chronos_models.Lesson.objects.create(
                    subject=subject,
                    lesson_id_untis=lesson_id,
                    element_id_untis=i,
                    validity=validity_range,
                )
                logger.info(" New lesson created")

            # Sync groups
            lesson.groups.set(groups)

            # Sync teachers
            lesson.teachers.set(teachers)

            # Sync time periods
            _sync_lesson_periods(lesson, i, time_periods, rooms_per_periods)

    # Remove lessons that came from Untis earlier but are gone now
    for lesson in chronos_models.Lesson.objects.filter(validity=validity_range):
        if lesson.lesson_id_untis and lesson.lesson_id_untis not in existing_lesson_ids:
            logger.info("Lesson {} deleted".format(lesson.id))
            # NOTE(review): saving inside a revision presumably records a last
            # version before deletion -- confirm against reversion setup
            with reversion.create_revision():
                lesson.save()
            lesson.delete()