Scheduled tasks
Scheduled tasks are pieces of code that run periodically. They often play a supporting role in web app backends, for example by running batch analytics.
Conceptually, a scheduled task is similar to a cron job, with one crucial difference: the compute resources are provisioned dynamically, only for the duration of each task execution.
Basic example
Let's define a scheduled task that runs once every day at 11pm, using a worker with 16 CPUs that is spun up on demand.
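```python
import multinode as mn
from datetime import datetime, timedelta

# Runs at 23:00 every day: reference_time plus any whole number of days
@mn.scheduled_task(
    reference_time=datetime(2023, 9, 8, 23, 0),
    interval=timedelta(days=1),
    cpu=16,  # the 16-CPU worker is provisioned only while the task runs
)
def run_daily():
    # Run the daily task here
    ...
```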
Scheduling options
Reference time and interval
In our basic example, the schedule was specified using the `reference_time` and `interval` keywords. The task runs whenever the current time equals `reference_time` plus an integer multiple of `interval`.
If you need to specify the timezone explicitly, you can do so by passing a `tzinfo` when defining the `reference_time`.
The minimum possible `interval` is 15 minutes.
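To make the "reference time plus an integer multiple of the interval" rule concrete, here is a minimal sketch of how the next run time can be computed from those two values. It is an illustration of the rule, not multinode's scheduler; the helper name `next_run_time` is hypothetical, and the 15-minute check simply mirrors the documented minimum. The timezone is set with `tzinfo=timezone.utc`; a named zone such as `zoneinfo.ZoneInfo("Europe/London")` works the same way.

```python
from datetime import datetime, timedelta, timezone


def next_run_time(reference_time: datetime, interval: timedelta, now: datetime) -> datetime:
    """Hypothetical helper: earliest time >= now of the form reference_time + k * interval."""
    if interval < timedelta(minutes=15):
        raise ValueError("interval must be at least 15 minutes")
    # Ceiling division on timedeltas: how many whole intervals are needed to reach `now`.
    # k may be negative, since run times before reference_time also fit the rule.
    k = -((reference_time - now) // interval)
    return reference_time + k * interval


# Timezone-aware reference time, passed via tzinfo
reference = datetime(2023, 9, 8, 23, 0, tzinfo=timezone.utc)
now = datetime(2023, 9, 10, 9, 30, tzinfo=timezone.utc)
print(next_run_time(reference, timedelta(days=1), now))  # 2023-09-10 23:00:00+00:00
```

If `now` falls exactly on a run time, the function returns `now` itself, matching the "whenever the current time equals" wording.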
Cron format
For finer-grained control over the schedule, you can use cron syntax instead.
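```python
import multinode as mn

# Cron fields: minute hour day-of-month month day-of-week -> 23:00, Monday to Friday
@mn.scheduled_task(cron="0 23 * * 1-5")
def run_daily():
    # Some daily job that only runs on weekdays (Monday - Friday)
    ...
```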
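To show how the five cron fields select run times, here is a minimal sketch that checks whether a timestamp matches a cron expression. It follows standard cron semantics rather than any documented multinode internals; it supports `*`, single values, ranges, comma lists and `/` steps, and omits names such as `MON` or `JAN`. The helper names `parse_field` and `cron_matches` are hypothetical.

```python
from datetime import datetime

# Allowed (min, max) for minute, hour, day of month, month, day of week (0 = Sunday)
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def parse_field(field: str, lo: int, hi: int) -> set[int]:
    """Expand one cron field such as '*', '5', '1-5', '*/15' or '1,3,5' into its allowed values."""
    values: set[int] = set()
    for part in field.split(","):
        step = 1
        has_step = "/" in part
        if has_step:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(part)
            end = hi if has_step else start  # '5/15' means 5, 20, 35, ... up to hi
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expr: str, t: datetime) -> bool:
    """Return True if t falls on a minute selected by the 5-field cron expression."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day-of-month month day-of-week")
    minute, hour, dom, month, dow = (
        parse_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)
    )
    dow = {d % 7 for d in dow}  # accept 7 as an alias for Sunday, as many crons do
    if t.minute not in minute or t.hour not in hour or t.month not in month:
        return False
    cron_weekday = (t.weekday() + 1) % 7  # Python: Monday = 0; cron: Sunday = 0
    dom_ok = t.day in dom
    dow_ok = cron_weekday in dow
    # Classic cron rule: if both day fields are restricted, matching either one is enough
    if not fields[2].startswith("*") and not fields[4].startswith("*"):
        return dom_ok or dow_ok
    return dom_ok and dow_ok


print(cron_matches("0 23 * * 1-5", datetime(2023, 9, 8, 23, 0)))  # Friday 23:00 -> True
print(cron_matches("0 23 * * 1-5", datetime(2023, 9, 9, 23, 0)))  # Saturday 23:00 -> False
```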
Scheduled tasks in a wider context
A service exposing the results of a scheduled task
In this example, a scheduled task summarises the previous day's tweets once a day and stores the results in a multinode dict: a key-value store that is accessible from any process in the application. When the service receives a GET request, it loads the relevant summary from this dict.
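```python
import multinode as mn
from datetime import datetime, timedelta
from fastapi import FastAPI
import uvicorn

# A multinode dict: a key-value store shared by every process in the application
summaries_dict = mn.get_dict(name="tweet_summaries")

@mn.scheduled_task(
    reference_time=datetime(2023, 9, 8, 23, 0),
    interval=timedelta(days=1),
)
def extract_summaries_from_last_day():
    # scrape_tweets() and summarise() are placeholders for your own logic
    tweets = scrape_tweets()
    for tweet in tweets:
        summary = summarise(tweet)
        summaries_dict[summary.subject] = summary.content

app = FastAPI()

@app.get("/tweet_summaries")
def get_summary_of_most_recent_tweet(subject: str):
    # Read the summary written by the scheduled task for this subject
    return summaries_dict[subject]

@mn.service(port=80)
def api():
    uvicorn.run(app, host="0.0.0.0", port=80)
```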
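The only coupling between the scheduled task and the service is the shared key-value store. Here is a minimal local sketch of that writer/reader pattern, assuming a plain Python dict in place of the multinode dict. `Summary`, `summarise`, `extract_summaries` and `get_summary` are hypothetical stand-ins, since the real scraping and summarising logic is not part of this section, and the 404 branch is an addition the example above does not have.

```python
from dataclasses import dataclass


@dataclass
class Summary:
    subject: str
    content: str


def summarise(tweet: str) -> Summary:
    # Hypothetical summariser: first word (lowercased) is the subject, keep up to 40 chars
    subject, _, _ = tweet.partition(" ")
    return Summary(subject=subject.lower(), content=tweet[:40])


def extract_summaries(tweets: list[str], store: dict[str, str]) -> None:
    # Writer side (the scheduled task): later tweets on a subject overwrite earlier ones
    for tweet in tweets:
        summary = summarise(tweet)
        store[summary.subject] = summary.content


def get_summary(subject: str, store: dict[str, str]) -> tuple[int, str]:
    # Reader side (the service): return an HTTP-style status code and body
    if subject not in store:
        return 404, f"no summary for {subject!r}"
    return 200, store[subject]


store: dict[str, str] = {}  # plain dict standing in for mn.get_dict(...)
extract_summaries(["Python 3.12 is out", "Rust 1.72 released", "Python packaging news"], store)
print(get_summary("python", store))  # (200, 'Python packaging news')
```

Because the service only reads from the store, it keeps answering requests with the last completed run's results while the next scheduled run is in progress.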
A scheduled task that distributes work across function calls
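In the jobs section, we explained how a job can distribute its work across function calls. A scheduled task can do the same: in the example below, the scheduled task itself requests only 0.1 CPU and hands the heavy per-country work to `do_heavy_calculation`, which is declared with 16 CPUs.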
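```python
import multinode as mn
from datetime import datetime, timedelta

COUNTRY_NAMES = ["UK", "USA", "China", "India", ... ]

@mn.function(cpu=16)
def do_heavy_calculation(country_name):
    # Perform the CPU-intensive calculation for one country and return its result
    ...

# The scheduled task only coordinates the work, so it needs very little CPU
@mn.scheduled_task(
    reference_time=datetime(2023, 9, 8, 23, 0),
    interval=timedelta(days=1),
    cpu=0.1,
)
def run_daily():
    # Fan out: one do_heavy_calculation call per country
    results_per_country = do_heavy_calculation.map(COUNTRY_NAMES)
    # save_results is a placeholder for your own persistence logic
    save_results(results_per_country)
```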
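The fan-out/fan-in shape of this task can be tried locally with the standard library. In this sketch, `concurrent.futures.ThreadPoolExecutor.map` stands in for `do_heavy_calculation.map`; the country list and the placeholder calculation are hypothetical. `Executor.map` returns results in input order, which is what makes zipping them back onto the country names safe; this section does not say whether multinode's `map` guarantees the same. For genuinely CPU-bound pure-Python work, `ProcessPoolExecutor` offers the same `map` interface with real parallelism.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the example's country list and per-country work
COUNTRY_NAMES = ["UK", "USA", "China", "India"]


def do_heavy_calculation(country_name: str) -> int:
    # Placeholder "heavy" work: sum of squares over a range sized by the name length
    return sum(i * i for i in range(len(country_name) * 1000))


def run_daily() -> dict[str, int]:
    # Fan out: one call per country, run concurrently by the pool
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(do_heavy_calculation, COUNTRY_NAMES))
    # Fan in: map preserves input order, so zip pairs each result with its country
    return dict(zip(COUNTRY_NAMES, results))


results_per_country = run_daily()
print(sorted(results_per_country))  # ['China', 'India', 'UK', 'USA']
```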