# Source code for docs.ag_master.admin

import base64
import time
import pytz

from google.cloud import storage
from typing import List
from flask import request, render_template, abort
from datetime import datetime

from common.rpc.ag_master import create_assignment, trigger_jobs, upload_zip
from common.rpc.auth import get_endpoint
from common.secrets import new_secret
from common.oauth_client import get_user

from models import Assignment, db, Job
from utils import BUCKET, admin_only

# Timezone in which the "dt" template filter renders Unix timestamps.
TZ = pytz.timezone("America/Los_Angeles")


def create_admin_endpoints(app):
    """Creates various RPC endpoints to manage assignment grading. See the
    following:

    - :func:`~common.rpc.ag_master.upload_zip`
    - :func:`~common.rpc.ag_master.create_assignment`

    In addition to the two RPC handlers, this registers on ``app``:

    - the ``dt`` template filter, which formats Unix timestamps in ``TZ``;
    - ``GET /<course>``: list of assignments for the course's endpoint;
    - ``POST /<course>/<assignment>/fail_unfinished``: mark queued/running
      jobs as failed;
    - ``POST /<course>/<assignment>/retrigger_unsuccessful``: re-queue every
      job whose status is not ``"finished"``;
    - ``GET /<course>/<assign>``: jobs of one assignment, grouped by batch;
    - ``GET /<course>/job/<id>``: details of a single job.

    All handlers are wrapped in ``admin_only``.

    :param app: the Flask application to register the endpoints on
    """

    @upload_zip.bind(app)
    @admin_only
    def upload_zip_rpc(course, name, file):
        """Decode a base64-encoded zip and upload it to the bucket under
        ``zips/<endpoint>/<name>``."""
        file = base64.b64decode(file.encode("ascii"))
        bucket = storage.Client().get_bucket(BUCKET)
        blob = bucket.blob(f"zips/{get_endpoint(course=course)}/{name}")
        blob.upload_from_string(file, content_type="application/zip")

    @create_assignment.bind(app)
    @admin_only
    def create_assignment_rpc(course, name, file, command, batch_size, grading_base):
        """Create the assignment if it does not exist yet, update its
        settings, and return its ``assignment_secret``."""
        # Resolve the endpoint once; it is needed for both lookup and creation.
        endpoint = get_endpoint(course=course)
        assignment = Assignment.query.filter_by(
            name=name, course=course, endpoint=endpoint
        ).one_or_none()

        if assignment is None:
            assignment = Assignment(
                name=name,
                assignment_secret=new_secret(),
                course=course,
                endpoint=endpoint,
            )
            db.session.add(assignment)

        assignment.file = file
        assignment.command = command
        assignment.last_modified = int(time.time())
        assignment.batch_size = batch_size
        assignment.grading_base = grading_base
        db.session.commit()

        return assignment.assignment_secret

    @app.template_filter("dt")
    def format_dt(value, format="%b %d, %Y at %I:%M %p %Z"):
        """Format a Unix timestamp in ``TZ``; ``None`` yields a placeholder."""
        if value is None:
            return "no date provided"
        return datetime.fromtimestamp(int(value), TZ).strftime(format)

    @app.route("/<course>")
    @admin_only
    def get_assignments(course):
        """Render the course's assignments, most recently modified first."""
        endpoint = get_endpoint(course=course)
        assignments: List[Assignment] = Assignment.query.filter(
            Assignment.endpoint == endpoint
        ).all()

        return render_template(
            "assignments.html",
            course=course,
            assignments=sorted(assignments, key=lambda a: -a.last_modified),
        )

    @app.route("/<course>/<assignment>/fail_unfinished", methods=["POST"])
    @admin_only
    def fail_unfinished_jobs(course, assignment):
        """Mark every queued or running job of the assignment as failed and
        return the number of modified rows."""
        endpoint = get_endpoint(course=course)
        jobs = (
            Job.query.join(Assignment)
            .filter(Assignment.endpoint == endpoint)
            .filter(Assignment.name == assignment)
            .filter(Job.status.in_(("queued", "running")))
            .all()
        )
        # The bulk UPDATE is keyed on job_secret, so the joined query above
        # is first materialized and then re-selected by secret.
        count = Job.query.filter(
            Job.job_secret.in_([job.job_secret for job in jobs])
        ).update(
            {
                Job.status: "failed",
                Job.finished_at: int(time.time()),
                Job.result: f"Marked as failed by {get_user()['email']}.",
            },
            synchronize_session="fetch",
        )
        db.session.commit()

        return dict(modified=count)

    @app.route("/<course>/<assignment>/retrigger_unsuccessful", methods=["POST"])
    @admin_only
    def retrigger_unsuccessful_jobs(course, assignment):
        """Re-queue every job of the assignment whose status is not
        ``"finished"`` and trigger them; return how many were re-queued.

        Responds with 404 when the assignment does not exist.
        """
        endpoint = get_endpoint(course=course)
        assignment_row = Assignment.query.filter_by(
            name=assignment, endpoint=endpoint
        ).one_or_none()
        if assignment_row is None:
            # Same response as get_jobs, rather than an uncaught
            # NoResultFound surfacing as a 500.
            abort(404, "Assignment not found.")

        jobs = (
            Job.query.join(Assignment)
            .filter(Assignment.endpoint == endpoint)
            .filter(Assignment.name == assignment_row.name)
            .filter(Job.status != "finished")
            .all()
        )
        for job in jobs:
            job.status = "queued"
        db.session.commit()

        trigger_jobs(
            assignment_id=assignment_row.assignment_secret,
            jobs=[job.job_secret for job in jobs if job.status == "queued"],
        )

        return dict(modified=len(jobs))

    @app.route("/<course>/<assign>")
    @admin_only
    def get_jobs(course, assign):
        """Render the jobs of one assignment grouped into batches by their
        ``queued_at`` value.

        Optional query arguments ``queued_at`` and ``status`` narrow the
        listing. Responds with 404 when the assignment does not exist.
        """
        endpoint = get_endpoint(course=course)
        # Still a query object at this point; filters are added below and
        # the rows are fetched with .all() afterwards.
        job_query = (
            Job.query.join(Assignment)
            .filter(Assignment.endpoint == endpoint)
            .filter(Assignment.name == assign)
        )
        assign = (
            Assignment.query.filter(Assignment.name == assign)
            .filter(Assignment.endpoint == endpoint)
            .one_or_none()
        )
        if not assign:
            abort(404, "Assignment not found.")

        queued_at = request.args.get("queued_at", 0)
        if queued_at:
            job_query = job_query.filter(Job.queued_at == queued_at)

        status = request.args.get("status", "all")
        if status != "all":
            job_query = job_query.filter(Job.status == status)

        jobs = job_query.all()

        batches = {}
        for job in jobs:
            key = str(job.queued_at)
            if key not in batches:
                batches[key] = {
                    "jobs": [],
                    "finished": 0,
                    "failed": 0,
                    "running": 0,
                    "queued": 0,
                    "completed": 0,
                    "total": 0,
                }
            batch = batches[key]

            details = {
                "started_at": job.started_at,
                "finished_at": job.finished_at,
                "backup": job.backup,
                "status": job.status,
                "result": job.result,
                "id": job.external_job_id,
            }

            if details["finished_at"]:
                # Anything with a finish time counts as completed; only the
                # "finished" status is a success and gets a duration.
                if details["status"] == "finished":
                    batch["finished"] += 1
                    details["duration"] = (
                        details["finished_at"] - details["started_at"]
                    )
                else:
                    batch["failed"] += 1
                batch["completed"] += 1
            elif details["started_at"]:
                batch["running"] += 1
            else:
                batch["queued"] += 1

            batch["total"] += 1
            batch["progress"] = batch["completed"] / batch["total"]
            batch["jobs"].append(details)

        return render_template(
            "assignment.html",
            course=course,
            assign=assign,
            # Batches are ordered by their (string) key, descending.
            batches={k: batches[k] for k in sorted(batches, reverse=True)},
            queued_at=str(queued_at),
            status=status,
        )

    @app.route("/<course>/job/<id>")
    @admin_only
    def job_details(course, id):
        """Render a single job, looked up by its external job id within the
        course's endpoint. Responds with 404 when no such job exists."""
        endpoint = get_endpoint(course=course)
        job: Job = (
            Job.query.join(Assignment)
            .filter(Assignment.endpoint == endpoint)
            .filter(Job.external_job_id == id)
            .one_or_none()
        )
        if not job:
            abort(404, "Job not found.")

        return render_template(
            "job.html",
            course=course,
            backup=job.backup,
            status=job.status,
            result=job.result,
            start=job.started_at,
            finish=job.finished_at,
        )