Pitfalls - Concurrency issues
Summary
- Concurrency Issues:
  - Common in parallel computations when multiple processes modify the same resource or element.
- Concurrent File Access:
  - Issues arise when multiple processes try to write to or read from the same file simultaneously.
  - Failed jobs due to simultaneous writes.
  - Missing or corrupt data from conflicting writes.
  - Solution: ensure parallelized code operates independently.
  - Use index-specific output files and a collection script.
  - Example Slurm scripts for job dependencies and collection are provided.
- Concurrent Database Access:
  - Databases handled by a server generally manage multiple requests and avoid concurrency issues.
  - However, server bottlenecks can occur, leading to decreased performance.
Probably the most common issues when running embarrassingly parallel, or in general parallel, computations arise from concurrency: multiple processes or threads trying to modify the same resource or the same element at the same time. Here, we give some examples of issues that can arise.
Concurrent File Access
Let’s take the following piece of code:
import pickle
from processing import load_and_preprocess_data, calculate

# Load and preprocess data
data = load_and_preprocess_data()
# Calculate a result for each element of data
res = []
for i in range(len(data)):
    res.append(calculate(data[i]))
# Save the results
with open("results.pkl", "wb") as f:
    pickle.dump(res, f)
library(processing)

# Load and preprocess data
data <- load_and_preprocess_data()
# Initialize an empty list to store results
res <- list()
# Loop through each element in data and calculate results
for (i in seq_along(data)) {
  res[[i]] <- calculate(data[[i]])
}
# Save the results to an RDS file
saveRDS(res, file = "results.rds")
which we have identified as embarrassingly parallel code and converted to the following:
import pickle
import sys
from processing import load_and_preprocess_data, calculate

def append_result(result, index):
    """
    Add a result to the result file
    """
    result_file = "results.pkl"
    try:
        with open(result_file, "rb") as f:
            allres = pickle.load(f)
    except FileNotFoundError:
        # The file does not yet exist
        allres = {}
    allres[index] = result
    with open(result_file, "wb") as f:
        pickle.dump(allres, f)
# Assuming load_and_preprocess_data and calculate are functions from processing library
# Load and preprocess data
data = load_and_preprocess_data()
# Get index from command line argument (assuming this Python script is called with the index as an argument)
index = int(sys.argv[1])
# Calculate result
res = calculate(data[index])
# Append result to the result file
append_result(res, index)
library(processing)

append_result <- function(result, index) {
  result_file <- "results.rds"
  if (file.exists(result_file)) {
    allres <- readRDS(result_file)
  } else {
    allres <- list()
  }
  allres[[as.character(index)]] <- result
  saveRDS(allres, file = result_file)
}
# Assuming load_and_preprocess_data and calculate are functions from processing library
# Load and preprocess data
data <- load_and_preprocess_data()
# Get index from command line argument (assuming this R script is called with the index as an argument)
args <- commandArgs(trailingOnly = TRUE)
index <- as.integer(args[1])
# Calculate result
res <- calculate(data[[index]])
# Append result to the result file
append_result(res, index)
Our append_result function either creates the results file, if it does not exist yet, or reads in the existing results and adds the new one. We run this script several times via a Slurm array job.
This can work fine. However, it can also lead to errors and, in the worst case, invalid results.
The problem is that we do not control the point in time at which the result file is opened
and written to. Some jobs will likely fail with an error when two processes try to write to
the same file simultaneously, or when one tries to read the file while another is writing to it.
More insidious are silently missing results: two processes read the file at the same time, each
adds its own result, and whichever writes back first has its result overwritten and simply lost.
Or we could end up with an entirely corrupt file, when two jobs write simultaneously and the
file system doesn’t complain about it…
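This lost-update behaviour can be reproduced on a single machine. The following sketch runs the same deliberately unsafe read-modify-write pattern as append_result above in several local processes via Python's multiprocessing; squaring numbers stands in for calculate, and the file name and worker count are arbitrary:
import pickle
from multiprocessing import Pool

RESULT_FILE = "results.pkl"

def append_result(index):
    # The same unsafe read-modify-write as append_result above
    try:
        with open(RESULT_FILE, "rb") as f:
            allres = pickle.load(f)
    except (FileNotFoundError, EOFError, pickle.UnpicklingError):
        # File missing, or read while another process was mid-write
        allres = {}
    allres[index] = index ** 2  # stand-in for calculate(data[index])
    with open(RESULT_FILE, "wb") as f:
        pickle.dump(allres, f)

if __name__ == "__main__":
    with Pool(8) as pool:
        pool.map(append_result, range(100))
    with open(RESULT_FILE, "rb") as f:
        allres = pickle.load(f)
    # Typically well below 100: two processes read the same state, both
    # write back, and one of the two results is silently lost
    print(f"{len(allres)} of 100 results survived")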
So, an important thing to keep in mind is that the parallelized code must really be independent of other instances. In most cases this means using index-specific output files and a “collection script” that combines the data after the jobs are complete. This can even be done with Slurm-internal methods if you use an array job:
#!/bin/bash
#SBATCH --job-name=collection_job
#SBATCH --dependency=afterok:your_array_job_id
# run your collection script
python collection.py
If you use a submission script instead of an array job, you would need to list all jobs in the dependency part, at which point it is likely easier to just wait for completion and then run the collection manually. The above case would then become something like:
Calculation script:
import pickle
import sys
from processing import load_and_preprocess_data, calculate
# Assuming load_and_preprocess_data and calculate are functions from processing library
# Load and preprocess data
data = load_and_preprocess_data()
# Get index from command line argument (assuming this Python script is called with the index as an argument)
index = int(sys.argv[1])
result_folder = sys.argv[2]
# Calculate result
res = calculate(data[index])
# Write the result
with open(f"({result_folder}/result_{index})", "wb") as f:
pickle.dump(res, f)
library(processing)
# Assuming load_and_preprocess_data and calculate are functions from processing library
# Load and preprocess data
data <- load_and_preprocess_data()
# Get index from command line argument (assuming this R script is called with the index as an argument)
args <- commandArgs(trailingOnly = TRUE)
index <- as.integer(args[1])
folder <- args[2]
# Calculate result
res <- calculate(data[[index]])
# Write the result to an index-specific file
saveRDS(res, file = sprintf("%s/result_%i.rds", folder, index))
Submission:
#!/bin/bash
#SBATCH --job-name=long_job
#SBATCH --array=1-10
#SBATCH --time=01:00:00
#SBATCH --mem=1G
#SBATCH --cpus-per-task=1
# We assume that singularity runs out of the box on your cluster; if not, you will have to
# add a command here that makes the singularity command available on your cluster
# Define a result directory; since this should not collide with others, we use the Slurm
# array job ID to distinguish between different runs ($SLURM_JOB_ID would differ for every
# array task, while $SLURM_ARRAY_JOB_ID is shared by all tasks of the array)
RESULTDIR="results_$SLURM_ARRAY_JOB_ID"
# create the result dir if it does not exist
mkdir -p $RESULTDIR
# Example command: replace with your actual command
singularity exec python_container python calculation.py $SLURM_ARRAY_TASK_ID $RESULTDIR
#!/bin/bash
#SBATCH --job-name=long_job
#SBATCH --array=1-10
#SBATCH --time=01:00:00
#SBATCH --mem=1G
#SBATCH --cpus-per-task=1
# We assume that Rscript runs out of the box on your cluster; if not, you will have to
# add a command here that makes the Rscript command available on your cluster
# Define a result directory; since this should not collide with others, we use the Slurm
# array job ID to distinguish between different runs
RESULTDIR="results_$SLURM_ARRAY_JOB_ID"
# create the result dir if it does not exist
mkdir -p $RESULTDIR
# Example command: replace with your actual command
Rscript calculation.r $SLURM_ARRAY_TASK_ID $RESULTDIR
Collection script:
import os
import pickle
import sys
folder_path = sys.argv[1]
# Collect results keyed by index, since os.listdir returns files in arbitrary order
results = {}
for filename in os.listdir(folder_path):
    if filename.startswith("result_"):
        index = int(filename.split("_")[1].split(".")[0])
        file_path = os.path.join(folder_path, filename)
        with open(file_path, "rb") as f:
            results[index] = pickle.load(f)
# Order the results by index and write them to a combined results file
data_array = [results[i] for i in sorted(results)]
with open(os.path.join(folder_path, "results.pkl"), "wb") as f:
    pickle.dump(data_array, f)
args <- commandArgs(trailingOnly = TRUE)
folder_path <- args[1]
data_array <- list()
files <- list.files(folder_path)
for (filename in files) {
  if (grepl("^result_[0-9]+\\.rds$", filename)) {
    index <- as.integer(sub("^result_([0-9]+)\\.rds$", "\\1", filename))
    file_path <- file.path(folder_path, filename)
    data_array[[index]] <- readRDS(file_path)
  }
}
# Write the data array to a result file
result_file_path <- file.path(folder_path, "result.rds")
saveRDS(data_array, result_file_path)
Submission for collection:
#!/bin/bash
#SBATCH --job-name=collection_job
#SBATCH --dependency=afterok:your_array_job_id
RESULTDIR="results_your_array_job_id"
# run your collection script
python collection.py $RESULTDIR
#!/bin/bash
#SBATCH --job-name=collection_job
#SBATCH --dependency=afterok:your_array_job_id
RESULTDIR="results_your_array_job_id"
# run your collection script
Rscript collection.r $RESULTDIR
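Instead of filling in your_array_job_id by hand, the array job ID can be captured at submission time with sbatch's --parsable flag, which prints only the job ID. A small sketch, assuming the submission scripts above are saved as calculation.sbatch and collection.sbatch (hypothetical names):
#!/bin/bash
# Submit the array job and capture its job ID (--parsable prints only the ID)
ARRAY_JOB_ID=$(sbatch --parsable calculation.sbatch)
# Submit the collection job so that it only starts once all array tasks
# have finished successfully; this overrides the dependency in the script
sbatch --dependency=afterok:${ARRAY_JOB_ID} collection.sbatch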
While the above example might be a bit artificial, we once had a user with a more complex workflow, which needed to call scripts from several languages and used files to communicate between the different runs. However, these temporary files were the same for all runs, which led to completely unreliable results. The user was lucky that some of the runs actually failed with a very strange error; since none of the results could be trusted, they had to redo all of their runs. These things can even be hidden within some libraries, which expect to be used by only one process at a time and e.g. write temporary files to the home folder.
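If a workflow does need scratch files to pass data between steps, each run should get its own scratch location. A minimal sketch using Python's standard tempfile module (the file name inside is made up for illustration):
import os
import tempfile

# Create a scratch directory unique to this process, so parallel runs never
# share temporary files; it is removed automatically when the block exits
with tempfile.TemporaryDirectory(prefix="run_") as tmpdir:
    scratch_file = os.path.join(tmpdir, "intermediate.dat")
    with open(scratch_file, "w") as f:
        f.write("intermediate data belonging to this run only\n")
    # ... hand scratch_file to the other scripts of this run ...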
Concurrent Database Access
Using a database handled by a server often protects against the problems described above, since a database
server is developed to process multiple requests simultaneously and to avoid concurrent modifications
of entries in the database. However, this only applies to databases which are handled by an explicit server running
in its own process.
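File-based databases such as SQLite, by contrast, are not protected by a server process, and many jobs writing concurrently can fail with “database is locked” errors. A minimal sketch (the table layout and file name are made up for illustration) that at least tolerates short lock contention by wrapping each write in its own transaction with a generous busy timeout:
import sqlite3
import sys

index = int(sys.argv[1])

# timeout makes the connection wait up to 30 s for a lock held by another
# job instead of failing immediately with "database is locked"
con = sqlite3.connect("results.db", timeout=30)
with con:  # runs the statements in one transaction, committed on success
    con.execute(
        "CREATE TABLE IF NOT EXISTS results (idx INTEGER PRIMARY KEY, value TEXT)"
    )
    con.execute(
        "INSERT OR REPLACE INTO results (idx, value) VALUES (?, ?)",
        (index, "result for this index"),
    )
con.close()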
At the same time, such a server can become a bottleneck, where jobs block execution because they are waiting for
the server to handle their requests. While commonly less problematic than corrupted data, this can lead to an
unexpected decrease in performance.
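If many jobs contact the server at once, e.g. right after an array job starts, it can help to retry failed requests with exponential backoff plus a random jitter, so that the requests spread out over time. A small sketch; the request callable and the exception type are placeholders for whatever your database client provides:
import random
import time

def with_backoff(request, max_tries=5):
    """Retry a server request with exponential backoff plus jitter."""
    for attempt in range(max_tries):
        try:
            return request()
        except ConnectionError:
            # Wait 1, 2, 4, ... seconds plus a random offset, so that jobs
            # launched together do not retry in lockstep
            time.sleep(2 ** attempt + random.random())
    raise RuntimeError("server did not respond after several attempts")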