Part 1: A fresh start on the CRNL cluster

Author

Romain Ligneul

Since we have a brand-new cluster, this is a good time to improve the way you use it, especially if you have always felt that your workflow was not optimal. This tutorial will help you set up your personal environment if you use Python.

Note

If you are working on the CRNL cluster, you can also find the corresponding notebook at this location: /crnldata/projets_communs/tutorials/

Conda environment

Install miniconda to manage your virtual environment

Installing miniconda is not incompatible with using venv later on.

Open a terminal and type the following commands:
cd ~
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
source ~/.bashrc

Follow the instructions and say yes to everything (you may press Ctrl+C once, followed by Enter, to skip the text faster).

NB: In JupyterLab, you can open a new terminal at ‘File->New->Terminal’. You can then bring this terminal just below your notebook by clicking on its tab and dragging it toward the lower part of the window.

Create your first conda environment

Still in the terminal, type:
conda create -n crnlenv python=3.9
conda activate crnlenv

The crnlenv virtual environment is now active! Everything installed from now on will only be accessible when crnlenv is activated.
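
To confirm from Python which environment is actually in use, a minimal check such as the following may help. It assumes that conda exports the `CONDA_DEFAULT_ENV` variable on activation (which it normally does); the helper name `describe_environment` is made up for this illustration.

```python
import os
import sys


def describe_environment():
    """Return basic facts about the Python interpreter currently running."""
    return {
        # full path of the interpreter; it should point inside the crnlenv folder
        "executable": sys.executable,
        # root folder of the active environment
        "prefix": sys.prefix,
        # name set by `conda activate` (None if no conda environment is active)
        "conda_env": os.environ.get("CONDA_DEFAULT_ENV"),
        # Python version as major.minor
        "python_version": f"{sys.version_info.major}.{sys.version_info.minor}",
    }


info = describe_environment()
for key, value in info.items():
    print(f"{key}: {value}")
```

If `conda_env` does not read `crnlenv`, the environment was probably not activated in the shell that launched Python.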

Make your conda environment visible to JupyterLab

Still in the terminal, type:
conda install ipykernel
python -m ipykernel install --user --name crnlenv --display-name "Python (crnlenv)"

These commands make your kernel accessible from JupyterLab, not only from the command line. If you create more conda environments / kernels, you will have to run these lines again for each of them.
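
To check that the kernel was registered, one option is to look for its `kernel.json` file. The sketch below assumes the usual location for `--user` installs on Linux (`~/.local/share/jupyter/kernels/`), which can differ if variables such as `JUPYTER_DATA_DIR` are set; `read_kernel_spec` is a hypothetical helper, not part of Jupyter.

```python
import json
from pathlib import Path


def read_kernel_spec(name, kernels_dir=None):
    """Return the parsed kernel.json of a user-level kernel, or None if absent."""
    if kernels_dir is None:
        # assumed default location for `--user` installs on Linux
        kernels_dir = Path.home() / ".local" / "share" / "jupyter" / "kernels"
    spec_file = Path(kernels_dir) / name / "kernel.json"
    if not spec_file.is_file():
        return None
    with spec_file.open() as f:
        return json.load(f)


spec = read_kernel_spec("crnlenv")
if spec is None:
    print("No user-level kernel named crnlenv was found")
else:
    # display_name is what JupyterLab shows; argv[0] is the interpreter it launches
    print(spec.get("display_name"), "->", spec.get("argv", ["?"])[0])
```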

Populate your conda environment / kernel with essential tools

Install a package that lets you easily submit your jobs to Slurm from any Jupyter notebook
conda install -c conda-forge submitit

Install numpy
conda install numpy

Install memory_profiler
pip install memory_profiler -U

Later on you could install various other tools in your virtual environment, but the priority is to check that you can use the cluster and distribute your jobs.

NB: if you wonder why we alternate between conda and pip, the answer is that you can almost always install with pip, but when a conda package exists, it may be “better” installed in some cases.
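
Before moving on, it can be worth checking that the packages are visible to the interpreter. This is a minimal sketch using only the standard library; `missing_packages` is a hypothetical helper name, and it only handles top-level package names.

```python
import importlib.util


def missing_packages(names):
    """Return the subset of `names` that cannot be found by the current interpreter."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# packages installed in the steps above
required = ["submitit", "numpy", "memory_profiler"]
missing = missing_packages(required)
if missing:
    print("Not found in this environment:", ", ".join(missing))
else:
    print("All required packages are importable")
```

Run it with the interpreter of the environment you want to check (for instance from a notebook using the crnlenv kernel).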

Venv environment

If you wish instead to use a venv to manage your virtual environment, you can use the default Python interpreter of the cluster (currently, version 3.11). To do so, type the following commands:

python3 -m venv .localenv
source ./.localenv/bin/activate
pip install numpy submitit
pip install memory_profiler -U
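
Once the venv is activated, a short check from Python can confirm that the interpreter really runs inside it. This sketch relies on the standard `sys.prefix` / `sys.base_prefix` comparison; `in_virtualenv` is a hypothetical helper name.

```python
import sys


def in_virtualenv():
    """True when the interpreter runs inside a venv-style virtual environment."""
    # inside a venv, sys.prefix points to the venv folder while
    # sys.base_prefix still points to the Python installation it was created from
    return sys.prefix != sys.base_prefix


print("virtual environment active:", in_virtualenv())
print("environment folder:", sys.prefix)
```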

If you also use conda, make sure that the (base) environment

UV environment

UV is a new tool for environment management that combines, in principle, the best of conda (it ships Python distributions) and venv (fast, light, and 100% free). You may check UV here: https://github.com/astral-sh/uv. If you have questions, Samuel Garcia will be delighted to guide you ;).

Let’s start computing

You should be able to see crnlenv in JupyterLab if you go to “Kernel->Change Kernel”.

Select it and then restart the kernel (“Kernel->Restart Kernel”) to continue this tutorial.

On the top right of this window, you should see something like “Python (crnlenv)”. It means your notebook is running in the right virtual environment!

From now on, you will execute the code cells below, in order. You can do it either by pressing the play button (at the top of the notebook) or by clicking in the target cell and pressing Shift+Enter.

You may also want to check the tutorials of the module submitit used here.

###### Import packages/modules
import submitit
# memory profiler to evaluate how much your jobs demand
from memory_profiler import memory_usage
# import garbage collector: it is sometimes useful to trigger the garbage collector manually with gc.collect()
import gc
# import other modules (os is needed below to build the path of the log folder)
import os
import time
###### Define a function that should run on the cluster

# this specific function is very dumb and only for demonstration purposes
# we will just feed it with a number and a string, but we could pass any object to it (filepath, DataFrames, etc.)
# here, the function only returns one value but it could return several (return result1, result2)
def yourFunction(argument1, argument2):

    # print something to the log
    print('I am running with argument1=' + str(argument1))
    
    # sleep for the duration specified by argument1
    # just to illustrate the parallel processing implemented
    time.sleep(argument1)
    
    # here we simply report the waiting time
    results=''
    for i in range(argument1):
        results=f'waited {argument1}s'

    # send the results back to the notebook
    return results
# check time and memory usage of your function
# ideally, try to test it with the input values that will produce the biggest memory consumption 
# such as the largest file in your dataset or the most fine-grained parameters for your analysis
start_time = time.time()
mem_usage=memory_usage((yourFunction, (3,'consumption',)))
end_time = time.time()
# memory_usage reports values in MiB
print('Maximum memory usage (in MiB): %s' % max(mem_usage))
print('Maximum memory usage (in GiB): %s' % (max(mem_usage)/1024))
print('Time taken (in s): %s' % (end_time-start_time))
Caution

Unfortunately, profiling the memory usage of multithreaded code (e.g. code that uses several CPUs in parallel) can be more difficult, and memory_usage will throw a warning if it detects multithreaded operations.
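
Once the peak reported by `memory_usage` is known (memory_profiler reports it in MiB), it can be turned into a value for the `mem` option used when submitting jobs. The helper below is only a sketch: `suggest_mem`, the 1.5 safety factor and the rounding to 100 MiB are arbitrary assumptions, not recommendations from the cluster administrators.

```python
import math


def suggest_mem(peak_mib, safety_factor=1.5, minimum_mib=100):
    """Turn a measured memory peak (in MiB) into a string such as '1500M'."""
    if peak_mib < 0:
        raise ValueError("peak_mib must be non-negative")
    # add headroom, since the test input may not be the worst case
    padded = peak_mib * safety_factor
    # round up to the next 100 MiB to get a tidy request
    rounded = max(minimum_mib, math.ceil(padded / 100) * 100)
    return f"{rounded}M"


# example with a made-up peak of 640 MiB (not a measurement from this tutorial)
print(suggest_mem(640))
# -> 1000M
```

With the profiling code above, the call would be `suggest_mem(max(mem_usage))`.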

#### define some array for which each item will be associated with an independent job on the cluster
#### when you execute these cells, the jobs are sent to the cluster 

# here we define an array of numbers: since this array will be used to feed the first argument of yourFunction
# and since yourFunction waits for as many seconds as its first argument, the jobs will not finish in their submission order
# (the output of the second call will arrive about 20s after that of the first one!)
array_parallel=[1, 20, 2, 5]

# define an additional parameter to be passed to the function
additional_parameter='whatever'

# initialize a list in which our returning jobs will be stored
joblist=[]

# loop over array_parallel
print('#### Start submitting jobs #####')

jcount=0
for i, value in enumerate(array_parallel):
    
  # executor is the submission interface (logs are dumped in the folder)
  # note that the executor was modified since the first version of this tutorial (it used to be AutoExecutor).
  # This has implications for the key-value pairs that can be passed to update_parameters.
  executor = submitit.SlurmExecutor(folder=os.getcwd()+'/tuto_logs/', max_num_timeout=5)
    
  # define execution parameters (see below for a list of common options)
  executor.update_parameters(mem='1000M', time=300, partition="CPU", nodes=1, job_name=f'example_{i}')

  # actually submit the job: note that "value" corresponds to the item of array_parallel in this iteration
  job = executor.submit(yourFunction, value, additional_parameter)
  
  # add info about job submission order
  job.job_initial_indice=i 
  
  # print the ID of your job
  print("submitted job " + str(job.job_id))

  # append the job to the joblist
  joblist.append(job)

  # increase the job count
  jcount=jcount+1

### wait for jobs to return (results are collected in submission order)
print('#### Start waiting for jobs to return #####')
finished_results = [job.result() for job in joblist]

### display jobs
print('#### All jobs completed #####')
print(finished_results)
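
The difference between completion order and submission order can be reproduced locally, without Slurm. The sketch below is only a stand-in: it uses threads from `concurrent.futures` instead of submitit jobs, short delays instead of the 1 to 20 s waits, and a hypothetical function `fake_job` in place of `yourFunction`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_job(delay):
    """Stand-in for yourFunction: wait, then report how long we waited."""
    time.sleep(delay)
    return f"waited {delay}s"


# scaled-down version of array_parallel so that the example runs quickly
delays = [0.1, 0.4, 0.2, 0.3]

with ThreadPoolExecutor(max_workers=len(delays)) as pool:
    futures = [pool.submit(fake_job, d) for d in delays]
    # as_completed yields futures as they finish (normally shortest delay first)
    completion_order = [f.result() for f in as_completed(futures)]
    # iterating over the original list keeps the submission order
    submission_order = [f.result() for f in futures]

print("completion order:", completion_order)
print("submission order:", submission_order)
```

Iterating over the list of jobs, as done above with `job.result()`, corresponds to the second pattern: results come back in submission order, at the cost of waiting for the slowest job in front of the queue.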
Main options of executor.update_parameters
| Key | Value | Effect |
| --- | --- | --- |
| mem | string (ex: ‘1000M’, ‘1G’, etc.) | Sets the memory limit of your job |
| time | int (ex: 30) | Sets the timeout limit of your job (in minutes) |
| partition | string (‘CPU’ or ‘GPU’) | Determines whether you want to use the GPU (usually not) |
| nodelist | string (ex: ‘node8’, ‘node[1-11]’) | Whether you want to restrict to a set of compute nodes |
| exclude | string (ex: ‘node21’, ‘node[1-2]’) | Whether you want to exclude specific nodes |
| cpus_per_task | int (ex: 4) | How many CPUs should be booked for each job |
| job_name | string (ex: ‘myjob’) | Custom job name (default: ‘submitit’) |
| additional_parameters | dict (ex: {‘gpus-per-node’: 2, ‘mincpus’: 4}) | Any parameter used by SBATCH |

Other options exist but they are poorly documented by submitit. If you pass an invalid key, submitit will print all the possibilities.
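
As an illustration of these options, the key-value pairs can be collected in a dictionary and unpacked into `update_parameters`. The values below are placeholders and `build_slurm_parameters` is a hypothetical helper; the call to submitit is left as a comment so that the snippet runs without a cluster.

```python
def build_slurm_parameters(job_name, mem="1000M", time_min=30, cpus_per_task=1,
                           partition="CPU", extra=None):
    """Assemble keyword arguments for executor.update_parameters."""
    if time_min <= 0:
        raise ValueError("time_min must be a positive number of minutes")
    params = {
        "job_name": job_name,
        "mem": mem,
        "time": time_min,  # timeout in minutes
        "cpus_per_task": cpus_per_task,
        "partition": partition,
    }
    if extra:
        # forwarded to SBATCH as-is, e.g. {"mincpus": 4}
        params["additional_parameters"] = dict(extra)
    return params


params = build_slurm_parameters("example_0", mem="2G", cpus_per_task=4,
                                extra={"mincpus": 4})
print(params)
# with an existing executor: executor.update_parameters(**params)
```

Keeping the parameters in one dictionary also makes it easy to log exactly what was requested for each job.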

Warning

If you have very big data structures returning from your jobs, this “live” approach might saturate the memory of your notebook. If you encounter such issues, you might want to clean job results as they come back from the compute nodes.

### This piece of code may replace the list comprehension that fills `finished_results` above. It might be useful for very heavy jobs, or whenever you want to process each job as soon as it returns.

# decide whether to clean job output live
clean_jobs_live=True

# create a list to store finished jobs (optional, and depends on whether we need to cleanup job live)
finished_list=[]
finished_order=[]

### now we will keep looking for a new finished job until all jobs are done:
njobs_finished=0
while njobs_finished<jcount:
  doneIdx=-1
  for j, job in enumerate(joblist):
    if job.done():
      doneIdx=j
      break
  if doneIdx>=0:
    print(str(1+njobs_finished)+' of ' + str(jcount))
    # report last job finished
    print("last job finished: " + str(job.job_id))
    # obtain result from job
    job_result=job.result()
    # do some processing with this job
    print(job_result)
    # remove this finished job from the initial joblist
    joblist.pop(doneIdx)
    # decide what to do with the finished job object (e.g. save it somewhere, keep pieces of it, etc)
    if clean_jobs_live:
      # delete the last references to the job object and its result
      del job, job_result
      # collect all the garbage immediately to spare memory
      gc.collect()
    else:
      # if we decided to keep the jobs in a list for further processing, add it to the finished job list
      finished_list.append(job)
      finished_order.append(job.job_initial_indice)
    # increment the count of finished jobs
    njobs_finished=njobs_finished+1
    
print('#### All jobs completed #####')
### If we chose to keep our jobs for subsequent processing, it will often be crucial to reorder them as a function of their initial
### submission order, rather than their return order (from the cluster). Here we only keep the results of the jobs
if not clean_jobs_live:
  # finished_order[k] is the submission index of finished_list[k]: sort the pairs by submission index
  finished_results = [job.result() for _, job in sorted(zip(finished_order, finished_list), key=lambda pair: pair[0])]
  print('Concatenated results obtained by applying yourFunction() to all items in array_parallel:')
  print(finished_results)
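
The reordering step can be checked on toy data. In this sketch, plain strings stand in for job results and the lists are filled by hand; `reorder_by_submission` is a hypothetical helper, not a submitit function.

```python
def reorder_by_submission(finished_items, finished_order):
    """Sort items collected in completion order back into submission order.

    finished_order[k] is the submission index of finished_items[k].
    """
    if len(finished_items) != len(finished_order):
        raise ValueError("both lists must have the same length")
    pairs = sorted(zip(finished_order, finished_items), key=lambda pair: pair[0])
    return [item for _, item in pairs]


# jobs submitted as 0, 1, 2 but returned as 1, 0, 2
items = ["result of job 1", "result of job 0", "result of job 2"]
order = [1, 0, 2]
print(reorder_by_submission(items, order))
# -> ['result of job 0', 'result of job 1', 'result of job 2']
```

Sorting the (index, item) pairs works for any return order, which makes it a convenient test case before running the real jobs.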

Next part

Click here to go to Part 2

Leave a message

Feel free to comment, ask a question, report a bug, etc. If you want to notify a specific user (e.g. to get a faster response), you may include their GitHub handle in your message (e.g. @romainligneul or @samuelgarcia).