Skip to content
Snippets Groups Projects
Verified Commit c7b8e92b authored by Jonathan Weth's avatar Jonathan Weth :keyboard:
Browse files

Separate import commands from tasks and use them in management commands

parent ec88618c
No related branches found
No related tags found
1 merge request!91Resolve "Management commands are not run in the foreground"
from aleksis.apps.untis.util.mysql.importers.terms import (
get_future_terms_for_date,
get_terms_for_date,
)
from .util.mysql.main import untis_import_mysql as _untis_import_mysql
def import_mysql_current_term():
"""Import UNTIS data from MySQL (current term)."""
terms = get_terms_for_date()
_untis_import_mysql(terms)
def import_mysql_future_terms():
"""Import UNTIS data from MySQL (all future terms)."""
terms = get_future_terms_for_date()
_untis_import_mysql(terms)
def import_mysql_all_terms():
"""Import UNTIS data from MySQL (all terms in DB)."""
_untis_import_mysql()
def import_mysql_current_next_term():
"""Import UNTIS data from MySQL (current and next term)."""
terms = get_terms_for_date()
future_terms = get_future_terms_for_date()
if future_terms.exists():
terms = terms.union(future_terms[0:1])
_untis_import_mysql(terms)
def import_mysql_current_future_terms():
"""Import UNTIS data from MySQL (current and future terms)."""
terms = get_terms_for_date()
future_terms = get_future_terms_for_date()
terms = terms.union(future_terms)
_untis_import_mysql(terms)
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_current_term
from ...commands import import_mysql_current_term
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_current_term.delay()
import_mysql_current_term()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_all_terms
from ...commands import import_mysql_all_terms
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_all_terms.delay()
import_mysql_all_terms()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_current_future_terms
from ...commands import import_mysql_current_future_terms
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_current_future_terms.delay()
import_mysql_current_future_terms()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_current_next_term
from ...commands import import_mysql_current_next_term
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_current_next_term.delay()
import_mysql_current_next_term()
from django.core.management.base import BaseCommand
from ...tasks import untis_import_mysql_future_terms
from ...commands import import_mysql_future_terms
class Command(BaseCommand):
def handle(self, *args, **options):
untis_import_mysql_future_terms.delay()
import_mysql_future_terms()
from aleksis.apps.untis.util.mysql.importers.terms import (
get_future_terms_for_date,
get_terms_for_date,
)
from aleksis.core.celery import app
from .util.mysql.main import untis_import_mysql as _untis_import_mysql
from .commands import (
import_mysql_all_terms,
import_mysql_current_future_terms,
import_mysql_current_next_term,
import_mysql_current_term,
import_mysql_future_terms,
)
@app.task
def untis_import_mysql_current_term():
"""Celery task for import of UNTIS data from MySQL (current term)."""
terms = get_terms_for_date()
_untis_import_mysql(terms)
import_mysql_current_term()
@app.task
def untis_import_mysql_future_terms():
"""Celery task for import of UNTIS data from MySQL (all future terms)."""
terms = get_future_terms_for_date()
_untis_import_mysql(terms)
import_mysql_future_terms()
@app.task
def untis_import_mysql_all_terms():
"""Celery task for import of UNTIS data from MySQL (all terms in DB)."""
_untis_import_mysql()
import_mysql_all_terms()
@app.task
def untis_import_mysql_current_next_term():
"""Celery task for import of UNTIS data from MySQL (current and next term)."""
terms = get_terms_for_date()
future_terms = get_future_terms_for_date()
if future_terms.exists():
terms = terms.union(future_terms[0:1])
_untis_import_mysql(terms)
import_mysql_current_next_term()
@app.task
def untis_import_mysql_current_future_terms():
"""Celery task for import of UNTIS data from MySQL (current and future terms)."""
terms = get_terms_for_date()
future_terms = get_future_terms_for_date()
terms = terms.union(future_terms)
_untis_import_mysql(terms)
import_mysql_current_future_terms()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment