Pitfalls - Concurrency issues

Summary

  • Concurrency Issues:

    • Common in parallel computations when multiple threads modify the same source or element.

  • Concurrent File Access:

    • Issues arise when multiple processes try to write to or read from the same file simultaneously.

    • Failed jobs due to simultaneous writes.

    • Missing or corrupt data from conflicting writes.

    • Solution: Ensure parallelized code operates independently.

    • Use index-specific output files and a collection script.

    • Example Slurm script for job dependency and collection provided.

  • Concurrent Database Access:

    • Databases handled by a server generally manage multiple requests and avoid concurrency issues.

    • However, server bottlenecks can occur, leading to decreased performance.

Probably the most common issues when running embarrassingly, or in general parallel computations, arises due to concurrency issues, where multiple threads try to modify the same source or the same element. Here, we will give some examples of issues that can arise

Concurrent File Access

Let’s take the following piece of code:

import pickle
from processing import load_and_preprocess_data, calculate

data = load_and_preprocess_data()
res = []
for i in range(len(data)):
    res.append(calculate(data[i]))

with open("results", "wb") as f:
    pickle.dump(res, "results")

which we have identified as embarrassingly parallel code and converted to the following:

import pickle
import sys
from processing import load_and_preprocess_data, calculate


def append_result(result, index):
    """
    Add a result to the result file
    """
    result_file = "results.pkl"
    try:
        with open(result_file, "rb") as f:
            allres = pickle.load(f)
    except FileNotFoundError:
        # The file does not yet exist
        allres = {}
    allres[index] = result
    with open(result_file, "wb") as f:
        pickle.dump(allres, f)


# Assuming load_and_preprocess_data and calculate are functions from processing library

# Load and preprocess data
data = load_and_preprocess_data()

# Get index from command line argument (assuming this Python script is called with the index as an argument)
index = int(sys.argv[1])

# Calculate result
res = calculate(data[index])

# Append result to the result file
append_result(res, index)

Our append_result function either creates the results file, if it does not exist yet, or, if it does, it reads in the result array. We run this function several times via an sarray job. This can work fine. However, it can also lead to errors, and worst case invalid results. The problem arises because we don’t know the point in time at which the result file is opened and written to. Likely some jobs will fail with an error thrown, when two processes try to simultaneously write to the same file, or that some try to read from the file while it is written to. More inconvenient could be suspiciously missing results in the results file, due to two reads of the file, which leads to conflicting writes, where one result is simply lost. Or, we could end up with an entirely corrupt file, when two jobs write simultaneously and the file system doesn’t complain about it…

So, an important thing to keep in mind is to have the parallelized code really be independent of other instances. In most cases this means, using index specific output files and having a “collection script” that combines the data after the jobs are complete. This can even be done with slurm internal methods if you use an array job:

#!/bin/bash
#SBATCH --job-name=collection_job
#SBATCH --dependency=afterok:your_array_job_id

# run your collection script
python collection.py

If you use a submission script you would need to list all jobs in the dependency part, at which point it likely is easier to just wait for completion and then manually do the collection. The above case would then become something like:

Calculation script:

import pickle
import sys
from processing import load_and_preprocess_data, calculate

# Assuming load_and_preprocess_data and calculate are functions from processing library

# Load and preprocess data
data = load_and_preprocess_data()

# Get index from command line argument (assuming this Python script is called with the index as an argument)
index = int(sys.argv[1])
result_folder = sys.argv[2]

# Calculate result
res = calculate(data[index])

# Write the result
with open(f"({result_folder}/result_{index})", "wb") as f:
    pickle.dump(res, f)

Submission:

#!/bin/bash
#SBATCH --job-name=long_job
#SBATCH --array=1-10
#SBATCH --time=01:00:00
#SBATCH --mem=1G
#SBATCH --cpus-per-task=1

# We assume, that singularity runs out of the box on your cluster, if not, you will have to
# add a command here that makes the singularity command available on your cluster

# Define a Result directory, since this should not ollide with others we use the slurm job ID
# To distinguish between different runs
RESULTDIR="results_$SLURM_JOB_ID"

# create the result dir if it does not exist
mkdir -p $RESULTDIR

# Example command: replace with your actual command
singularity exec python_container python train_and_plot.py $SLURM_ARRAY_TASK_ID $RESULTDIR

Collection script:

import os
import pickle
import sys

folder_path = sys.argv[1]

data_array = []
for filename in os.listdir(folder_path):
    if filename.startswith("result_"):
        index = int(filename.split("_")[1].split(".")[0])
        file_path = os.path.join(folder_path, filename)
        with open(file_path, "rb") as f:
            data_array.insert(index, pickle.load(f))

with open(file_path, "wb") as f:
    pickle.dump(data_array, f)

Submission for collection:

#!/bin/bash
#SBATCH --job-name=collection_job
#SBATCH --dependency=afterok:your_array_job_id

RESULTDIR="results_your_array_job_id"
# run your collection script
python collection.py RESULTDIR

While the above example might be a bit artificial, we once had a user, who had a more complex workflow, which needed to call scripts from several languages, and used files to communicate between the different runs. However, these temporary files were the same for all runs, which lead to results that were completely unreliable. The user was lucky that some of the runs actually failed with a very strange error and they had to re-run all of their runs as none were reliable. And these things can even be hidden within some libraries, which expect to only be used in one process at a time and e.g. write some temporary files to home folders.

Concurrent database access

Often using a database handled by a server protects against the problems described above, since the database server is developed to processes multiple different requests simultaneously and avoid concurrent modifications of entries in the database. However, this only applies to databases which are handled by an explicit server running in its own process. However, using such a server can also lead to a bottleneck, where jobs block execution because they are waiting for the server to handle their request. While commonly less problematic, this can potentially lead to an unexpected
decrease in performance.