# Source code for vulyk.models.task_types

# -*- coding: utf-8 -*-
"""Module contains all models related to task type (plugin root) entity."""

import logging
import random
from datetime import datetime
from hashlib import sha1
from typing import Any, AnyStr, Dict, Generator, List, Optional, Tuple, Union

import orjson as json
from bson import ObjectId
from mongoengine import Q
from mongoengine.errors import (InvalidQueryError, LookUpError, NotUniqueError,
                                OperationError, ValidationError)

from vulyk.ext.leaderboard import LeaderBoardManager
from vulyk.ext.worksession import WorkSessionManager
from vulyk.models.exc import (TaskImportError, TaskNotFoundError,
                              TaskSaveError, TaskSkipError,
                              TaskValidationError)
from vulyk.models.stats import WorkSession
from vulyk.models.tasks import AbstractAnswer, AbstractTask, Batch
from vulyk.models.user import User
from vulyk.utils import get_tb

# Names exported by ``from vulyk.models.task_types import *``.
__all__ = ['AbstractTaskType']


class AbstractTaskType:
    """
    The main entity in the application. Contains all the logic we need to
    handle task emission/accounting.

    Could be overridden in plugins to fit your needs.

    The simplest and most common scenario of being overridden is to have own
    task type name and description to separate your tasks from any other.
    """
    # models (must be set by a subclass, validated in the constructor)
    answer_model = None
    task_model = None

    template = ''
    helptext_template = ''
    type_name = ''

    # A task gets closed once this many users have answered it
    # (see ``_update_task_on_answer``).
    redundancy = 3
    # NOTE: these lists (and ``_task_type_meta`` below) are class-level
    # mutable objects shared by every subclass that does not override them;
    # override them in a subclass rather than mutating them in place.
    JS_ASSETS = []
    CSS_ASSETS = []

    # properties
    _name = ''
    _description = ''

    # Meta information on project to copy to task batches
    _task_type_meta = {}

    # managers
    _work_session_manager = None  # type: WorkSessionManager
    _leaderboard_manager = None  # type: LeaderBoardManager

    def __init__(self, settings: Dict[str, Any]) -> None:
        """
        Constructor.

        Creates default managers unless a subclass has already provided them
        and validates that the subclass is configured properly.

        :param settings: We pass global settings dictionary into the
            constructor when instantiating plugins. Could be useful for
            plugins. It is not used by this base implementation.
        :type settings: Dict[str, Any]

        :raises: AssertionError - if the task type is misconfigured
        """
        self._logger = logging.getLogger('vulyk.app')

        self._leaderboard_manager = (
            self._leaderboard_manager
            or LeaderBoardManager(self.type_name, self.answer_model, User))
        self._work_session_manager = (
            self._work_session_manager or WorkSessionManager(WorkSession))

        # ``issubclass`` raises TypeError when its first argument is not a
        # class (e.g. the default ``None``), which would hide the message
        # below, hence the explicit ``isinstance(..., type)`` guard.
        assert (isinstance(self.task_model, type)
                and issubclass(self.task_model, AbstractTask)), \
            'You should define task_model property'
        assert (isinstance(self.answer_model, type)
                and issubclass(self.answer_model, AbstractAnswer)), \
            'You should define answer_model property'
        assert isinstance(self._work_session_manager, WorkSessionManager), \
            'You should define _work_session_manager property'
        assert isinstance(self._leaderboard_manager, LeaderBoardManager), \
            'You should define _leaderboard_manager property'

        assert self.type_name, 'You should define type_name (underscore)'
        assert self.template, 'You should define template'

        assert isinstance(self._task_type_meta, dict), \
            'Batch meta must of dict type'

    @property
    def name(self) -> str:
        """
        Human-readable name of the plugin.

        Falls back to ``type_name`` when no explicit name is set.

        :return: Name of the task type.
        :rtype: str
        """
        return self._name or self.type_name

    @property
    def task_type_meta(self) -> Dict[str, Any]:
        """
        Dict with task type metadata (freeform dict)

        :return: project specific metadata
        :rtype: Dict[str, Any]
        """
        return self._task_type_meta

    @property
    def description(self) -> str:
        """
        Explicit description of the plugin.

        :return: Plugin description, or an empty string if none is set.
        :rtype: str
        """
        return self._description or ''

    @property
    def work_session_manager(self) -> WorkSessionManager:
        """
        Returns current instance of WorkSessionManager used in the task type.

        :return: Active WorkSessionManager instance.
        :rtype: WorkSessionManager
        """
        return self._work_session_manager

    def import_tasks(
        self,
        tasks: List[Dict],
        batch: Optional[AnyStr]
    ) -> None:
        """
        Imports tasks from an iterable over dicts.
        IO is left out of scope here.

        Every task gets an ID derived from the SHA-1 digest of its serialized
        content (first 20 hex characters). All tasks are inserted with a
        single bulk operation.

        :param tasks: An iterable over dicts
        :type tasks: List[Dict]
        :param batch: Batch ID (optional)
        :type batch: Optional[AnyStr]

        :raises: TaskImportError - if any task is not a dict or can not be
            built/stored
        """
        # AssertionError is kept for models/plugins that assert on their own.
        errors = (AttributeError, TypeError, ValidationError, OperationError,
                  AssertionError)
        bulk = []

        try:
            for task in tasks:
                # Explicit check instead of ``assert``: assertions are
                # stripped when Python runs with -O.
                if not isinstance(task, dict):
                    raise TypeError(
                        'task must be a dict, got {}'.format(
                            type(task).__name__))

                # ``json`` is orjson here (see module imports); its dumps()
                # returns bytes, which is what sha1() expects.
                bulk.append(self.task_model(
                    id=sha1(json.dumps(task)).hexdigest()[:20],
                    batch=batch,
                    task_type=self.type_name,
                    task_data=task))

            self.task_model.objects.insert(bulk)

            self._logger.debug('Inserted %s tasks in batch %s for plugin <%s>',
                               len(bulk), batch, self.name)
        except errors as e:
            raise TaskImportError('Can\'t load task: {}'.format(e)) from e

    def export_reports(
        self,
        batch: str,
        closed: bool = True,
        qs=None
    ) -> Generator[List[Dict[str, Any]], None, None]:
        """
        Exports results. IO is left out of scope here as well.

        :param batch: Certain batch to extract, or ``'__all__'`` to skip
            filtering by batch. Ignored when ``qs`` is passed.
        :type batch: str
        :param closed: Specify if we need to export only closed tasks reports.
            Ignored when ``qs`` is passed.
        :type closed: bool
        :param qs: Queryset of tasks, an optional argument. By default it is
            built from ``batch`` and ``closed``.
        :type qs: QuerySet

        :returns: Generator that yields, for every task, a list of dicts
            with all the answers given to that task
        :rtype: Generator[List[Dict[str, Any]], None, None]
        """
        if qs is None:
            query = Q()

            if batch != '__all__':
                query &= Q(batch=batch)

            if closed:
                query &= Q(closed=closed)

            qs = self.task_model.objects(query)

        for task in qs:
            yield [answer.as_dict()
                   for answer in self.answer_model.objects(task=task)]

    def get_leaders(self) -> List[Tuple[ObjectId, int]]:
        """
        Return sorted list of tuples (user_id, tasks_done).
        Delegates to the leaderboard manager.

        :returns: list of tuples (user_id, tasks_done)
        :rtype: List[Tuple[ObjectId, int]]
        """
        return self._leaderboard_manager.get_leaders()

    def get_leaderboard(self, limit: int = 10) -> List[Dict]:
        """
        Find users who contributed the most.
        Delegates to the leaderboard manager.

        :param limit: number of top users to return
        :type limit: int

        :returns: List of dicts {user: user_obj, freq: count}
        :rtype: List[Dict]
        """
        return self._leaderboard_manager.get_leaderboard(limit)

    def get_next(self, user: User) -> Dict:
        """
        Finds given user a new task and starts new WorkSession

        :param user: an instance of User model
        :type user: User

        :returns: Prepared dictionary of model, or empty dictionary
        :rtype: Dict
        """
        task = self._get_next_task(user)

        if task is None:
            self._logger.debug('No suitable task found for user %s', user.id)
            return {}

        # Not sure if we should do that here on GET requests
        self._work_session_manager.start_work_session(task, user.id)
        self._logger.debug('Assigned task %s to user %s', task.id, user.id)

        return task.as_dict()

    def _select_tasks(self, query: Q, user: User):
        """
        Returns tasks matching the query, preferring the ones that user has
        not skipped; falls back to all matching tasks if there are none.

        :param query: Base query to filter tasks with
        :type query: Q
        :param user: an instance of User model
        :type user: User

        :returns: Queryset of candidate tasks (may be empty)
        :rtype: QuerySet
        """
        rs = self.task_model.objects(query & Q(users_skipped__nin=[user]))

        if rs.count() == 0:
            rs = self.task_model.objects(query)

        return rs

    def _get_next_task(self, user: User) -> Optional[AbstractTask]:
        """
        Finds given user a new task.

        Open batches of this task type are examined in order of their IDs;
        the first one that has a suitable task wins. If no batch has any,
        the search is repeated without batch restriction. A random task is
        picked among the candidates.

        :param user: an instance of User model
        :type user: User

        :returns: Model instance or None
        :rtype: Optional[AbstractTask]
        """
        # Open tasks of this type that user has not answered yet.
        base_q = Q(task_type=self.type_name) \
            & Q(users_processed__nin=[user]) \
            & Q(closed__ne=True)

        batches = Batch \
            .objects(task_type=self.type_name, closed__ne=True) \
            .order_by('id')

        for batch in batches:
            if batch.tasks_count == batch.tasks_processed:
                continue

            rs = self._select_tasks(base_q & Q(batch=batch.id), user)

            if rs.count() > 0:
                break
        else:
            # Now searching w/o batch restriction
            rs = self._select_tasks(base_q, user)

        # The list may be empty: either nothing matched, or tasks have been
        # closed/taken since the check above. random.choice() raises
        # IndexError on an empty sequence, so bail out explicitly.
        ids = rs.distinct('id')

        if not ids:
            return None

        _id = random.choice(ids)

        try:
            return rs.get(id=_id)
        except self.task_model.DoesNotExist:
            self._logger.error(
                'DoesNotExist when trying to fetch task %s', _id)

            return None

    def record_activity(
        self,
        user_id: Union[AnyStr, ObjectId],
        task_id: AnyStr,
        seconds: int
    ) -> None:
        """
        Increases the counter of activity for current user in given task.

        :param user_id: ID of user, who gets new task
        :type user_id: Union[AnyStr, ObjectId]
        :param task_id: Current task
        :type task_id: AnyStr
        :param seconds: User was active for
        :type seconds: int

        :raises: TaskNotFoundError - if there is no such task of this type
        """
        try:
            task = self.task_model.objects.get(id=task_id,
                                               task_type=self.type_name)

            self._work_session_manager.record_activity(task, user_id, seconds)
            self._logger.debug('Recording %s seconds of activity of user %s '
                               'on task %s', seconds, user_id, task_id)
        except self.task_model.DoesNotExist as err:
            raise TaskNotFoundError() from err

    def skip_task(self, task_id: AnyStr, user: User):
        """
        Marks given task as a skipped by a given user and drops the user's
        work session on it.
        Assumes that user is eligible for this kind of tasks

        :param task_id: Given task ID
        :type task_id: AnyStr
        :param user: an instance of User model who skips the task
        :type user: User

        :raises: TaskSkipError, TaskNotFoundError
        """
        try:
            task = self.task_model.objects.get(
                id=task_id,
                task_type=self.type_name)

            task.update(add_to_set__users_skipped=user)
            self._work_session_manager.delete_work_session(task, user.id)
            self._logger.debug('User %s skipped the task %s', user.id, task_id)
        except self.task_model.DoesNotExist as err:
            raise TaskNotFoundError() from err
        except OperationError as err:
            raise TaskSkipError(
                'Can not skip the task: {0}.'.format(err)) from err

    def on_task_done(
        self,
        user: User,
        task_id: AnyStr,
        result: Dict[str, Any]
    ) -> None:
        """
        Saves user's answers for a given task.
        Assumes that user is eligible for this kind of tasks.

        :param task_id: Given task ID
        :type task_id: AnyStr
        :param user: an instance of User model who provided an answer
        :type user: User
        :param result: Task solving result
        :type result: Dict[str, Any]

        :raises: TaskNotFoundError - if there is no such task of this type
        :raises: TaskSaveError - in case of general problems
        :raises: TaskValidationError - in case of validation problems
        """
        try:
            task = self.task_model.objects.get(
                id=task_id,
                task_type=self.type_name)
        except self.task_model.DoesNotExist as err:
            raise TaskNotFoundError('Task with ID {id} not found while '
                                    'trying to save an answer from {user!r}.'
                                    .format(id=task_id, user=user)) from err

        try:
            answer = self.answer_model.objects.create(
                task=task,
                created_by=user,
                created_at=datetime.now(),
                task_type=self.type_name,
                result=result)

            # update task
            closed = self._update_task_on_answer(task, answer, user)
            # update user
            user.update(inc__processed=1)
            # update stats record
            self._work_session_manager.end_work_session(task, user.id, answer)

            self._logger.debug('User %s has done task %s', user.id, task_id)

            # Tell the batch about it only if this very answer closed the task
            if closed and task.batch is not None:
                Batch.task_done_in(batch_id=task.batch.id)
        # NotUniqueError must go before OperationError handler below
        except NotUniqueError as err:
            raise TaskValidationError('Attempt to save over the existing '
                                      'answer for task {id} by user {user!r}'
                                      .format(id=task_id, user=user)) from err
        except ValidationError as err:
            raise TaskValidationError(err, get_tb()) from err
        except (OperationError, LookUpError, InvalidQueryError) as err:
            raise TaskSaveError(err, get_tb()) from err

    def _is_ready_for_autoclose(
        self,
        task: AbstractTask,
        answer: AbstractAnswer
    ) -> bool:
        """
        Checks if task could be closed before the number of its answers
        reaches ``redundancy``.
        Base implementation never closes a task early. Should be overridden
        if you need more complex logic.

        :param task: an instance of self.task_model model
        :type task: AbstractTask
        :param answer: Task solving result
        :type answer: AbstractAnswer

        :returns: True if the task should be closed right away
        :rtype: bool
        """
        return False

    def _update_task_on_answer(
        self,
        task: AbstractTask,
        answer: AbstractAnswer,
        user: User
    ) -> bool:
        """
        Registers the user's answer on the task and sets flag 'closed' to
        True if task's goal has been reached.

        :param task: an instance of self.task_model model
        :type task: AbstractTask
        :param answer: Task solving result
        :type answer: AbstractAnswer
        :param user: an instance of User model who provided an answer
        :type user: User

        :returns: True if the task has been closed by this very answer
        :rtype: bool
        """
        users_count = task.users_count + 1
        update_q = {
            'inc__users_count': 1,
            'add_to_set__users_processed': user}
        closed = self._is_ready_for_autoclose(task, answer) \
            or (users_count >= self.redundancy)

        if closed:
            update_q['set__closed'] = closed

        # Conditional update: only touches the task while it is still open,
        # so that the task gets closed exactly once.
        num_changed = self.task_model \
            .objects(id=task.id, closed=False) \
            .update_one(**update_q)  # type: int

        if closed and num_changed == 0:
            # Nothing matched, i.e. the task is not open anymore: still
            # account the answer but do not report the task as closed by us.
            update_q.pop('set__closed', None)
            closed = False

            task.update(**update_q)
        # NOTE(review): when ``closed`` is False and nothing matched, the
        # counters above are not written at all - confirm this is intended.

        return closed

    def to_dict(self) -> Dict[str, Any]:
        """
        Prepare simplified dict that contains basic info about the task type.

        :return: distilled dict with basic info
        :rtype: Dict[str, Any]
        """
        closed_tasks = self.task_model \
            .objects(closed=True) \
            .count()  # type: int
        tasks = self.task_model.objects().count()  # type: int
        open_tasks = tasks - closed_tasks  # type: int

        return {
            'name': self.name,
            'description': self.description,
            'type': self.type_name,
            'tasks': tasks,
            'open_tasks': open_tasks,
            'closed_tasks': closed_tasks,
            'has_tasks': open_tasks > 0
        }