# -*- coding: utf-8 -*-
"""Module contains all models directly related to the main entity - tasks."""
from collections import namedtuple
from typing import Any, Dict, List
from bson import ObjectId
from flask_mongoengine import Document
from mongoengine import (CASCADE, BooleanField, DateTimeField, DictField,
IntField, ListField, ReferenceField, StringField)
from vulyk.models.user import User
from vulyk.signals import on_batch_done
__all__ = [
'AbstractAnswer',
'AbstractTask',
'Batch',
'BatchUpdateResult'
]
BatchUpdateResult = namedtuple('BatchUpdateResult', ['success', 'closed'])
[docs]class Batch(Document):
"""
Helper category to group tasks.
"""
id = StringField(max_length=50, primary_key=True)
task_type = StringField(max_length=50, required=True, db_field='taskType')
tasks_count = IntField(default=0, required=True, db_field='tasksCount')
tasks_processed = IntField(default=0, db_field='tasksProcessed')
closed = BooleanField(default=False, required=False)
batch_meta = DictField(db_field='batchMeta')
meta = {
'collection': 'batches',
'allow_inheritance': True,
'indexes': [
'task_type',
'closed'
]
}
[docs] @classmethod
def task_done_in(cls, batch_id: str) -> BatchUpdateResult:
"""
Increment needed values upon a task from the batch is done. In case if
all tasks are finished – close the batch.
:param batch_id: Batch ID
:type batch_id: str
:return: Aggregate which represents complex effect of the method
:rtype: BatchUpdateResult
"""
num_changed = 0
batch = cls.objects.get(id=batch_id) # type: Batch
processed = batch.tasks_processed + 1
if processed > batch.tasks_count:
return BatchUpdateResult(success=False, closed=False)
closed = processed == batch.tasks_count
update_q = {'inc__tasks_processed': 1}
if closed:
update_q['set__closed'] = closed
num_changed = cls \
.objects(id=batch.id, closed=False) \
.update(**update_q)
if num_changed == 0:
update_q.pop('set__closed', None)
closed = False
num_changed = batch.update(**update_q)
elif closed:
on_batch_done.send(batch)
return BatchUpdateResult(success=num_changed > 0, closed=closed)
def __str__(self) -> str:
return str(self.id)
def __repr__(self) -> str:
return 'Batch [{id}] ({processed}/{count})'.format(
id=self.id,
processed=self.tasks_processed,
count=self.tasks_count)
[docs]class AbstractTask(Document):
"""
This is AbstractTask model.
You need to inherit it in your model
"""
id = StringField(max_length=200, default='', primary_key=True)
task_type = StringField(max_length=50, required=True, db_field='taskType')
batch = ReferenceField(Batch, reverse_delete_rule=CASCADE)
users_count = IntField(default=0, db_field='usersCount')
users_processed = ListField(ReferenceField(User),
db_field='usersProcessed')
users_skipped = ListField(ReferenceField(User), db_field='usersSkipped')
closed = BooleanField(default=False)
task_data = DictField(required=True)
meta = {
'collection': 'tasks',
'allow_inheritance': True,
'indexes': [
'task_type',
'batch'
]
}
[docs] def as_dict(self) -> Dict[str, Any]:
"""
Converts the model-instance into a safe and lightweight dictionary.
:rtype: Dict[str, Any]
"""
return {
'id': self.id,
'closed': self.closed,
'data': self.task_data
}
[docs] @classmethod
def ids_in_batch(cls, batch: Batch) -> List[str]:
"""
Collects IDs of all tasks that belong to certain batch.
:param batch: Batch instance
:type batch: Batch
:return: List of IDs
:rtype: List[str]
"""
return cls.objects(batch=batch).distinct('id')
def __str__(self) -> str:
return str(self.id)
def __repr__(self) -> str:
return str(self)
[docs]class AbstractAnswer(Document):
"""
This is AbstractTask model.
You need to inherit it in your model
"""
task = ReferenceField(AbstractTask, reverse_delete_rule=CASCADE)
created_by = ReferenceField(User, reverse_delete_rule=CASCADE,
db_field='createdBy')
created_at = DateTimeField(db_field='createdAt')
task_type = StringField(max_length=50, required=True, db_field='taskType')
# not sure - could be extended
result = DictField()
meta = {
'collection': 'reports',
'allow_inheritance': True,
'indexes': [
'task',
'created_by',
'created_at',
{
'fields': ['created_by', 'task'],
'unique': True
}
]
}
# TODO: decide, if we need it at all
@property
def corrections(self) -> int:
"""
Returns whole amount of actions/corrections given by user in this
particular answer.
:return: Count of corrections in this answer
:rtype: int
"""
return 1
@corrections.setter
def corrections(self, value: int) -> None:
pass
@corrections.deleter
def corrections(self) -> None:
pass
[docs] @classmethod
def answers_numbers_by_tasks(cls, task_ids: List[str]) -> Dict[ObjectId, int]:
"""
Groups answers, filtered by tasks they belong to, by user and count
number of answers for every user.
:param task_ids: List of tasks IDs
:type task_ids: List[str]
:return: Map having user IDs as keys and answers numbers as values
:rtype: Dict[ObjectId, int]
"""
return cls.objects(task__in=task_ids).item_frequencies('created_by')
[docs] def as_dict(self) -> Dict[str, Dict]:
"""
Converts the model-instance into a safe that will include also task
and user.
:rtype: Dict[str, Dict]
"""
return {
'task': self.task.as_dict(),
'answer': self.result,
'user': self.created_by.as_dict()
}
def __str__(self) -> str:
return str(self.pk)
def __repr__(self) -> str:
return 'Report [{} by {}]'.format(self.created_by, self.task)