The future package’s default behavior is zero parallelism – hence we need an explicit future::plan().

The future package’s multicore plan is more or less like Python’s ProcessPoolExecutor. There is no analog of ThreadPoolExecutor for R, and our understanding is there never will be – too many issues with threads and R.
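As a minimal sketch of that opt-in (using the standard future API; the worker count is illustrative):

```r
library(future)

# Parallelism is opt-in: without a plan(), futures run sequentially.
# multicore forks the current R process (not available on Windows;
# multisession is the portable fallback).
plan(multicore, workers = 4)
```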
Say you have a 2-node DAG where node B depends on node A. There is an R process for the call to compute(B). Inside that is dag$poll(), which invokes node$poll() for both the A and B nodes. There will be a forked R process (managed by the future package) for the call to A’s future, and another forked R process for the call to B’s future.

Both of those forked R processes, in turn, are just sitting in a synchronous call to the REST server to execute the UDF/SQL/etc. So, in any non-trivial DAG, the parent/polling process spends most of its time awaiting futures which are running in the child/forked processes, and those in turn spend most of their time awaiting the results of HTTP requests to the REST server.
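The shape of this can be sketched with raw futures, no package internals assumed – the parent polls while each child blocks on its work:

```r
library(future)
plan(multicore, workers = 2)  # use multisession on Windows

# Node A: in the real DAG this child would block on an HTTP call;
# here it just computes a value.
fa <- future({ 9 })
while (!resolved(fa)) Sys.sleep(0.05)  # parent polls A
a_val <- value(fa)

# Node B depends on A's value, so its future launches only after A resolves.
fb <- future({ 10 * a_val })
while (!resolved(fb)) Sys.sleep(0.05)  # parent polls B
value(fb)  # 90
```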
Those futures resolve into the parent process’ calls to resolved(). The only communication between child processes and the parent process is via (a) the return value from the future, and (b) stdout prints by the child. Crucially, prints to stderr are completely lost, and prints to stdout do not go directly to stdout – they only become obtainable once the future is resolved, at which point the parent process gets all the child process’ stdout lines in one go via self$future_result$stdout.
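A hedged sketch of that stdout behavior using only the public future API (the package’s self$future_result$stdout wraps the same mechanism):

```r
library(future)
plan(multicore, workers = 2)

f <- future({
  cat("progress line from the child\n")  # captured in the child, not shown yet
  9
})
while (!resolved(f)) Sys.sleep(0.05)  # no output appears during this loop
v <- value(f)  # the child's captured stdout is relayed here, all at once
```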
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
o <- compute(a, namespace='namespace_to_charge')
The terminal node’s compute() function instantiates a DAG object and stashes it as a slot within that node, for easy reference later. Nodes and edges are already in existence; the DAG’s purpose is to check for cycles, identify initial nodes, etc.
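For illustration only (this is not the package’s actual implementation), a cycle check over a node/edge list can be as simple as a three-color depth-first search:

```r
# edges: named list mapping each node name to its dependents.
# Returns TRUE if any cycle exists.
has_cycle <- function(edges, nodes) {
  state <- setNames(rep("white", length(nodes)), nodes)
  visit <- function(n) {
    if (state[[n]] == "grey") return(TRUE)   # back edge: cycle found
    if (state[[n]] == "black") return(FALSE) # already fully explored
    state[[n]] <<- "grey"
    for (m in edges[[n]]) if (visit(m)) return(TRUE)
    state[[n]] <<- "black"
    FALSE
  }
  any(vapply(nodes, visit, logical(1)))
}

has_cycle(list(a = "b", b = character(0)), c("a", "b"))  # FALSE
has_cycle(list(a = "b", b = "a"), c("a", "b"))           # TRUE
```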
The DAG is poll-driven: its poll() method calls every node’s poll() method until computation is complete, or has failed at a node. In the latter case there is a stop() which aborts the DAG poll and returns control to the user. Calling compute() again on the terminal node will leave computed nodes computed, and running nodes running, but will retry the failed nodes.
If you are doing light debugging, just have R do print/cat/etc. within the node functions – their stdout will appear once the future completes.

If you are doing hard debugging, and you need info on child processes which are still running: I’ve used R’s browser() to debug the parent process, but I don’t know how to make that work for debugging child processes. The best experience I can suggest is writing to a disk file within the child process and then tailing that file from a separate terminal.
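A sketch of that write-to-disk pattern (the log path and function name here are hypothetical):

```r
log_file <- file.path(tempdir(), "node_b.log")  # hypothetical location

# A node function that appends timestamped lines to a log file,
# so a separate terminal can `tail -f` it while the child runs.
b_fn <- function(x) {
  cat(sprintf("[%s] b starting, x=%s\n", Sys.time(), x),
      file = log_file, append = TRUE)
  res <- 10 * x
  cat(sprintf("[%s] b done, res=%s\n", Sys.time(), res),
      file = log_file, append = TRUE)
  res
}
```

Then, from another terminal: tail -f on that log file while the DAG runs.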
DAG invocation: More detail in the user-level vignette, but the essentials are:
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
o <- compute(a, namespace='namespace_to_charge')
show(o$dag_for_terminal)
str(o$dag_for_terminal)
o <- compute(a, namespace='namespace_to_charge') # retry failed nodes
o <- compute(a, namespace='namespace_to_charge', verbose=TRUE)