Integrating qsub with RStudio


In a previous post, I explained how to run RStudio on the compute nodes of the HPC. I am going to continue on this theme and explore some of the interesting side effects of running RStudio in a High Performance Computing environment.

This post assumes you are running all of the following code in an RStudio container on the HPC. Specifically, you must be using a container image that has the PBS commands wrapped in ssh commands (explained below).

Motivation

Imagine we want to do some data exploration and initial analysis. However, we are unsure of what computational resources we will require. We can request a large number of resources when we submit our RStudio container to the HPC. That way we will have the resources we need when we need them. However, as most of our work will be to understand our data and not run intensive analyses, most of these resources will be wasted.

Typically, when we work on the HPC, we request resources per job and submit that job to the queue. So why don’t we do that in our RStudio sessions? In the ideal scenario, we can request resources per chunk in our notebook. Thus, we can request resources only when we need them.

To make this possible we need to submit jobs to the queue using qsub from inside the container. However, there is one problem: qsub (and the other PBS commands) are not accessible from within the container. We need a way to talk to the PBS server from within the RStudio container so we can submit jobs to the queue with variable resources.

The solution is to wrap the PBS commands in an ssh command. That way, when we want to execute qsub from within the container the command is actually ssh’d out of the container. This code is already in the RStudio container recipe file. When the container is built a fake qsub command is created within the container for your use.

The fake qsub command uses ssh to leave the container and call the real qsub, which submits the user-defined code to the queue to run on a different (or the same) compute node. The results are then read back into the starting container via the shared file system.

qsub-hpc-rstudio
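
The container recipe itself is not reproduced in this post, so the following is only a minimal sketch of what such a wrapper could look like. The names SUBMIT_HOST, REAL_QSUB, quote_arg, remote_command and fake_qsub are hypothetical, and the real recipe may handle quoting and host selection differently. The sketch quotes each argument before handing it to ssh, because ssh joins its arguments into a single string that the remote shell then splits again.

```shell
#!/bin/bash
# Hypothetical sketch of a "fake" qsub wrapper; not the actual recipe code.
# SUBMIT_HOST and REAL_QSUB are assumed names: a host that can reach the
# PBS server, and the real qsub command on that host.
SUBMIT_HOST="${SUBMIT_HOST:-localhost}"
REAL_QSUB="${REAL_QSUB:-qsub}"

# Wrap one argument in single quotes so the remote shell sees it as one word.
# Embedded single quotes are rewritten as '\''.
quote_arg() {
  printf "'%s'" "$(printf '%s' "$1" | sed "s/'/'\\\\''/g")"
}

# Build the remote command line: the real qsub followed by quoted arguments.
remote_command() {
  local cmd="$REAL_QSUB"
  local arg
  for arg in "$@"; do
    cmd="$cmd $(quote_arg "$arg")"
  done
  printf '%s\n' "$cmd"
}

# Forward the arguments (and stdin, which ssh passes through) to the real qsub.
fake_qsub() {
  ssh "$SUBMIT_HOST" "$(remote_command "$@")"
}
```

Under these assumptions, a script like this installed as qsub inside the container would turn `qsub -o "$(pwd)" myscript.sh` into a call to the real qsub on SUBMIT_HOST. Forwarding stdin is what would make the piped form used later in this post work.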

Calling qsub from bash

Let's see how this works in practice. We’ll build our way up to a workable solution starting with a simple bash command.

# make a bash script
echo "#!/bin/bash
echo \"Hello World\"" > myscript.sh

# submit to qsub / redirect output to current dir
qsub -o $(pwd) $(pwd)/myscript.sh
# returns a job ID
>>> 1579438.jobmgr1

The qsub command used here is actually the ssh wrapper script from above. The fake qsub takes the arguments and sends them to the real qsub via ssh. Because we redirect the output to the current directory, we can read in the results once the script has finished running in the queue.

cat 1579438.jobmgr1.OU
>>> Hello World
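
The output file only appears once the job has finished, so running cat straight after qsub may fail. A small polling helper can wait for the file first. The sketch below is one way this might be done in a bash chunk; wait_for_file is a hypothetical name, and the timeout and interval are assumed to be whole seconds. It gives up after the timeout so that a failed or long-queued job does not hang the chunk forever.

```shell
# wait_for_file PATH [TIMEOUT_SECONDS] [INTERVAL_SECONDS]
# Hypothetical helper: poll until PATH exists, or fail after the timeout.
wait_for_file() {
  local path=$1
  local timeout=${2:-60}
  local interval=${3:-1}
  local waited=0
  while [ ! -e "$path" ]; do
    if [ "$waited" -ge "$timeout" ]; then
      echo "timed out waiting for $path" >&2
      return 1
    fi
    sleep "$interval"
    waited=$((waited + interval))
  done
}

# Example use with the job ID from above (commented out: needs a real job):
# wait_for_file 1579438.jobmgr1.OU 120 && cat 1579438.jobmgr1.OU
```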

We can streamline the process by piping the commands directly into qsub.

# submit to qsub using pipe / redirect output to current dir
echo "echo \"Hello World\"" | qsub -o $(pwd)
>>> 1579439.jobmgr1
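
For longer jobs, escaping quotes inside echo becomes awkward. One option is a here-document with a quoted delimiter, which passes the text through unchanged. In this sketch job_script is a hypothetical helper, and the script is piped to a local sh so that it can be tried without a queue. On the HPC the same text would be piped to qsub instead.

```shell
# Print the job script. The quoted 'EOF' delimiter stops the shell from
# expanding variables or interpreting quotes inside the here-document.
job_script() {
  cat <<'EOF'
greeting="Hello World"
echo "$greeting"
EOF
}

# Try the script locally by piping it to a shell.
job_script | sh

# On the HPC, pipe the same text to qsub instead:
# job_script | qsub -o "$(pwd)"
```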

Calling qsub from R

The code above must be run in a bash chunk in an R Markdown file. However, we can also use R to submit qsub jobs and thus run them from regular R chunks.

# change to current dir and run `ls`
cd_current_dir <- paste("cd", getwd())
cmd <- paste(cd_current_dir, 'ls', sep = ' && ')  

# the command defined above is inserted into the submission pipeline code
qsub_cmd <- sprintf('echo "%s" | qsub -o $(pwd)', cmd)

# call the bash command from R and capture the job ID it prints
qsub_id <- system(qsub_cmd, intern = TRUE)
qsub_id
>>> [1] "1579441.jobmgr1"

Now we can submit jobs, but we still need to read the results back into R.

Reading results into R

We can read the job output file into R using the readLines function. However, because the output file is not created until the job completes, we must wait for it to appear first.

cmd <- "
echo hello
echo world
echo hello
echo world
"

# call qsub from R and set the output dir to the current dir
qsub_cmd <- sprintf('echo "%s" | qsub -j oe -o %s', cmd, getwd())
qsub_id <- system(qsub_cmd, intern = TRUE)

# we know what the outfile will look like and where it'll be...
outfile <- paste0(qsub_id, ".OU")

# so we wait for the outfile to be created
while (!file.exists(outfile)) {
  Sys.sleep(1)
}

# once it's created we read the outfile into R
output <- readLines(outfile)

# clean up by removing the outfile
rm_outfile <- file.remove(outfile)

output
>>> [1] "hello" "world" "hello" "world"

Writing an R qsub function

The above code can be generalised into a function. This qsub function allows you to wrap code that should be passed to the qsub command and run on a compute node.

qsub <-
  function(cmd,
           qsub_prams = "",
           run_dir = getwd(),
           outfile_dir = getwd(),
           remove_outfile = TRUE,
           sleep_time = 1) {
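    # run the command from run_dir on the compute node
    cd_dir <- paste("cd", run_dir)
    cmd <- paste(cd_dir, cmd, sep = ' && ')
    # pipe the command into qsub; -j oe joins stderr into stdout in outfile_dir
    qsub_cmd <-
      sprintf('echo "%s" | qsub %s -j oe -o %s', cmd, qsub_prams, outfile_dir)
    # qsub prints the job ID, which the output file name is built from
    qsub_id <- system(qsub_cmd, intern = TRUE)
    # look for the output file in outfile_dir, not the current working directory
    outfile <- file.path(outfile_dir, paste0(qsub_id, ".OU"))
    # block until the job finishes and the output file appears
    while (!file.exists(outfile)) {
      Sys.sleep(sleep_time)
    }
    output <- readLines(outfile)
    if (remove_outfile) {
      rm_outfile <- file.remove(outfile)
    }
    output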
  }

This function provides a simple interface for submitting jobs from within RStudio on the HPC.

cmd <- "echo hello world"
qsub(cmd)
>>> [1] "hello world"

The qsub function has a qsub_prams argument, so we can request resources for each function call, and the request is passed on to the real qsub command.

qsub(cmd, qsub_prams="-l select=1:ncpus=4:mem=8gb")

Blocking code

The problem with this solution is that R waits for each qsub function call to finish before running the next one. Each call blocks until its outfile is created; only then will the next call run.

cmd <- "
sleep 5
echo finished
"
system.time({
  qsub(cmd)
  qsub(cmd)
  qsub(cmd)
})
>>> user  system elapsed 
0.109   0.108  18.677 

To take full advantage of the HPC we want each command to run in parallel.
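
The difference between blocking and non-blocking calls can be seen without a queue at all. The following bash sketch uses a hypothetical slow_job function (a one-second sleep) as a stand-in for a submit-and-wait call. It compares running three of them in sequence with running them in the background and collecting them with wait.

```shell
# Stand-in for a blocking submit-and-wait call (hypothetical; no qsub involved).
slow_job() {
  sleep 1
  echo finished
}

# Sequential: each call blocks, so the total is about 3 seconds.
start=$(date +%s)
slow_job; slow_job; slow_job
sequential=$(( $(date +%s) - start ))

# Background: all three start at once and `wait` blocks until they are done,
# so the total is about 1 second.
start=$(date +%s)
slow_job & slow_job & slow_job &
wait
background=$(( $(date +%s) - start ))

echo "sequential: ${sequential}s, background: ${background}s"
```

The R approaches in the following sections follow the same pattern: start the waiting in the background, carry on working, and block only when the results are collected.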

Running qsub in background jobs with RStudio

There are a few ways to run code at the “same” time in R. One is RStudio's background jobs feature. We save the qsub function call we want to run into an R script file and then run that file as a job. Using the importEnv and exportEnv arguments, we can make sure the job runs with a copy of our environment and that the results are copied back into it automatically once the job completes.

# make an R script (run this part in a bash chunk)
echo "job2_res <- qsub(cmd)
" > qsub_job_script.R

# in an R chunk: use the rstudioapi to start the job (qsub_job_script.R) with a copy
# of the global env, and copy the result back to the global env when it finishes
job_id <- rstudioapi::jobRunScript(path = "qsub_job_script.R", importEnv = TRUE, exportEnv = 'R_GlobalEnv')

Running qsub in background jobs with parallel

A more practical solution to the blocking problem is to use the parallel library. With parallel::mcparallel you can evaluate an R expression asynchronously in a separate process.

library(parallel)
pid1 <- mcparallel(qsub(cmd))
pid2 <- mcparallel(qsub(cmd))
pid3 <- mcparallel(qsub(cmd))

All of these calls return instantly. However, they do not return the result of the qsub function. Instead, the mcparallel function returns a job object that holds the PID (process ID) of the forked process. This object can be used to collect the results once they are finished.

While the processes are running you can do other things e.g. check the queue status.

qstat -u jc220896
>>> Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
1579407.jobmgr1 jc220896 short    r_studio   107753   1   4    8gb 24:00 R 00:35
1579434.jobmgr1 jc220896 short    STDIN      138182   1   1    8gb 12:00 R 00:00
1579435.jobmgr1 jc220896 short    STDIN      138183   1   1    8gb 12:00 R 00:00
1579436.jobmgr1 jc220896 short    STDIN      138184   1   1    8gb 12:00 R 00:00

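When you need the results you can call the mccollect function on the objects returned by mcparallel. The mccollect function will block until all of the qsub function calls are ready to return. The jobs themselves still run in parallel, so the total wait is roughly that of the slowest job (about 6 seconds here, rather than the 18 seconds of the blocking version).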

system.time({
res <- mccollect(list(pid1, pid2, pid3))
})
res
>>> user  system elapsed 
0.094   0.119   5.995 
$`194100`
[1] "finished"

$`194106`
[1] "finished"

$`194113`
[1] "finished"

Parallel qsub function

We can improve the qsub function so that mcparallel is called each time. The new function returns as soon as the job has been submitted, and blocking only happens when the results are collected.

parallel_qsub <-
  function(cmd,
           qsub_prams = "",
           run_dir = getwd(),
           outfile_dir = getwd(),
           remove_outfile = TRUE,
           sleep_time = 1) {
    cd_dir <- paste("cd", run_dir)
    cmd <- paste(cd_dir, cmd, sep = ' && ')
    qsub_cmd <-
      sprintf('echo "%s" | qsub %s -j oe -o %s', cmd, qsub_prams, outfile_dir)
    qsub_id <- system(qsub_cmd, intern = TRUE)
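    # look for the output file in outfile_dir, not the current working directory
    outfile <- file.path(outfile_dir, paste0(qsub_id, ".OU"))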
    
    # move mcparallel inside qsub
    output_pid <- mcparallel({
      while (!file.exists(outfile)) {
        Sys.sleep(sleep_time)
      }
      
      output <- readLines(outfile)
      if (remove_outfile) {
        rm_outfile <- file.remove(outfile)
      }
      output
    })
    output_pid
  }

This improves the interface for submitting jobs to the HPC.

pid1 <- parallel_qsub(cmd)
pid2 <- parallel_qsub(cmd)
pid3 <- parallel_qsub(cmd)

res1 <- mccollect(pid1)
res2 <- mccollect(pid2)
res3 <- mccollect(pid3)

future

Finally, we can use the package future to write non-blocking code with many different backends.

install.packages("future")

We can use a multisession plan to run multiple R sessions at the same time.

library(future)
# setting the plan takes some time because it spins up the other R sessions, but it only has to run once
plan(multisession)

The qsub function call can be wrapped in implicit futures (v %<-% {}). These only block when the value is queried.

cmd <- "
sleep 5
echo finished
"

# implicit futures calls run in the background
res1 %<-% {
  qsub(cmd)
}

res2 %<-% {
  qsub(cmd)
}

res3 %<-% {
  qsub(cmd)
}

When required the values can be queried. They will only block if they haven’t finished running.

res1
res2
res3
>>> [1] "finished"
[1] "finished"
[1] "finished"

An explicit future version of the qsub function can be created by wrapping the blocking code in the function with the future::future function.

future_qsub <-
  function(cmd,
           qsub_prams = "",
           run_dir = getwd(),
           outfile_dir = getwd(),
           remove_outfile = TRUE,
           sleep_time = 1) {
    cd_dir <- paste("cd", run_dir)
    cmd <- paste(cd_dir, cmd, sep = ' && ')
    qsub_cmd <-
      sprintf('echo "%s" | qsub %s -j oe -o %s', cmd, qsub_prams, outfile_dir)
    qsub_id <- system(qsub_cmd, intern = TRUE)
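    # look for the output file in outfile_dir, not the current working directory
    outfile <- file.path(outfile_dir, paste0(qsub_id, ".OU"))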
    
    # move future inside qsub
    output_future <- future({
      while (!file.exists(outfile)) {
        Sys.sleep(sleep_time)
      }
      
      output <- readLines(outfile)
      if (remove_outfile) {
        rm_outfile <- file.remove(outfile)
      }
      output
    })
    output_future
  }

The future_qsub function runs the code in the background and returns a future.

fut1 <- future_qsub(cmd)
fut2 <- future_qsub(cmd)
fut3 <- future_qsub(cmd)

fut1
>>> MultisessionFuture:
Label: <none>
Expression:
{
    while (!file.exists(outfile)) {
        Sys.sleep(sleep_time)
    }
    output <- readLines(outfile)
    if (remove_outfile) {
        rm_outfile <- file.remove(outfile)
    }
    output
}
Lazy evaluation: FALSE
Asynchronous evaluation: TRUE
Local evaluation: TRUE
Environment: <environment: 0x55d6e80a5b58>
Capture standard output: TRUE
Capture condition classes: condition
Globals: 3 objects totaling 248 bytes (character outfile of 136 bytes, numeric sleep_time of 56 bytes, logical remove_outfile of 56 bytes)
Packages: <none>
L'Ecuyer-CMRG RNG seed: <none> (seed = FALSE)
Resolved: FALSE
Value: <not collected>
Conditions captured: <none>
Early signaling: FALSE
Owner process: e2c28bcd-ac2d-dec8-96c8-1585b116328b
Class: MultisessionFuture, ClusterFuture, MultiprocessFuture, Future, environment

We can use the future::value function to get the value from a future.

res1 <- value(fut1)
res2 <- value(fut2)
res3 <- value(fut3)

res1
res2
res3
>>> [1] "finished"
[1] "finished"
[1] "finished"

Conclusions

We have seen a variety of methods to submit qsub scripts to the compute nodes from within our RStudio session on the HPC. However, the way we are submitting jobs here is still not optimal. We are submitting bash commands to qsub that then run on the HPC, but ideally we want to submit R code. There are several solutions to this problem that I will elaborate on in a future post (e.g. templates, container proxies, batchtools). For now, check out future.batchtools. Using future.batchtools you can seamlessly run R code on compute nodes without the hacky bash workaround found above. This allows you to do things like render plots in a job.
