# flask-tasks-docker
This is a generic, ready-to-run Flask RESTful API written in Python that can receive task requests with arguments, run some logic with the arguments provided (using Celery and AMQP queues), and store the results in a SQL database. The results can then be retrieved from the API. The processing instances can be scaled according to your needs using Docker.
## Main features
- You can scale multiple workers that can process tasks in parallel
- You can add as many custom tasks as you want: highly customizable
- Can receive tasks from `POST` requests, passing multiple parameters in `json`
- Can store the result of the tasks in a SQL database
- Can return the result of a task from `GET` requests, passing the task ID as a parameter
## Current development

The project is currently on release 2.0.

Features/changes made in release 2.0:
- Project redesigned to manage the tasks using Celery + RabbitMQ + PostgreSQL
- Added authentication by JWT
- Removed Janitor; RabbitMQ + Celery now manage the completed tasks
Upcoming features/changes in release 3.0:
- Packaging system for tasks
- Isolated environment for each task
- C/C++ wrapper for tasks written in C/C++
## Getting started
- **In what applications should I use it?** You should use `flask-tasks` when you need a server to do some computation based on received arguments and store the results of the computation for later visualization.
### Setting up and running
First, make sure you have Docker and Docker Compose installed.
Before running, please make sure to configure the environment variables for the API and the Handler (see Configuring environment variables below).
To run the API Docker container, use the following command (a sketch; the image name and port mapping are assumptions, and the env values follow the examples below).
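```sh
# A minimal sketch; image name and port mapping are placeholders.
docker build -t flask-tasks-api ./api
docker run -d --name api \
  -p 5000:5000 \
  -e DATABASE_URL="postgres://postgres:123@localhost:5432/flask_tasks_v2" \
  -e AMQP_URL="redis://localhost:6379" \
  flask-tasks-api
```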
To run the worker container, use the following command (a sketch; the image name is an assumption). You will need `uuidgen` installed to generate a unique worker name. If you want to specify a custom worker name, change the `WORKER_NAME` variable. Make sure that all workers have unique names.
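```sh
# A minimal sketch; the image name is a placeholder and the env values
# follow the examples below. uuidgen gives each worker a unique name.
docker build -t flask-tasks-handler ./handler
docker run -d \
  -e WORKER_NAME="worker-$(uuidgen)" \
  -e DATABASE_URL="postgres://postgres:123@localhost:5432/flask_tasks_v2" \
  -e AMQP_URL="redis://localhost:6379" \
  flask-tasks-handler
```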
### Configuring environment variables
- API:

Please set the SQL database URL, the AMQP backend URL, and the number of Gunicorn workers and threads you want.
If you want to, you can restrict CORS to just your front-end site by setting `FRONTEND_URL` to the front-end's URL.
You also need to generate an RSA keypair to use authentication based on JWT tokens; the keys are passed in via the `JWT_PUBLIC_PEM` and `JWT_PRIVATE_PEM` variables below.
```dockerfile
ENV DATABASE_URL postgres://postgres:123@localhost:5432/flask_tasks_v2
ENV AMQP_URL redis://localhost:6379
ENV FRONTEND_URL *
ENV GUNICORN_WORKERS 3
ENV GUNICORN_THREADS 1
ENV JWT_PUBLIC_PEM "<Your public RSA key here>"
ENV JWT_PRIVATE_PEM "<Your private RSA key here>"
```
To generate the RSA keypair, you can run the following (make sure you have openssl installed). A minimal sketch using standard OpenSSL commands; the key size and file names are assumptions:
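```sh
# Generate a 2048-bit RSA private key, then extract the matching public key.
# The file names (jwt_private.pem / jwt_public.pem) are placeholders.
openssl genrsa -out jwt_private.pem 2048
openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem
```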
Then, set the environment variables with your keys.
- Handler:
Please set the SQL Database URL, the AMQP backend URL and the worker name you want. Make sure the worker name is unique.
```dockerfile
ENV DATABASE_URL postgres://postgres:123@localhost:5432/flask_tasks_v2
ENV AMQP_URL redis://localhost:6379
ENV WORKER_NAME worker
```
## API endpoints
### /task/new

Method: POST

This endpoint will register a new task in the server. You need to pass the required arguments inside an `application/json` body. The JSON should contain a string with the task type, named `type`, and the task-specific required arguments as a dictionary, named `args`:
Example of request (task of type "add"):

```sh
curl -X POST localhost:5000/task/new -H 'Content-Type: application/json' -d '{"type":"add", "args":{"x": 1, "y": 2}}'
```
Example of response:
```json
{
  "id": "5861c3a8-fa0f-4b84-9e54-04b545408114",
  "result": {},
  "args": {"x": 1, "y": 2},
  "status": "PENDING",
  "type": "add"
}
```
The API validates each request against the declared task functions and will return an error if (an illustrative response is sketched below):
- the task type passed doesn't match the name of any function declared in the `api/tasks.py` file;
- the task exists but you didn't pass a required argument of the declared function;
- the task exists and all arguments were passed, but the type of an argument doesn't match the type declared on the function;
- you pass an argument that the function does not expect.
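The exact error payload depends on the implementation, but as an illustration only (the field name and message below are assumptions, not the project's actual output), a rejected request might return something like:

```json
{
  "error": "task type 'foo' doesn't match any declared task"
}
```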
You can also use the example task `mov3d` for testing purposes, which will simulate the trajectory of a particle:

Parameters:

```json
{
  "type": "mov3d",
  "args": {
    "dt": 0.001,
    "mass": 1.0,
    "r0": [0.5, 0.5, 0.5],
    "v0": [10.0, 10.0, 10.0],
    "radius": 0.3,
    "drag": false
  }
}
```
Example of response:
```json
{
  "id": "5861c348-fa0f-4b84-9e54-04b545408114",
  "result": {[all results of simulation]},
  "args": {[args you passed]},
  "status": "SUCCESS",
  "type": "mov3d"
}
```
"result":
{
"r": [[x1,y1,z1], [x2,y2,z2], [xn,yn,zn]],
"v": [[x1,y1,z1], [x2,y2,z2], [xn,yn,zn]],
"a": [[x1,y1,z1], [x2,y2,z2], [xn,yn,zn]],
"alpha": [[x1,y1,z1], [x2,y2,z2], [xn,yn,zn]],
"w": [[x1,y1,z1], [x2,y2,z2], [xn,yn,zn]],
"t": [t1, t2, tn]
}
### /task/&lt;task_id&gt;/view

Method: GET

This endpoint will return the current data for the task with id `task_id` in `application/json` format:
Example of request:
Parameters: `<task_id>`
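A sketch of such a request, assuming the API runs on localhost:5000 as in the examples above:

```sh
curl localhost:5000/task/5861c3a8-fa0f-4b84-9e54-04b545408114/view
```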
Example of response:
```json
{
  "id": "5861c3a8-fa0f-4b84-9e54-04b545408114",
  "result": 3,
  "args": {"x": 1, "y": 2},
  "status": "SUCCESS",
  "type": "add"
}
```
The status of a retrieved task can be:
- `PENDING`: the task doesn't exist, OR it exists and has not yet been received by a worker.
- `STARTED`: the task has been received by a worker and is currently being computed.
- `SUCCESS`: the task has been received by a worker, has already been computed, and is available for viewing.
## The handler
The handler is a module of the system, based on Celery, that subscribes to the AMQP queue and waits for new tasks. When it receives a task, it executes the task and stores the result in the SQL database.
You can add as many tasks as you want by adding the `@app.task` decorator on top of a function in the `handler/tasks.py` file. Then, the function name becomes a new task type that can be called from the API. For example, if you want to add a task that adds two numbers x and y, you could write something like the following (a minimal sketch; the Celery app setup is an assumption based on the configuration described above):
It is required to explicitly declare the function arguments with annotations so that the API can process them correctly.
The handler will mark the current task row as locked in the database, so that other handler instances can't edit it at the same time.
⚠️ In order for all tasks processed by the API to be able to run on all handler instances, both the API and Handler `tasks.py` files must be EXACTLY the same. If you want some handler instances to run only specific tasks, see the Customizing handler instances for specific tasks section.
## Customizing handler instances for specific tasks
A handler instance will only run the tasks that have been declared in its `handler/tasks.py` file.
If you want some tasks to run only on specific handler instances, copy the source `handler` directory into directories with different names, one for each handler instance type you want, as sketched below:
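```sh
# Hypothetical directory names; one copy of the handler sources per instance type.
cp -r handler handler-type1
cp -r handler handler-type2
```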
Then, you can edit each `handler-typeX/tasks.py` file and add the specific tasks for each handler.
After that, you can customize your `docker-compose.yml` file to scale your different handler instance types:
version: "3"
services:
api:
build: ./api
ports:
- "8000:8080"
network_mode: "host"
environment:
DATABASE_URL: "postgres://postgres:123@localhost:5432/flask_tasks"
handler-type1:
build: ./handler-type1
network_mode: "host"
environment:
WORKER_NAME: "worker2"
AMQP_URL: "redis://localhost:6379"
DATABASE_URL: "postgres://postgres:123@localhost:5432/flask_tasks"
handler-type2:
build: ./handler-type2
network_mode: "host"
environment:
WORKER_NAME: "worker2"
AMQP_URL: "redis://localhost:6379"
DATABASE_URL: "postgres://postgres:123@localhost:5432/flask_tasks"
To deploy, first run the `docker compose build` command to build the images, then run the `docker compose up` command to start the containers:
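```sh
docker compose build
docker compose up
```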