Multinode - Rapidly build distributed cloud applications in Python | Product Hunt

Core concepts

Jobs

Jobs are used to execute extended distributed workflows. They are similar to functions in that they are executed on dynamically provisioned hardware, and can run in parallel.

However, jobs provide some additional functionality.

  • Jobs provide support for progress monitoring and cancellation while the execution is in flight.

  • Job statuses and job results can be queried at any time, from anywhere in the codebase.

  • Jobs can hold locks, preventing certain kinds of race conditions.

import multinode as mn

@mn.job()
def process_data(input_data):
    output_data = perform_extended_computation(input_data)
    yield output_data

Yield, not return - exposing intermediate results

Why does a job yield rather than return? If the job takes several minutes to run, you may want to expose intermediate results before the job is finished.

For example, to produce a progress bar, your job can yield a progress percentage, which is a value between 0% and 100% that is increased upon reaching certain milestones. This mechanism would be impossible to implement with a simple return statement.

The example at the end of this section demonstrates this idea.

Methods on the job object

.start

This creates a new job, and returns an auto-generated UUID for this job.

job_id = process_data.start(input_data)

.get

This returns a JobInfo object (or raises a JobDoesNotExist error if there exists no job with the ID provided).

job_info = process_data.get(job_id)

The JobInfo object has three important fields:

  1. .status - an enum, whose possible values are "PENDING", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLING", "CANCELLED".
  2. .result - the value most recently yielded by the job.
  3. .error - an error message if .status is "FAILED", or None, otherwise.

.cancel

This cancels a job (or raises a JobDoesNotExist error if there exists no job with the ID provided).

process_data.cancel(job_id)

.remove

This removes data about a job from the system. If the job is still "RUNNING" or "PENDING", it will be cancelled first.

process_data.remove(job_id)

There is no expiry time for job data. The data is persisted until .remove is called.

.list_ids

This lists the IDs of existing jobs in the system.

ids_of_existing_jobs = process_data.list_ids()

Implementing an API for managing job lifecycles

Let's expose our job to our users as an API. The API will be implemented as a service, which calls the various methods on the job object from within.

import multinode as mn

from fastapi import FastAPI

app = FastAPI()

@mn.job()
def process_data(input_data):
    output_data = perform_extended_computation(input_data)
    yield output_data

@app.post("/jobs")
def start_job(input_data):
    job_id = process_data.start(input_data)
    return job_id

@app.get("/jobs/{job_id}")
def get_job_info(job_id):
    return process_data.get(job_id).result

@app.put("/jobs/{job_id}/cancel")
def cancel_job(job_id):
    process_data.cancel(job_id)

@mn.service(port=80)
def main():
    app.run()

Gracefully handling cancellations

Sometimes, we may want to perform some cleanup when a job is cancelled. When the cancellation request is propagated to the job runtime, a JobCancelled error is raised. We can handle this error just like any another error using try-except-finally blocks.

import multinode as mn
from multinode.errors import JobCancelled

@mn.job()
def process_data(input_data):
    try:
        output_data = perform_extended_computation(input_data)
        yield output_data
    except JobCancelled:
        # e.g. log the cancellation event
    finally:
        # Clean up

Any cleanup action should finish within 5 minutes of the JobCancelled error, after which the job will be forcibly killed.

Locking: handling race conditions

Sometimes, we need to ensure that certain jobs do not run in parallel with one another, as this would lead to race conditions.

Suppose we have a job that performs maintenance on database tables. It is essential that we never have two concurrent job executions modifying the same table, since that may leave the table in an inconsistent state.

This issue can be resolved by locking on the table ID.

@app.post("/jobs")
def start_job(job_definition):
    try:
        process_data.start(
            input_data,
            lock=job_definition.table_id
        )
    except LockAlreadyInUse:
        # Exception raised if another job is locking on the same table_id
        return Response(
            status_code=400,
            content="Conflict with another job"
        )

Distributing a job workload using functions

Jobs can be optimised by parallelising the workload over multiple workers. This can be achieved by invoking functions from within the job.

Suppose we are building an app that calculates the best next n moves in a chess game.

We calculate one move at the time, based on previous moves that have already been calculated. At each step, five competing strategies are used to suggest candidate moves. Then, out of the five candidates, the one with the highest evaluation is selected.

The workflow is as follows:

  1. Data preprocessing. This requires a medium amount of compute and a medium amount of memory. It cannot be parallelised.
  2. Candidate generation. The five strategies can run in parallel with one another. Each requires heavy compute, but only a modest amount of memory.
  3. Ranking based on evaluations. There is no need to parallelise this because this is quick to begin with.

Steps 2 and 3 are repeated n times.

Chess example (light)

This calculation is extremely easy to implement in multinode!

STRATEGIES = [
    SimpleStrategy(),
    GreedyStrategy(),
    CarlsenStrategy(),
    MittensStrategy(),
    KasparovStrategy(),
]

@mn.job(cpu=0.1, memory="1GiB")
def calculate_next_n_moves(board_data, n):
    preprocessed_board_data = preprocess_data(board_data)

    chosen_moves = []
    for i in range(n):
        arguments = [
            (preprocessed_board_data, chosen_moves, s)
            for s in STRATEGIES
        ]
        candidates_for_next_move = suggest_candidate_move.map(arguments)

        best_next_move = choose_best_move(candidates_for_next_move)

        chosen_moves.append(best_next_move)
        yield chosen_moves

@mn.function(cpu=4, memory="32GiB")
def preprocess_data(board_data):
    # Do some preprocessing
    return preprocessed_board_data

@mn.function(cpu=16, memory="4GiB")
def suggest_candidate_move(preprocessed_board_data, previous_moves, strategy):
    # Do some heavy computations depending on the strategy
    return suggested_move

# NB not a @mn.function(), since this is inexpensive
def choose_best_move(candidate_moves):
    return max(moves, key=lambda m: m.evaluation)


# Now you can wrap this job as an API

Incidentally, this example clearly demonstrates the benefits of using yield rather than return. If we call .get(job_id) while the job is in progress, the .result field will contain the moves that have been generated so far. And of course, if we cancel the job midway, we retain access to the moves that have already been generated.

Previous
Services