Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • AlekSIS/official/AlekSIS-App-Untis
  • sunweaver/AlekSIS-App-Untis
  • cocguPpenda/AlekSIS-App-Untis
  • 0inraMfauri/AlekSIS-App-Untis
4 results
Show changes
Showing
with 394 additions and 323 deletions
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_current_term
from ...commands import COMMANDS_BY_NAME
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"command", nargs="?", default="current", type=str, choices=list(COMMANDS_BY_NAME.keys())
)
parser.add_argument(
"--background",
action="store_true",
help="Run import job in background using Celery",
)
parser.add_argument(
"--plan-version",
help="Select explicit Untis plan version",
)
parser.add_argument(
"--school-id",
help="Select explicit Untis school ID",
)
def handle(self, *args, **options):
untis_import_mysql_current_term.delay()
command = COMMANDS_BY_NAME[options["command"]]
background = options["background"]
school_id = options.get("school_id")
version = options.get("plan_version")
command.run(background=background, school_id=school_id, version=version)
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_all_terms
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_all_terms.delay()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_current_future_terms
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_current_future_terms.delay()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_current_next_term
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_current_next_term.delay()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_future_terms
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_future_terms.delay()
from django.core.management.base import BaseCommand
from django.utils.translation import ugettext as _
from ...util.xml.xml import untis_import_xml
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument("untis_xml_path", help=_("Path to Untis XML export file"))
def handle(self, *args, **options):
untis_xml = open(options["untis_xml_path"], "rb")
untis_import_xml(None, untis_xml)
from django.utils.translation import gettext_lazy as _
MENUS = {
"DATA_MANAGEMENT_MENU": [
{
"name": _("Untis XML import"),
"url": "untis_xml_import",
"validators": [
("aleksis.core.util.predicates.permission_validator", "untis.do_xml_import",),
],
},
{
"name": _("Link subjects to groups (for UNTIS MySQL import)"),
"url": "untis_groups_subjects",
"validators": [
(
"aleksis.core.util.predicates.permission_validator",
"untis.assign_subjects_to_groups",
),
],
},
]
}
......@@ -2272,7 +2272,6 @@ class Migration(migrations.Migration):
],
options={
"permissions": (
("do_xml_import", "Can do XML import"),
("assign_subjects_to_groups", "Can assign subjects to groups"),
),
"managed": False,
......
......@@ -12,6 +12,6 @@ class Migration(migrations.Migration):
operations = [
migrations.AlterModelOptions(
name='globalpermissions',
options={'managed': False, 'permissions': (('do_xml_import', 'Kann XML-Import durchführen'), ('assign_subjects_to_groups', 'Kann Fächer zu Gruppen zuzuordnen'))},
options={'managed': False, 'permissions': (('assign_subjects_to_groups', 'Kann Fächer zu Gruppen zuzuordnen'),)},
),
]
from django.db import migrations
from aleksis.core.util.core_helpers import get_site_preferences
def guess_school_id(apps, schema_editor):
db_alias = schema_editor.connection.alias
from aleksis.apps.lesrooster.models import ValidityRange
try:
vr = ValidityRange.objects.using(db_alias).first()
except ValidityRange.DoesNotExist:
return
if not vr or not vr.school_id_untis:
return
get_site_preferences()["untis_mysql__school_id"] = vr.school_id_untis
class Migration(migrations.Migration):
dependencies = [
('untis', '0002_auto_20200820_1542'),
]
operations = [
migrations.RunPython(guess_school_id),
]
from django.utils.translation import gettext as _
from jsonstore import IntegerField
from aleksis.apps.chronos import models as chronos_models
from aleksis.core import models as core_models
core_models.SchoolTerm.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
core_models.Person.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
core_models.Group.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
# Chronos models
chronos_models.ValidityRange.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.ValidityRange.field(
school_year_untis=IntegerField(verbose_name=_("UNTIS school year ID"), null=True, blank=True)
)
chronos_models.ValidityRange.field(
school_id_untis=IntegerField(verbose_name=_("UNTIS school id"), null=True, blank=True)
)
chronos_models.ValidityRange.field(
version_id_untis=IntegerField(verbose_name=_("UNTIS version id"), null=True, blank=True)
)
chronos_models.Subject.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.Room.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.SupervisionArea.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.Lesson.field(
lesson_id_untis=IntegerField(verbose_name=_("Lesson id in UNTIS"), null=True, blank=True)
)
chronos_models.Lesson.field(
element_id_untis=IntegerField(
verbose_name=_("Number of lesson element in UNTIS"), null=True, blank=True
)
)
chronos_models.LessonPeriod.field(
element_id_untis=IntegerField(
verbose_name=_("Number of lesson element in UNTIS"), null=True, blank=True
)
)
chronos_models.LessonSubstitution.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.SupervisionSubstitution.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.AbsenceReason.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.Absence.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.Event.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.Holiday.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
chronos_models.ExtraLesson.field(
import_ref_untis=IntegerField(verbose_name=_("UNTIS import reference"), null=True, blank=True)
)
......@@ -8,6 +8,8 @@ from django.utils.translation import gettext as _
from aleksis.core.mixins import PureDjangoModel
from .data_checks import CourseGroupNotFoundAndCreated, CourseGroupNotFoundAndNotCreated
class Absence(models.Model, PureDjangoModel):
school_id = models.IntegerField(
......@@ -679,7 +681,15 @@ class Commondata(models.Model, PureDjangoModel):
managed = False
db_table = "CommonData"
unique_together = (
("school_id", "schoolyear_id", "version_id", "id", "owner", "number", "number1",),
(
"school_id",
"schoolyear_id",
"version_id",
"id",
"owner",
"number",
"number1",
),
)
......@@ -1840,7 +1850,14 @@ class Lessonsequence(models.Model, PureDjangoModel):
managed = False
db_table = "LessonSequence"
unique_together = (
("school_id", "schoolyear_id", "version_id", "type", "lesson_sequence_id", "term_id",),
(
"school_id",
"schoolyear_id",
"version_id",
"type",
"lesson_sequence_id",
"term_id",
),
)
......@@ -2447,7 +2464,14 @@ class Studentchoice(models.Model, PureDjangoModel):
managed = False
db_table = "StudentChoice"
unique_together = (
("school_id", "schoolyear_id", "version_id", "student_id", "term_id", "number",),
(
"school_id",
"schoolyear_id",
"version_id",
"student_id",
"term_id",
"number",
),
)
......@@ -2810,7 +2834,13 @@ class Ttelementfilter(models.Model, PureDjangoModel):
managed = False
db_table = "TTElementFilter"
unique_together = (
("school_id", "schoolyear_id", "version_id", "tt_element_filter_id", "owner",),
(
"school_id",
"schoolyear_id",
"version_id",
"tt_element_filter_id",
"owner",
),
)
......@@ -4216,9 +4246,8 @@ class Views(models.Model, PureDjangoModel):
class GlobalPermissions(models.Model, PureDjangoModel):
data_checks = [CourseGroupNotFoundAndCreated, CourseGroupNotFoundAndNotCreated]
class Meta:
managed = False
permissions = (
("do_xml_import", _("Can do XML import")),
("assign_subjects_to_groups", _("Can assign subjects to groups")),
)
permissions = (("assign_subjects_to_groups", _("Can assign subjects to groups")),)
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy as _
from dynamic_preferences.preferences import Section
from dynamic_preferences.types import BooleanPreference
from dynamic_preferences.types import BooleanPreference, IntegerPreference, ModelChoicePreference
from aleksis.core.models import GroupType
from aleksis.core.registries import site_preferences_registry
untis_mysql = Section("untis_mysql", verbose_name=_("UNTIS: MySQL"))
untis_mysql = Section("untis_mysql", verbose_name=_("Untis: MySQL"))
@site_preferences_registry.register
class SchoolID(IntegerPreference):
section = untis_mysql
name = "school_id"
default = 0
verbose_name = _("School ID in Untis database")
@site_preferences_registry.register
......@@ -48,6 +57,14 @@ class UpdateGroupsName(BooleanPreference):
verbose_name = _("Update name of existing groups")
@site_preferences_registry.register
class DisambiguateGroupsName(BooleanPreference):
section = untis_mysql
name = "disambiguate_groups_name"
default = True
verbose_name = _("Disambiguate name of new groups")
@site_preferences_registry.register
class OverwriteGroupOwners(BooleanPreference):
section = untis_mysql
......@@ -56,6 +73,20 @@ class OverwriteGroupOwners(BooleanPreference):
default = True
@site_preferences_registry.register
class GroupTypeClassGroups(ModelChoicePreference):
section = untis_mysql
name = "group_type_class_groups"
required = False
default = None
model = GroupType
verbose_name = _("Group type for class groups")
help_text = _("If you leave it empty, no group type will be used.")
def get_queryset(self):
return GroupType.objects.managed_and_unmanaged()
@site_preferences_registry.register
class UpdateRoomsName(BooleanPreference):
section = untis_mysql
......@@ -78,6 +109,52 @@ class UseCourseGroups(BooleanPreference):
name = "use_course_groups"
default = True
verbose_name = _("Use course groups")
help_text = _(
"Build or search course groups for every course" " instead of setting classes as groups."
)
help_text = _("Search course groups for every course instead of setting classes as groups.")
@site_preferences_registry.register
class CreateCourseGroups(BooleanPreference):
section = untis_mysql
name = "create_course_groups"
default = True
verbose_name = _("Create non-existing course groups")
help_text = _("Only used if 'Use course groups' is enabled.")
@site_preferences_registry.register
class GroupTypeCourseGroups(ModelChoicePreference):
section = untis_mysql
name = "group_type_course_groups"
required = False
default = None
model = GroupType
verbose_name = _("Group type for course groups")
help_text = _("If you leave it empty, no group type will be used.")
def get_queryset(self):
return GroupType.objects.managed_and_unmanaged()
@site_preferences_registry.register
class DataCheckNotFoundCourseGroups(BooleanPreference):
section = untis_mysql
name = "data_check_for_not_found_course_groups"
default = True
verbose_name = _("Register a data problem if a course group has been not found.")
@site_preferences_registry.register
class CourseGroupsFuzzyMatching(BooleanPreference):
section = untis_mysql
name = "course_groups_fuzzy_matching"
default = False
verbose_name = _("Match course groups by a subset of parent groups if no 100% match is found")
help_text = _("Works only if 'Use course groups' is activated.")
@site_preferences_registry.register
class IgnoreIncompleteSubstitutions(BooleanPreference):
section = untis_mysql
name = "ignore_incomplete_substitutions"
default = True
verbose_name = _("Ignore incomplete substitutions")
......@@ -2,13 +2,7 @@ from rules import add_perm
from aleksis.core.util.predicates import has_global_perm, has_person
# Do XML import
do_xml_import_predicate = has_person & has_global_perm("untis.do_xml_import")
add_perm("untis.do_xml_import", do_xml_import_predicate)
# Do XML import
assign_subjects_to_groups_predicate = has_person & has_global_perm(
"untis.assign_subjects_to_groups"
)
add_perm("untis.assign_subjects_to_groups", assign_subjects_to_groups_predicate)
add_perm("untis.assign_subjects_to_groups_rule", assign_subjects_to_groups_predicate)
......@@ -9,5 +9,7 @@ if _settings.get("untis.database.enabled"):
"PASSWORD": _settings.get("untis.database.password", None),
"HOST": _settings.get("untis.database.host", "127.0.0.1"),
"PORT": _settings.get("untis.database.port", 3306),
"CONN_MAX_AGE": _settings.get("untis.database.conn_max_age", 0),
"OPTIONS": _settings.get("untis.database.options", {}),
}
}
from aleksis.apps.untis.util.mysql.importers.terms import (
get_future_terms_for_date,
get_terms_for_date,
)
from aleksis.core.celery import app
from .util.mysql.main import untis_import_mysql as _untis_import_mysql
from .commands import COMMANDS_BY_TASK_NAME, ImportCommand
TASKS = {}
for import_command in ImportCommand.__subclasses__():
@app.task
def untis_import_mysql_current_term():
"""Celery task for import of UNTIS data from MySQL (current term)."""
terms = get_terms_for_date()
_untis_import_mysql(terms)
@app.task(name=import_command.task_name, bind=True)
def _task(self, *args, **kwargs):
import_command = COMMANDS_BY_TASK_NAME[self.name]
import_command.run(*args, **kwargs)
@app.task
def untis_import_mysql_future_terms():
"""Celery task for import of UNTIS data from MySQL (all future terms)."""
terms = get_future_terms_for_date()
_untis_import_mysql(terms)
@app.task
def untis_import_mysql_all_terms():
"""Celery task for import of UNTIS data from MySQL (all terms in DB)."""
_untis_import_mysql()
@app.task
def untis_import_mysql_current_next_term():
"""Celery task for import of UNTIS data from MySQL (current and next term)."""
terms = get_terms_for_date()
future_terms = get_future_terms_for_date()
if future_terms.exists():
terms = terms.union(future_terms[0:1])
_untis_import_mysql(terms)
@app.task
def untis_import_mysql_current_future_terms():
"""Celery task for import of UNTIS data from MySQL (current and future terms)."""
terms = get_terms_for_date()
future_terms = get_future_terms_for_date()
terms = terms.union(future_terms)
_untis_import_mysql(terms)
TASKS[import_command.task_name] = _task
{# -*- engine:django -*- #}
{% extends "core/base.html" %}
{% load material_form i18n %}
{% block browser_title %}{% blocktrans %}Import Untis data via XML{% endblocktrans %}{% endblock %}
{% block page_title %}{% blocktrans %}Import Untis data via XML{% endblocktrans %}{% endblock %}
{% block content %}
<p class="flow-text">
{% blocktrans %}
Untis provides a function for exporting all data as an XML file.
{% endblocktrans %}
</p>
<div class="alert warning">
<p>
<i class="material-icons left">warning</i>
{% blocktrans %}
Newly imported data will be valid as of tomorrow.
{% endblocktrans %}
{% blocktrans %}
The effective dates of all existing lessons will be set to end
today.
{% endblocktrans %}
{% blocktrans %}
The effective dates of all newly imported lessons will be set to
start tomorrow.
{% endblocktrans %}
{% blocktrans %}
Teachers, rooms, subjects and classes and periods will be updated in place.
{% endblocktrans %}
</p>
</div>
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{% form form=upload_form %}{% endform %}
{% include "core/partials/save_button.html" with icon="import_export" caption=_("Import data") %}
</form>
{% endblock %}
from aleksis.apps.untis.util.mysql.util import (
untis_split_first,
untis_split_second,
untis_split_third,
)
test_lesson_element = (
'49~0~302~7r;~0;~0~0~175608~~~~~"Freiraum: Naturwissenschaften, Fokus Physik"~'
'"Nawi"~~n~~;11;12;13;14;15;16;17;,49~0~302~7r;~0;~0~0~175608~~'
'~~~"Freiraum: Naturwissenschaften, Fokus Physik"~"Nawi"~~n~~;11;12;13;14;15;16;17;'
)
test_lesson_element_partial = (
'49~0~302~7r;~0;~0~0~175608~~~~~"Freiraum: Naturwissenschaften,'
+ ' Fokus Physik"~"Nawi"~~n~~;11;12;13;14;15;16;17;'
)
test_lesson_element_partial_partial = ";11;12;13;14;15;16;17;"
test_lesson_element_second = (
'46~0~92~45;~45;~12~11~0~~B~~~~~~n~~;18;~"Wp_Eb"~~34~0~67900~0~0~~'
+ '21700,45~0~92~45;~45;~0~0~19000~~~~~~~~n~~;18;~"Wp_Eb"~~34~0~67900~0~0~~0'
)
def test_untis_split_first():
assert untis_split_first("") == []
assert untis_split_first("a,b,c") == ["a", "b", "c"]
assert untis_split_first("1,2,3") == ["1", "2", "3"]
assert untis_split_first("1,2,3", int) == [1, 2, 3]
assert len(untis_split_first(test_lesson_element)) == 2
assert untis_split_first(test_lesson_element) == [
'49~0~302~7r;~0;~0~0~175608~~~~~"Freiraum: Naturwissenschaften, '
+ 'Fokus Physik"~"Nawi"~~n~~;11;12;13;14;15;16;17;',
'49~0~302~7r;~0;~0~0~175608~~~~~"Freiraum: Naturwissenschaften, '
+ 'Fokus Physik"~"Nawi"~~n~~;11;12;13;14;15;16;17;',
]
assert len(untis_split_first(test_lesson_element_second)) == 2
assert untis_split_first(test_lesson_element_second) == [
'46~0~92~45;~45;~12~11~0~~B~~~~~~n~~;18;~"Wp_Eb"~~34~0~67900~0~0~~21700',
'45~0~92~45;~45;~0~0~19000~~~~~~~~n~~;18;~"Wp_Eb"~~34~0~67900~0~0~~0',
]
def test_untis_split_second():
assert untis_split_second("") == []
assert untis_split_second("a~b~c") == ["a", "b", "c"]
assert untis_split_second("1~2~3") == ["1", "2", "3"]
assert untis_split_second("1~2~3", int) == [1, 2, 3]
assert untis_split_second("1~~3", remove_empty=False) == ["1", None, "3"]
assert untis_split_second("1~~3", int, remove_empty=False) == [1, None, 3]
assert untis_split_second("1~~3", int) == [1, 3]
assert untis_split_second('"asdf"~"asdf"~"asdf') == ["asdf", "asdf", "asdf"]
assert len(untis_split_second(test_lesson_element_partial, remove_empty=False)) == 18
assert untis_split_second(test_lesson_element_partial, remove_empty=False) == [
"49",
"0",
"302",
"7r;",
"0;",
"0",
"0",
"175608",
None,
None,
None,
None,
"Freiraum: Naturwissenschaften, Fokus Physik",
"Nawi",
None,
"n",
None,
";11;12;13;14;15;16;17;",
]
def test_untis_split_third():
assert untis_split_third("") == []
assert untis_split_third("a;b;c") == ["a", "b", "c"]
assert untis_split_third("1;2;3") == ["1", "2", "3"]
assert untis_split_third("1;2;3", int) == [1, 2, 3]
assert untis_split_third("1;;3", remove_empty=False) == ["1", None, "3"]
assert untis_split_third("1;;3", int, remove_empty=False) == [1, None, 3]
assert untis_split_third("1;;3", int) == [1, 3]
assert untis_split_third('"asdf";"asdf";"asdf') == ["asdf", "asdf", "asdf"]
assert len(untis_split_third(test_lesson_element_partial_partial)) == 7
assert untis_split_third(test_lesson_element_partial_partial) == [
"11",
"12",
"13",
"14",
"15",
"16",
"17",
]
from django.urls import path
from . import views
urlpatterns = [
path("import/xml/", views.xml_import, name="untis_xml_import"),
]
import logging
from enum import Enum
from django.db.models import Count
from django.utils.translation import gettext as _
from reversion import create_revision, set_comment
from tqdm import tqdm
from aleksis.apps.chronos import models as chronos_models
from aleksis.apps.chronos.models import ValidityRange
from aleksis.apps.chronos.models import LessonEvent
from aleksis.apps.kolego import models as kolego_models
from aleksis.apps.lesrooster.models import ValidityRange
from aleksis.core.models import Group
from .... import models as mysql_models
from ..util import (
......@@ -15,10 +21,11 @@ from ..util import (
move_weekday_to_range,
run_default_filter,
untis_date_to_date,
update_or_create_lesson_event,
)
logger = logging.getLogger(__name__)
unknown_reason, _ = chronos_models.AbsenceReason.objects.get_or_create(short_name="?")
unknown_reason, __ = kolego_models.AbsenceReason.objects.get_or_create(short_name="?")
class AbsenceType(Enum):
......@@ -30,7 +37,7 @@ class AbsenceType(Enum):
def import_absences(
validity_range: ValidityRange,
absence_reasons_ref,
time_periods_ref,
slots_ref,
teachers_ref,
classes_ref,
rooms_ref,
......@@ -40,6 +47,8 @@ def import_absences(
untis_term_start = date_to_untis_date(validity_range.date_start)
untis_term_end = date_to_untis_date(validity_range.date_end)
created_substitutions = []
# Get absences
absences = (
run_default_filter(validity_range, mysql_models.Absence.objects, filter_term=False)
......@@ -51,7 +60,7 @@ def import_absences(
for absence in tqdm(absences, desc="Import absences", **TQDM_DEFAULTS):
import_ref = absence.absence_id
logger.info("Import absence {}".format(import_ref))
logger.info(f"Import absence {import_ref}")
if absence.absence_reason_id == 0:
reason = unknown_reason
......@@ -68,86 +77,112 @@ def import_absences(
weekday_to = date_to.weekday()
# Check min/max weekdays
weekday_from = move_weekday_to_range(time_periods_ref, weekday_from)
weekday_to = move_weekday_to_range(time_periods_ref, weekday_to)
weekday_from = move_weekday_to_range(slots_ref, weekday_from)
weekday_to = move_weekday_to_range(slots_ref, weekday_to)
# Check min/max periods
first_period = get_first_period(time_periods_ref, weekday_from)
last_period = get_last_period(time_periods_ref, weekday_from)
first_period = get_first_period(slots_ref, weekday_from)
last_period = get_last_period(slots_ref, weekday_from)
if period_from == 0:
period_from = first_period
if period_to == 0:
period_to = last_period
time_period_from = time_periods_ref[weekday_from][period_from]
time_period_to = time_periods_ref[weekday_to][period_to]
comment = absence.text
slot_from = slots_ref[weekday_from][period_from]
slot_to = slots_ref[weekday_to][period_to]
comment = absence.text if absence.text else ""
datetime_start = slot_from.get_datetime_start(date_from)
datetime_end = slot_to.get_datetime_end(date_to)
group = None
teacher = None
room = None
if type_ == AbsenceType.GROUP.value:
group = classes_ref[absence.ida]
group = classes_ref[absence.ida] # noqa
elif type_ == AbsenceType.TEACHER.value:
teacher = teachers_ref[absence.ida]
elif type_ == AbsenceType.ROOM.value:
room = rooms_ref[absence.ida]
new_absence, created = chronos_models.Absence.objects.get_or_create(
import_ref_untis=import_ref,
defaults={
"reason": reason,
"group": group,
"teacher": teacher,
"room": room,
"date_start": date_from,
"date_end": date_to,
"period_from": time_period_from,
"period_to": time_period_to,
"comment": absence.text,
"school_term": validity_range.school_term,
},
)
if created:
logger.info(" New absence created")
if (
new_absence.reason != reason
or new_absence.group != group
or new_absence.teacher != teacher
or new_absence.room != room
or new_absence.date_start != date_from
or new_absence.date_end != date_to
or new_absence.period_from != time_period_from
or new_absence.period_to != time_period_to
or new_absence.comment != comment
or new_absence.school_term != validity_range.school_term
):
room = rooms_ref[absence.ida] # noqa
if teacher:
try:
created = False
new_absence = kolego_models.Absence.objects.get(
extended_data__import_ref_untis=import_ref
)
except kolego_models.Absence.DoesNotExist:
created = True
new_absence = kolego_models.Absence()
new_absence.reason = reason
new_absence.group = group
new_absence.teacher = teacher
new_absence.room = room
new_absence.date_start = date_from
new_absence.date_end = date_to
new_absence.period_from = time_period_from
new_absence.period_to = time_period_to
new_absence.person = teacher
new_absence.datetime_start = datetime_start
new_absence.datetime_end = datetime_end
new_absence.comment = comment
new_absence.school_term = validity_range.school_term
new_absence.save()
logger.info(" Absence updated")
existing_absences.append(import_ref)
ref[import_ref] = new_absence
new_absence.extended_data["import_ref_untis"] = import_ref
new_absence.save(skip_overlap_handling=True)
if created:
logger.info(" New absence created")
existing_absences.append(import_ref)
ref[import_ref] = new_absence
# Cancel all lessons of this group
if group:
# Search parent groups with match
group_qs = Group.objects.annotate(parent_groups_count=Count("parent_groups")).filter(
parent_groups_count=1, parent_groups=group
)
# Search lesson events with exact groups or parent groups match
qs = LessonEvent.objects.filter(
amends__isnull=True, extended_data__event_untis__isnull=True
).filter(
pk__in=LessonEvent.objects.annotate(groups_count=Count("groups"))
.filter(groups_count=1, groups=group)
.values_list("id", flat=True)
.union(LessonEvent.objects.filter(groups__in=group_qs).values_list("id", flat=True))
)
affected_events = LessonEvent.get_single_events(
datetime_start, datetime_end, request=None, with_reference_object=True, queryset=qs
)
for affected_event in affected_events:
ref_object = affected_event["REFERENCE_OBJECT"]
amending_event = update_or_create_lesson_event(
amends=ref_object,
datetime_start=affected_event["DTSTART"].dt,
datetime_end=affected_event["DTEND"].dt,
defaults=dict(
cancelled=True,
current_change=True,
slot_number_start=ref_object.slot_number_start,
slot_number_end=ref_object.slot_number_end,
),
)
amending_event.extended_data["cancelled_by_absence_untis"] = import_ref
amending_event.save()
logging.info(
f" Cancel lesson event {ref_object.id} from {affected_event['DTSTART'].dt} "
f"to {affected_event['DTEND'].dt} with event {amending_event.id}"
)
# Delete all no longer existing absences
for a in chronos_models.Absence.objects.filter(
date_start__lte=validity_range.date_end, date_end__gte=validity_range.date_start
for a in kolego_models.Absence.objects.filter(
extended_data__import_ref_untis__isnull=False,
datetime_start__date__lte=validity_range.date_end,
datetime_end__date__gte=validity_range.date_start,
):
if a.import_ref_untis and a.import_ref_untis not in existing_absences:
logger.info("Absence {} deleted".format(a.id))
a.delete()
if a.extended_data["import_ref_untis"] not in existing_absences:
logger.info(f"Absence {a.id} deleted")
with create_revision():
set_comment(_("Deleted by Untis import"))
a.delete()
return ref
return ref, created_substitutions