api module reference

AuthorizedTasks

Registry of the functions decorated with @authorized_task. Each function is stored in the self.list dict under its task name, with the 'tasks.packs.' prefix removed.

Source code in api/resources/auth.py
class AuthorizedTasks:
    """
    Stores the function object of functions decorated by the @authorized_task decorator at self.list.
    """
    def __init__(self):
        self.list = {}
        self.auth_data = {}
    def register_task(self, func):
        """
        Registers a function in the AuthorizedTasks.list dict.

        Args:
            func (function): the function to be registered.
        """
        pure_name = func.name.replace('tasks.packs.', '')
        self.list[pure_name] = {"func": func}
        self.auth_data[pure_name] = {"func": func}

register_task(func)

Registers a function in the AuthorizedTasks.list dict.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| func | function | the function to be registered. | required |

Source code in api/resources/auth.py
def register_task(self, func):
    """
    Registers a function in the AuthorizedTasks.list dict.

    Args:
        func (function): the function to be registered.
    """
    pure_name = func.name.replace('tasks.packs.', '')
    self.list[pure_name] = {"func": func}
    self.auth_data[pure_name] = {"func": func}
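
The registration logic can be tried in isolation. The following is a minimal sketch, assuming the registered object exposes a dotted `name` attribute; `FakeTask` and the full task name `tasks.packs.test.main.myProtectedTask` are hypothetical stand-ins used only for illustration.

```python
from dataclasses import dataclass
from typing import Callable


class AuthorizedTasks:
    """Same registry logic as the source listing, repeated so the sketch runs alone."""

    def __init__(self):
        self.list = {}
        self.auth_data = {}

    def register_task(self, func):
        pure_name = func.name.replace('tasks.packs.', '')
        self.list[pure_name] = {"func": func}
        self.auth_data[pure_name] = {"func": func}


@dataclass
class FakeTask:
    """Hypothetical stand-in for a task object that carries a dotted name."""
    name: str
    run: Callable[[], str]


registry = AuthorizedTasks()
task = FakeTask(name="tasks.packs.test.main.myProtectedTask", run=lambda: "done")
registry.register_task(task)

# The 'tasks.packs.' prefix is stripped before the name is used as a key.
registered_names = sorted(registry.list)
```

After the call, both `registry.list` and `registry.auth_data` hold an entry for the stripped name that points back at the registered object.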

authorized_task(task_func)

Decorator for task functions that require JWT authentication. Applying @authorized_task registers the decorated function in the AuthorizedTasks registry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_func | function | the decorated function | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| wrapper | | a function that calls the original task_func and returns its result. |

Source code in api/resources/auth.py
def authorized_task(task_func):
    """
    Decorator for functions which require authentication by JWT.
    When the @authorized_task is placed, it will register the decorated function in the AuthorizedTasks function list.

    Args:
        task_func (function): the decorated function

    Returns:
        wrapper: a function that calls the original task_func and returns its result.
    """
    pure_name = task_func.name.replace('tasks.packs.', '')
    # The registry is keyed by task name, so test the name rather than the function object.
    if pure_name not in authorizedTasks.list:
        logger.info(f"[auth] task '{pure_name}' registered as authorized")
        authorizedTasks.register_task(task_func)
    def wrapper(*args, **kwargs):
        return task_func(*args, **kwargs)
    return wrapper
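
To show what the decorator does at decoration time and at call time, here is a small sketch under stated assumptions: the `registered` dict stands in for `authorizedTasks.list` and is keyed by the stripped task name, and `FakeTask` and the name `tasks.packs.math.add` are hypothetical. It is not the project's actual wiring.

```python
import logging

logger = logging.getLogger("auth-sketch")
registered = {}  # stand-in for authorizedTasks.list, keyed by stripped task name


class FakeTask:
    """Hypothetical callable task object exposing a dotted `name`."""

    def __init__(self, name, fn):
        self.name = name
        self._fn = fn

    def __call__(self, *args, **kwargs):
        return self._fn(*args, **kwargs)


def authorized_task(task_func):
    pure_name = task_func.name.replace('tasks.packs.', '')
    # Registration happens once, when the decorator is applied.
    if pure_name not in registered:
        logger.info("[auth] task '%s' registered as authorized", pure_name)
        registered[pure_name] = {"func": task_func}

    def wrapper(*args, **kwargs):
        # Calls are passed straight through to the original task.
        return task_func(*args, **kwargs)
    return wrapper


add = authorized_task(FakeTask("tasks.packs.math.add", lambda a, b: a + b))
result = add(2, 3)
```

Registration is a side effect of applying the decorator, so a task is listed as authorized even if it is never called.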

generate_new_jwt(payload)

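Generates a new JWT token containing a payload. When use_jwt_expire is enabled and the payload has no exp claim, an expiration of jwt_expire_time_sec seconds from the current UTC time is added to the payload.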

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| payload | dict | A dictionary with the data that will be placed inside the JWT token. | required |

Returns:

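| Name | Type | Description |
| --- | --- | --- |
| jwt_token | tuple | a (token, expiration) pair: the generated JWT token and its exp value as a string, or 'infinity' when expiration is disabled. |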

Source code in api/resources/auth.py
def generate_new_jwt(payload: dict):
    """
    Generates a new JWT token containing a payload.

    Args:
        payload (dict): A dictionary with the data that will be placed inside the JWT token.

    Returns:
        tuple: the generated JWT token and its expiration as a string ('infinity' when expiration is disabled).
    """
    if not use_jwt_expire:
        return jwt.encode(payload, jwt_private_pem, algorithm=jwt_algorithm), 'infinity'
    if 'exp' not in payload:
        # Default expiration: current UTC time plus the configured lifetime in seconds.
        payload['exp'] = str(calendar.timegm(datetime.now(tz=timezone.utc).timetuple()) + jwt_expire_time_sec)
    return jwt.encode(payload, jwt_private_pem, algorithm=jwt_algorithm), str(payload['exp'])
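
The default expiration is computed with `calendar.timegm` over the current UTC time. The sketch below isolates that arithmetic using only the standard library; `default_expiration`, `lifetime_sec`, and the injectable `now` argument are hypothetical names added for illustration and do not exist in the module.

```python
import calendar
from datetime import datetime, timezone


def default_expiration(lifetime_sec, now=None):
    """Return the UTC Unix timestamp `lifetime_sec` seconds after `now`, as a string."""
    if now is None:
        now = datetime.now(tz=timezone.utc)
    # timegm interprets the time tuple as UTC, matching the timezone-aware `now`.
    return str(calendar.timegm(now.timetuple()) + lifetime_sec)


# A fixed instant makes the result reproducible.
fixed_now = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
exp = default_expiration(3600, now=fixed_now)
```

The value is a string of digits, which is the same form the function stores in the payload and returns alongside the token.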

get_user_permissions(user, password)

Returns the task names a user is allowed to run, based on the credentials passed. You can customize this function for the needs of your project. We discourage using hardcoded values for credentials or permissions here. It is strongly recommended to hash your credentials with bcrypt and store them in a database. By default, every user has test.main.myProtectedTask as an allowed task.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| user | str | user identifier credential | required |
| password | str | password credential | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| payload | dict | dictionary with user's allowed tasks. |

Source code in api/resources/auth.py
def get_user_permissions(user:str, password:str):
    """
    Gets user allowed task names by credentials passed.
    It's possible to customize this function for the needs of your project.
    We discourage using hardcoded values for credentials or permissions here.
    It is strongly recommended to hash your credentials with bcrypt and store them in a database.
    By default, all users have test.main.myProtectedTask as an allowed task.

    Args:
        user (str): user identifier credential
        password (str): password credential

    Returns:
        payload (dict): dictionary with user's allowed tasks.
    """
    # You can implement here an algorithm to get a dictionary from database based on credentials.
    payload = {
        "allowed_tasks": {
            "test.main.myProtectedTask": True
        }
    }
    return payload
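
As one possible customization, the payload can be built from a per-user lookup instead of a fixed dictionary. This is a sketch only: `PERMISSIONS_BY_USER` is a hypothetical in-memory stand-in for a database table, and `is_task_allowed` is a hypothetical helper that is not part of the module.

```python
# Hypothetical in-memory stand-in for a database table of per-user permissions.
PERMISSIONS_BY_USER = {
    "testuser": ["test.main.myProtectedTask"],
}


def get_user_permissions(user: str, password: str) -> dict:
    """Build a payload in the same shape as the default implementation."""
    # `password` is kept for signature compatibility; this sketch does not use it.
    allowed = PERMISSIONS_BY_USER.get(user, [])
    return {"allowed_tasks": {task_name: True for task_name in allowed}}


def is_task_allowed(payload: dict, task_name: str) -> bool:
    """Hypothetical helper: check a single task name against the payload."""
    return bool(payload.get("allowed_tasks", {}).get(task_name, False))


payload = get_user_permissions("testuser", "testpass")
unknown_payload = get_user_permissions("nobody", "")
```

Unknown users get an empty `allowed_tasks` dict rather than an error, so the permission check simply fails for every task.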

verify_credentials(user, password)

Verifies whether the given credentials are correct. You can customize this function for the needs of your project. We discourage using hardcoded values for credentials here. It is strongly recommended to hash your credentials with bcrypt and store them in a database. By default, the correct credentials are user="testuser" and password="testpass".

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| user | str | user identifier credential | required |
| password | str | password credential | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| isVerified | boolean | true if credentials are correct, otherwise, false. |

Source code in api/resources/auth.py
def verify_credentials(user:str, password:str):
    """
    A function that verifies if the credentials are correct.
    It's possible to customize this function for the needs of your project.
    We discourage using hardcoded values for credentials here.
    It is strongly recommended to hash your credentials with bcrypt and store them in a database.
    By default, the correct credentials are user="testuser" and password="testpass".

    Args:
        user (str): user identifier credential
        password (str): password credential

    Returns:
        isVerified (boolean): true if credentials are correct, otherwise, false.
    """

    # You can implement here an algorithm to verify the credentials, e.g. verifying a bcrypt hash on database
    return user == "testuser" and password == "testpass"
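
The recommendation to store hashed credentials can be sketched as follows. To keep the example runnable with the standard library alone, it swaps bcrypt for PBKDF2-HMAC-SHA256 from `hashlib`; that substitution, the `USERS` dict standing in for a database, the `hash_password` helper, and the iteration count are all assumptions of this sketch, not part of the module.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str, salt: bytes) -> bytes:
    """Derive a password hash with PBKDF2-HMAC-SHA256 (stand-in for bcrypt)."""
    return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)


# Hypothetical stand-in for a database row: user -> (salt, password hash).
_salt = os.urandom(16)
USERS = {"testuser": (_salt, hash_password("testpass", _salt))}


def verify_credentials(user: str, password: str) -> bool:
    record = USERS.get(user)
    if record is None:
        return False
    salt, stored_hash = record
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(hash_password(password, salt), stored_hash)
```

Only the salt and the derived hash are stored, so the plaintext password never needs to be kept after the user record is created.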

NewTask

Bases: Resource

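Registers a new task request through the POST method. The request is rejected if the task type does not exist, if the caller lacks permission for it, or if the given task parameters are invalid; otherwise the task is queued.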

Source code in api/resources/web.py
class NewTask(Resource):
    """
    Registers a new task request through POST method.
    """
    def post(self):
        """Posts a task 

        Returns:
            taskInfo (dict): dictionary with the task id, task type and current status of the task.
        """
        parser = reqparse.RequestParser()
        parser.add_argument('type', required=True, type=str, help='You need to inform type', location='json')
        parser.add_argument('args', required=True, type=dict, help='You need to inform args dict', location='json')
        parser.add_argument('Authorization', location='headers')
        args = parser.parse_args()

        abort_if_tasktype_doesnt_exist(tasktype=args["type"])
        abort_if_doesnt_have_permission(tasktype=args["type"], auth_header=args["Authorization"])
        abort_if_task_params_are_invalid(tasktype=args["type"], given_params=args["args"])

        try:
            task_class = tasklist[args["type"]]['task']
            task = task_class.delay(**args["args"])
            return {"id": task.id, "type": args["type"], "status": task.state}
        except Exception as exc:
            return {"error": str(exc)}

post()

Posts a task

Returns:

| Name | Type | Description |
| --- | --- | --- |
| taskInfo | dict | dictionary with the task id, task type and current status of the task. |

Source code in api/resources/web.py
def post(self):
    """Posts a task 

    Returns:
        taskInfo (dict): dictionary with the task id, task type and current status of the task.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('type', required=True, type=str, help='You need to inform type', location='json')
    parser.add_argument('args', required=True, type=dict, help='You need to inform args dict', location='json')
    parser.add_argument('Authorization', location='headers')
    args = parser.parse_args()

    abort_if_tasktype_doesnt_exist(tasktype=args["type"])
    abort_if_doesnt_have_permission(tasktype=args["type"], auth_header=args["Authorization"])
    abort_if_task_params_are_invalid(tasktype=args["type"], given_params=args["args"])

    try:
        task_class = tasklist[args["type"]]['task']
        task = task_class.delay(**args["args"])
        return {"id": task.id, "type": args["type"], "status": task.state}
    except Exception as exc:
        return {"error": str(exc)}
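
From the client side, the parser above expects a JSON body with `type` and `args` plus an optional `Authorization` header. The sketch below only builds such a request with the standard library and does not send it. The URL is hypothetical, since the route for this resource is not shown here, and the exact format expected in the `Authorization` header is also not shown, so the token is passed through unchanged.

```python
import json
import urllib.request


def build_new_task_request(url, task_type, task_args, token=None):
    """Build (but do not send) a POST request matching the parser's arguments."""
    body = json.dumps({"type": task_type, "args": task_args}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token is not None:
        # Assumption: the header value is whatever the server-side check expects.
        headers["Authorization"] = token
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


request = build_new_task_request(
    "http://localhost:5000/task",  # hypothetical URL
    "test.main.myProtectedTask",
    {},
    token="<jwt>",
)
```

Passing the request to `urllib.request.urlopen` would send it; on success the response body is the dictionary with `id`, `type`, and `status` described above.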