Multinode - Rapidly build distributed cloud applications in Python | Product Hunt

Core concepts

Scheduled tasks

Scheduled tasks are pieces of code that run periodically. They often play a supporting role in web app backends, providing batch analytical processing capabilities.

Conceptually, a scheduled task is similar to a cron job - but with the crucial distinction that the compute resources are provisioned dynamically for the duration of the task execution.

Basic example

Let's define a scheduled task that runs once every day at 11pm, using a worker with 16 CPUs that is spun up on demand.

import multinode as mn
from datetime import datetime, timedelta

@mn.scheduled_task(
    reference_time=datetime(2023, 9, 8, 23, 0),
    interval=timedelta(days=1),
    cpu=16
)
def run_daily():
    # run daily task

Scheduling options

Reference time and interval

In our basic example, the schedule was specified using the reference_time and interval keywords. The task will run whenever the current time is equal to reference_time plus an integer multiple of the interval.

If you need to specify the timezone explicitly, you can do that by passing in a tzinfo when defining the reference_time.

The minimum possible interval is 15 minutes.

Cron format

For a more granular control over the schedule, you can use cron syntax.

import multinode as mn

@mn.schedule(cron="0 23 * * 1-5")
def run_daily():
    # Some daily job that only runs on weekdays (Monday - Friday)

Scheduled tasks in a wider context

A service exposing the results of a scheduled task

The results of this scheduled task are stored in a multinode dict - a key-value store that is accessible from any process in the application. When the service receives a GET request, the relevant results are loaded from this dict.

import multinode as mn
from fastapi import API
import uvicorn

summaries_dict = mn.get_dict(name="tweet_summaries")

@mn.scheduled_task(period=timedelta(days=1))
def extract_summaries_from_last_day():
    tweets = scrape_tweets()
    for tweet in tweets:
        summary = summarise(tweet)
        summaries_dict[summaries.subject] = summary.content

app = FastAPI()

@app.get(/tweet_summaries)
def get_summary_of_most_recent_tweet(subject):
    return summaries_dict[subject]

@mn.service(port=80)
def api():
    unicorn.run(app, host="0.0.0.0", port=80)

A scheduled task that distributes work across function calls

In the jobs section, we explained how a job can distribute its work across function calls. A scheduled task can do the same.

COUNTRY_NAMES = ["UK", "USA", "China", "India", ... ]

@mn.function(cpu=16)
def do_heavy_calculation(country_name):
    # perform CPU-intensive calculation

@mn.scheduled_task(
    reference_time=datetime(2023, 9, 8, 23, 0),
    interval=timedelta(days=1)
    cpu=0.1
)
def run_daily():
    results_per_country = do_heavy_calculation.map(COUNTRY_NAMES)
    save_results(results_per_country)
Previous
Jobs