mcparallel / mccollect


michellang
Hi there,

I've tried to implement an asynchronous job scheduler using
parallel::mcparallel() and parallel::mccollect(..., wait=FALSE). My
goal was to send processes to the background, leaving the R session
open for interactive use while all jobs store their results on the
file system. To keep track of the running jobs I've stored the process
ids and written a little helper that does not spawn new processes
until already started ones have terminated once the maximum number of
CPUs is reached.

Unfortunately, this turned out to be impossible with the current
implementation in parallel for a number of reasons:

1) The returned results are not named by process id or job name if
wait is set to FALSE.

2) The number of returned results depends on the state of computation:
If all or none of the jobs have finished, just NULL is returned.
Otherwise a list of the results collected so far is returned.

3) Combining (1) and (2) renders mapping the results to the stored
process ids impossible. E.g., if you query mccollect for the results
of 4 jobs and set wait=FALSE, you can get an unnamed list with one
result or a list with four results but in a different order.

4) An obvious workaround would be to wrap the expression to evaluate in
a function which attaches a unique identifier to the return value. This
way, one would not have to rely on process ids or job names. However,
each job has to be collected twice: the first time you get the result
(which is fine for the workaround), the second time you just get NULL.
And you have to collect them twice to free the used resources, at least
on Unix systems.
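
The workaround from point 4 could look roughly like the following. This
is only a minimal sketch for Unix-alikes: the wrapper tagged(), the ids
"a" and "b", and the 30 second deadline are made up for illustration,
and the final extra collect is assumed (not verified on every R version)
to be enough to release the finished children.

```r
library(parallel)

# Hypothetical wrapper: return the value together with a caller-chosen id,
# so results can be matched without relying on pids or job names.
tagged <- function(id, fun, ...) list(id = id, value = fun(...))

f <- function(x) { Sys.sleep(x); sprintf("job with x = %i", x) }

jobs <- list(
  mcparallel(tagged("a", f, 2)),
  mcparallel(tagged("b", f, 1))
)

results <- list()
deadline <- Sys.time() + 30  # arbitrary guard against polling forever

while (length(results) < length(jobs) && Sys.time() < deadline) {
  # Non-blocking collect: NULL, or a list of whatever is ready right now.
  done <- mccollect(jobs, wait = FALSE, timeout = 1)
  for (r in done) {
    # Skip the NULL entries produced when a job is collected a second time.
    if (is.list(r) && !is.null(r$id)) results[[r$id]] <- r$value
  }
}

# Collect once more, as described in point 4, to free the child processes.
invisible(mccollect(jobs, wait = FALSE, timeout = 1))
```

Afterwards results[["a"]] and results[["b"]] hold the values regardless
of the order in which the jobs finished.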

Here is a small example to illustrate the current behavior:

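library(parallel)

# Each job sleeps for x seconds, then returns a short message.
f = function(x) { Sys.sleep(x); sprintf("job with x = %i", x) }

# Start two background jobs and remember their process ids.
jobs = integer()
jobs = c(jobs, mcparallel(f(10), name = "jobname1")$pid)
jobs = c(jobs, mcparallel(f(3), name = "jobname2")$pid)

# Poll once per second without blocking and print whatever is returned.
for (i in 1:13) {
  message("\ni = ", i)
  print(mccollect(jobs, wait = FALSE, timeout = 0))
  Sys.sleep(1)
}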

I've created a small patch
(<https://gist.github.com/mllg/82410d0f564a7a24251e9e747e210b39>)
which applies the same mechanism to name the results for wait=FALSE as
is already implemented for wait=TRUE. I think the documentation
already describes the behavior after my patch rather than before it.
A note on the need to collect results twice might prove useful for the
future, though.

Thanks,
Michel

______________________________________________
[hidden email] mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel

Re: mcparallel / mccollect

Simon Urbanek
Michel,

thanks, you're right that the list should have names. Your patch has the match() part backwards, but is otherwise the right idea. I have committed a variant in R-devel and will back-port it later.
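
To illustrate why the argument order of match() matters when naming results, here is a small sketch with made-up data. The vectors pids, pnames and ready are hypothetical, and this reproduces neither the patch from the gist nor the variant committed to R-devel; it only shows the two possible directions of the lookup.

```r
# Hypothetical bookkeeping for three started jobs.
pids   <- c(101L, 102L, 103L)
pnames <- c("jobname1", "jobname2", "jobname3")

# Pids that a non-blocking collect reported as ready, in arrival order.
ready <- c(103L, 101L)

# Look up each ready pid in the full pid vector: one index per result.
idx <- match(ready, pids)
res_names <- pnames[idx]

# The other way round gives one entry per started job instead,
# with NA for jobs that are not ready yet.
backwards <- match(pids, ready)
```

Here idx has the same length as the list of ready results, so pnames[idx] can be assigned as its names directly, whereas backwards has one entry per started job and so does not line up with the results.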

Thanks,
Simon



> On Aug 30, 2016, at 8:43 AM, Michel Lang <[hidden email]> wrote:
> [...]
