###### Import packages/modules
import submitit
# memory profiler to evaluate how much memory your jobs demand
from memory_profiler import memory_usage
# import garbage collector: it is sometimes useful to trigger the garbage collector manually with gc.collect()
import gc
# import other standard modules (os is needed for os.getcwd() in the job-submission cell below)
import os
import time
Part 1: A fresh start on the CRNL cluster
Since we have a brand-new cluster, it is a good time to improve the way you use it if you have always felt that your workflow was not optimal. This tutorial will help you set up your personal environment if you use Python.
If you are working on the CRNL cluster, you can also find the corresponding notebook at this location: /crnldata/projets_communs/tutorials/
Conda environment
Install miniconda to manage your virtual environment
Installing miniconda is not incompatible with using venv later on.
Open a terminal and type the following commands:
cd ~
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
source ~/.bashrc
Follow the instructions and say yes to everything (you may press Ctrl+C once, followed by Enter, to skip the text faster).
NB: In JupyterLab, you can open a new terminal at ‘File->New->Terminal’. You can then bring this terminal just below your notebook by clicking on its tab and dragging it toward the lower part of the window.
Create your first conda environment
Still in the terminal, type:
conda create -n crnlenv python=3.9
conda activate crnlenv
The crnlenv virtual environment is now active! Everything installed from now on will only be accessible when crnlenv is activated.
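To double-check which environment you are in, you can look at which Python interpreter is picked up (the exact path below is only indicative and depends on where miniconda was installed):
which python   # should point to something like ~/miniconda3/envs/crnlenv/bin/python
python --version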
Make your conda environment visible to Jupyter lab
Still in the terminal, type:
conda install ipykernel
python -m ipykernel install --user --name crnlenv --display-name "Python (crnlenv)"
These commands allow your kernel to be accessed from JupyterLab, not only from the command line. If you create more conda environments / kernels, you will have to run these lines again for each of them.
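To verify that the kernel was registered, you can list the kernels known to Jupyter (assuming the jupyter command is available in your session):
jupyter kernelspec list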
Populate your conda environment / kernel with essential tools
Install a package that allows you to submit your jobs to Slurm easily from any Jupyter notebook
conda install -c conda-forge submitit
Install numpy
conda install numpy
Install memory_profiler
pip install memory_profiler -U
Later on you could install various other tools in your virtual environment, but the priority is to check that you can use the cluster and distribute your jobs.
NB: if you wonder why we install sometimes with conda and sometimes with pip, the answer is: you can almost always do it with pip, but when a package is available through conda, it is sometimes better integrated with the rest of the environment.
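If you are curious, you can check where each installed package came from; in the output of conda list, the Channel column distinguishes conda-forge and defaults packages from pip installs (typically shown as pypi):
conda list submitit
conda list memory_profiler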
Venv environment
If you wish instead to use a venv to manage your virtual environment, you can use the default Python interpreter of the cluster (currently, version 3.11). To do so, type the following commands:
python3 -m venv .localenv
source ./.localenv/bin/activate
pip install numpy submitit
pip install memory_profiler -U
If you also use conda, make sure that the (base) environment is deactivated (for instance with conda deactivate) before creating and activating the venv, so that the venv is built on the cluster's default Python rather than on conda's interpreter.
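As with conda, you may want to make this venv visible to JupyterLab. A possible sketch, reusing the ipykernel commands shown above (the kernel name localenv is just an example):
pip install ipykernel
python -m ipykernel install --user --name localenv --display-name "Python (localenv)"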
UV environment
UV is a new tool for environment management that combines, in principle, the best of conda (it ships Python distributions) and venv (fast, light, and 100% free). You may check UV here: https://github.com/astral-sh/uv. If you have questions, Samuel Garcia will be delighted to guide you ;).
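As an indication, and assuming uv is already installed, an equivalent setup to the venv section above might look like this (a sketch only, not tested on the cluster):
uv venv .localenv --python 3.11
source ./.localenv/bin/activate
uv pip install numpy submitit memory_profiler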
Let’s start computing
You should be able to see the crnlenv kernel in JupyterLab if you go to “Kernel->Change Kernel”.
Select it and then restart the kernel (“Kernel->Restart Kernel”) to continue this tutorial.
On the top right of this window, you should see something like “Python (crnlenv)”. It means your notebook is running in the right virtual environment!
From now on, you will execute the code cells below, in order. You can do it either by pressing the play button (at the top of the notebook) or by clicking in the target cell and pressing Shift+Enter.
You may also want to check the tutorials of the module submitit used here.
###### Define a function that should run on the cluster
# this specific function is very dumb and only for demonstration purposes
# we will just feed it with a number and a string, but we could pass any object to it (filepath, DataFrames, etc.)
# here, the function only returns one object, but it could return several (return result1, result2)
def yourFunction(argument1, argument2):
    # print something to the log
    print('I am running with argument1=' + str(argument1))
    # sleep for the duration specified by argument1
    # just to illustrate the parallel processing implemented
    time.sleep(argument1)
    # here we simply report the waiting time
    results = ''
    for i in range(argument1):
        results = f'waited {argument1}s'
    # send the results back to the notebook
    return results
# check time and memory usage of your function
# ideally, try to test it with the input values that will produce the biggest memory consumption
# such as the largest file in your dataset or the most fine-grained parameters for your analysis
start_time = time.time()
mem_usage = memory_usage((yourFunction, (3, 'consumption',)))
end_time = time.time()
print('Maximum memory usage (in MB): %s' % max(mem_usage))
print('Maximum memory usage (in GB): %s' % (max(mem_usage)/1000))
print('Time taken (in s): %s' % (end_time-start_time))
Unfortunately, profiling the memory usage of multi-threaded or multi-process code (e.g. code that uses several CPUs in parallel) can be more difficult, and memory_usage will throw a warning if it detects multithreaded operations.
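If your function spawns child processes, a possible workaround is to also account for the children and to request only the peak value. This is only a sketch based on optional arguments of memory_usage; check the memory_profiler documentation for your version, as the return type with max_usage=True has changed across versions.
###### Alternative profiling call for code that spawns child processes (sketch)
# include_children adds the memory of child processes, max_usage returns only the peak
peak_mem = memory_usage((yourFunction, (3, 'consumption',)),
                        include_children=True, max_usage=True)
print('Peak memory usage (in MB): %s' % peak_mem)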
#### define some array for which each item will be associated with an independent job on the cluster
#### when you execute these cells, the jobs are sent to the cluster
# here we define an array of numbers: since this array will be used to feed the first argument of yourFunction
# and yourFunction waits for as many seconds as its first argument, the jobs will finish in a different order
# than they were submitted (the output of the second call arrives about 20s after the first one!)
array_parallel = [1, 20, 2, 5]

# define an additional parameter to be passed to the function
additional_parameter = 'whatever'

# initialize a list in which our submitted jobs will be stored
joblist = []

# loop over array_parallel
print('#### Start submitting jobs #####')
jcount = 0
for i, value in enumerate(array_parallel):

    # executor is the submission interface (logs are dumped in the folder)
    # note that the executor was modified since the first version of this tutorial (it used to be AutoExecutor).
    # This has implications for the key-value pairs that can be passed to update_parameters.
    executor = submitit.SlurmExecutor(folder=os.getcwd()+'/tuto_logs/', max_num_timeout=5)

    # define execution parameters (see below for a list of common options)
    executor.update_parameters(mem='1000M', time=300, partition="CPU", nodes=1, job_name=f'example_{i}')

    # actually submit the job: note that "value" corresponds to the current item of array_parallel
    job = executor.submit(yourFunction, value, additional_parameter)

    # add info about job submission order
    job.job_initial_indice = i

    # print the ID of your job
    print("submitted job " + str(job.job_id))

    # append the job to the joblist
    joblist.append(job)

    # increase the job count
    jcount = jcount + 1

### wait for jobs to return (results are gathered in the submission order)
print('#### Start waiting for jobs to return #####')
finished_results = [job.result() for job in joblist]
### display jobs
print('#### All jobs completed #####')
print(finished_results)
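Each job also writes its stdout and stderr to the log folder defined above (tuto_logs), and submitit lets you read them directly from the notebook, which is handy for debugging. A minimal sketch, applied here to the first job of joblist:
###### Inspect the logs of a finished job (sketch)
# the print() calls inside yourFunction end up in the job's stdout log
first_job = joblist[0]
print(first_job.stdout())
print(first_job.stderr())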
Main options of executor.update_parameters
Key | Value | Effect |
---|---|---|
mem | string (ex: ‘1000M’, ‘1G’, etc) | Sets the memory limit of your job |
time | int (ex: 30) | Sets the timeout limit of your job (in minutes) |
partition | string (‘CPU’ or ‘GPU’) | Determines whether you want to use the GPU (usually not) |
nodelist | string (ex: ‘node8’, ‘node[1-11]’) | Whether you want to restrict to a set of compute nodes |
excluded | string (ex: ‘node21’, ‘node[1-2]’) | Whether you want to exclude specific nodes |
cpus_per_task | int (ex: 4) | How many CPUs should be booked for each job |
job_name | string (ex: ‘myjob’) | Custom job name (default: ‘submitit’) |
additional_parameters | dict (ex: {‘gpus-per-node’: 2, ‘mincpus’: 4}) | Any parameter used by SBATCH |
Other options exist but they are poorly documented by submitit. If you pass an invalid key, submitit will print all the possibilities.
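As an illustration (a sketch only: the values are arbitrary and should be adapted to your own jobs), a request combining several of these options could look like this:
###### Example of a more detailed resource request (sketch, values are arbitrary)
executor.update_parameters(mem='4G', time=60, partition='CPU', nodes=1,
                           cpus_per_task=4, job_name='myjob',
                           additional_parameters={'mincpus': 4})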
If your jobs return very big data structures, the approach above (gathering all results into a single list) might saturate the memory of your notebook. If you encounter such issues, you might want to clean job results as they come back from the compute nodes.
### This piece of code may replace the line `finished_results = [job.result() for job in joblist]`. It might be useful for very heavy jobs and whenever you want to do live processing on each job as soon as it returns.
# decide whether to clean job output live
clean_jobs_live = True

# create lists to store finished jobs (optional, only needed if we do not clean up jobs live)
finished_list = []
finished_order = []

### now we will keep looking for a new finished job until all jobs are done:
njobs_finished = 0
while njobs_finished < jcount:

    # look for a finished job in the remaining joblist
    doneIdx = -1
    for j, job in enumerate(joblist):
        if job.done():
            doneIdx = j
            break

    if doneIdx >= 0:
        print(str(1+njobs_finished) + ' of ' + str(jcount))
        # report last job finished
        print("last job finished: " + job.job_id)
        # obtain result from job
        job_result = job.result()
        # do some processing with this job
        print(job_result)
        # decide what to do with the finished job object (e.g. save it somewhere, keep pieces of it, etc.)
        if clean_jobs_live:
            # delete the job object
            del job
            # collect all the garbage immediately to spare memory
            gc.collect()
        else:
            # if we decided to keep the jobs in a list for further processing, add it to the finished job list
            finished_list.append(job)
            finished_order.append(job.job_initial_indice)
        # increment the count of finished jobs
        njobs_finished = njobs_finished + 1
        # remove this finished job from the initial joblist
        joblist.pop(doneIdx)
    else:
        # no job has finished yet: pause briefly to avoid busy-waiting
        time.sleep(1)

print('#### All jobs completed #####')

### If we chose to keep our job results for subsequent processing, it will often be crucial to reorder them according to their
### initial submission order, rather than their return order (from the cluster). Here we only keep the results of the jobs.
if clean_jobs_live == False:
    # finished_order[k] holds the submission index of the k-th returned job, so we look it up to restore submission order
    finished_results = [finished_list[finished_order.index(i)].result() for i in range(len(finished_list))]
    print('Concatenated results obtained by applying yourFunction() to all items in array_parallel:')
    print(finished_results)
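Finally, if a job hangs or was submitted by mistake, it can be cancelled directly from the notebook. A minimal sketch, assuming job is one of the submitit Job objects created above:
###### Cancel a job that is no longer needed (sketch)
job.cancel()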
Next part
Leave a message
Feel free to comment, ask a question, report a bug, etc. If you want to notify a specific user (e.g. to get a faster response), you may include their GitHub handle in your message (e.g. @romainligneul or @samuelgarcia).