Parallelize using scripting

Even in an embarrassingly parallel situation, we have to make a couple of adaptations to our code before we can run several executions of it in parallel. In the previous episode, we converted a Jupyter notebook into a basic Python script that could be submitted with Slurm. This starting script can be found on GitHub.

Updating the code for parallel execution (and flexibility)

Working on a cluster allows us to run code simultaneously on multiple machines. However, to make use of this, we need to find a way to split our work into suitable pieces. If you look at lines 30-66 of our code, you will notice that it essentially runs the same code for all possible metrics and neighbor counts. All of these runs are completely independent of each other and could be run at the same time. We will start by running all metrics for each of the neighbor counts in parallel. To do so, we need to make the neighbor count a parameter of the script that we can pass in. Currently, our code has hard-coded parameters, which makes this difficult, so it would be good to convert it such that it lets us specify the parameters we want to use as arguments. There are many options for doing so, but we will use argparse in our example (some more details can be found here).

from pathlib import Path
import pickle
import argparse
import matplotlib.pyplot as plt

from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# ## Fit pipelines and plot decision boundaries
#
# Loop over the distance metrics for the given `n_neighbors`
#
# - Fit a standard scaler + knn classifier pipeline
# - Plot decision boundaries and save the image to disk

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument(
    "--n_neighbors",
    type=int,
    required=True,
    help="The number of neighbors to use for calculation.",
)
args = parser.parse_args()
n_neighbors = args.n_neighbors

# Load preprocessed data from disk
with open("data/preprocessed/Iris.pkl", "rb") as f:
    data = pickle.load(f)
    X, X_train, X_test, y, y_train, y_test, features, classes = data


# Parameters
# Metrics: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.distance_metrics.html#sklearn.metrics.pairwise.distance_metrics
metrics = ["euclidean", "manhattan", "l1", "haversine", "cosine"]

# Loop over the distance metrics
for metric in metrics:
    # Fit
    clf = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("knn", KNeighborsClassifier(n_neighbors=n_neighbors, metric=metric)),
        ]
    )
    clf.fit(X_train, y_train)

    # Plot
    disp = DecisionBoundaryDisplay.from_estimator(
        clf,
        X_test,
        response_method="predict",
        plot_method="pcolormesh",
        xlabel=features[0],
        ylabel=features[1],
        shading="auto",
        alpha=0.5,
    )
    scatter = disp.ax_.scatter(X.iloc[:, 0], X.iloc[:, 1], c=y, edgecolors="k")
    disp.ax_.legend(
        scatter.legend_elements()[0],
        classes,
        loc="lower left",
        title="Classes",
    )
    _ = disp.ax_.set_title(
        f"3-Class classification\n(k={n_neighbors!r}, metric={metric!r})"
    )
    plt.show()

    # Save image to disk
    Path("results/").mkdir(parents=True, exist_ok=True)
    plt.savefig(f"results/n_neighbors={n_neighbors}___metric={metric}.png")
    plt.close()
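Before wiring this into Slurm, you can test the updated script directly on the command line (assuming the preprocessed data file from the previous episode already exists; 5 is just an example value):

python train_and_plot.py --n_neighbors 5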

This file now receives one parameter called n_neighbors and uses it as the number of neighbors for which to run the different metrics. This also requires us to update the submission script so that the neighbor count can be passed in. There are two ways this can be done:

  • Slurm Array jobs

  • Custom Submission scripts

Slurm array jobs have the advantage of being an integral part of Slurm, so you can use features like a mail notification once the whole job is done. The difference between a “normal” job and an array job is that an array job runs the same code multiple times, with the only difference between the individual tasks being the value of the $SLURM_ARRAY_TASK_ID variable. The values for this variable are defined in the submission script using the --array parameter, which can take either ranges (--array=0-3, four tasks with ids 0, 1, 2 and 3) or individual numbers (--array=1,3,6, three tasks with ids 1, 3 and 6). However, you can only use integer values for this, so if you need any other input, you will either need to convert the numbers in your code, or keep hard-coded lists of values in your code, which makes it less flexible.

Using a custom submission script has the disadvantage that you need to write it yourself. In addition, since it submits individual jobs, you cannot make use of notifications for the whole set of jobs. However, it has the advantage that you are more flexible in what kind of values you want to pass to your jobs.

For this walk-through, we will use a custom submission script, but we also give an example of how to do it with a Slurm array job here.
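For reference, a minimal sketch of what such an array job could look like is shown below; the resource requests and the singularity call mirror the custom submission script used later in this walk-through, and the neighbor counts are mapped from the integer task id via a hard-coded bash array:

#!/bin/bash
#SBATCH --job-name=long_job
#SBATCH --time=01:00:00
#SBATCH --mem=1G
#SBATCH --cpus-per-task=1
#SBATCH --array=0-6

# Map the integer task id (0-6) to the neighbor count we actually want to use
NEIGHBORS=(1 2 4 8 16 32 64)
N_NEIGHBORS=${NEIGHBORS[$SLURM_ARRAY_TASK_ID]}

singularity exec python_container python train_and_plot.py --n_neighbors $N_NEIGHBORS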

Update the Slurm scripts

First, we need to separate the execution of the pre-processing from the actual calculations. This is to avoid having multiple jobs writing to the preprocessed data file at once and thus corrupting that file. The submission script for this is:

#!/bin/bash
#SBATCH --job-name=long_job
#SBATCH --time=00:30:00
#SBATCH --mem=500M
#SBATCH --cpus-per-task=1

# We assume that Singularity runs out of the box on your cluster; if not, you will have to
# add a command here that makes the singularity command available on your cluster

# Example command: replace with your actual command
singularity exec python_container python preprocess.py

This is just the first part of our previous submission script. Once this job has completed successfully, we want to run all the different neighbor options, and thus need a submission script that allows setting those:

#!/bin/bash
#SBATCH --job-name=long_job
#SBATCH --time=01:00:00
#SBATCH --mem=1G
#SBATCH --cpus-per-task=1

# We assume that Singularity runs out of the box on your cluster; if not, you will have to
# add a command here that makes the singularity command available on your cluster

singularity exec python_container python train_and_plot.py --n_neighbors $1

This script takes in one argument and passes it on to the train_and_plot script.
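Assuming you save this script as submission.sh (the name we will also use in the next step), you could already submit a single run by hand; if you want Slurm to make sure it only starts after the preprocessing job has finished successfully, you can additionally add a dependency (12345 is a placeholder for the job id that sbatch printed when you submitted the preprocessing job):

# Submit one training/plotting job with n_neighbors=5 (example value)
sbatch submission.sh 5

# Optional: only start once the preprocessing job (placeholder id 12345) has finished successfully
sbatch --dependency=afterok:12345 submission.sh 5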

Create a submission script

Finally, we need to submit our jobs. This can be done using whatever language you prefer; here is an example in Python:

import subprocess

neighbors = [1, 2, 4, 8, 16, 32, 64]

for i in neighbors:
    result = subprocess.run(["sbatch", "submission.sh", f"{i}"])

You can run this code with python3 submission.py, and it will submit the submission.sh file to the Slurm scheduler once for each of your neighbor options.
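If you want a bit more feedback from the submission, a slightly extended sketch (same file names as above; just one possible refinement) could check that each sbatch call succeeded and print the job id that sbatch reports:

import subprocess

neighbors = [1, 2, 4, 8, 16, 32, 64]

for i in neighbors:
    # check=True raises CalledProcessError if sbatch exits with a non-zero code
    result = subprocess.run(
        ["sbatch", "submission.sh", f"{i}"],
        check=True,
        capture_output=True,
        text=True,
    )
    # sbatch prints something like "Submitted batch job 12345"
    print(f"n_neighbors={i}: {result.stdout.strip()}")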

Post processing steps

While not relevant to our example, in many cases you will generate many output files that you need to combine again after the computation is done (e.g. to compute some overall statistics). In this situation, you need to think about how the result data from each run can be stored. A simple example of this could be (in pseudo-code):

data = load_preprocessed_data()
res = {}
for i in range(20):
    res[i] = compute(data[i])
calc_statistics(res)

If you parallelize this code, you will end up with something along the lines of:

import pickle

i = index_from_arguments()
data = load_preprocessed_data()
res = compute(data[i])
with open(f"outputs/result_{i}", "wb") as f:
    pickle.dump(res, f)

which saves the result of each run to its own file in the outputs folder. You can then use the following code to collect these results into one large file (or process them directly instead of saving them again):

import os
import pickle
import sys

folder_path = sys.argv[1]

# Collect the individual results, keyed by the index encoded in the file name
results = {}
for filename in os.listdir(folder_path):
    if filename.startswith("result_"):
        index = int(filename.split("_")[1].split(".")[0])
        file_path = os.path.join(folder_path, filename)
        with open(file_path, "rb") as f:
            results[index] = pickle.load(f)

# Store all results, ordered by index, in one new combined file
# (the name of the combined file is arbitrary; adjust it to your needs)
data_array = [results[index] for index in sorted(results)]
with open(os.path.join(folder_path, "combined_results.pkl"), "wb") as f:
    pickle.dump(data_array, f)
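Assuming you saved the collection code above as, for example, collect_results.py (the name is just a suggestion), you would call it with the folder that contains the result files:

python collect_results.py outputs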

Exercise 1

Parallel-1: Add the metrics as a parameter to the submission

Let’s assume we noticed that running all metrics still took too long for our purposes, and we also want to parallelize that part. You will need to update the train_and_plot.py script, the submission script, and the sbatch script.