About task graphs

Parallelism

  • The future package’s default plan is sequential, i.e. zero parallelism – hence the need for an explicit future::plan() call (see the example below).

  • The future package’s multicore plan (plan(multicore)) is more or less R’s analog of Python’s ProcessPoolExecutor. There is no analog of ThreadPoolExecutor for R, and our understanding is there never will be – too many issues with threads and R.

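  • For example, a minimal sketch of setting an explicit plan (the plan names are real future-package plans; the worker count is purely illustrative):
    library(future)
    plan(multisession, workers = 4)   # separate R worker processes; works on all platforms
    # plan(multicore, workers = 4)    # forked R processes; Unix-alikes only, not Windows
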
Processes

  • Say you have a 2-node DAG where node B depends on node A.

  • There is an R process for the call to compute(B). Inside that process, dag$poll() invokes node$poll() for both the A and B nodes.

  • There will be a forked R process (managed by the future package) for the call to A’s future, and another forked R process for the call to B’s future.

  • Both of those forked R processes, in turn, are just sitting in a synchronous call to the REST server to execute the UDF/SQL/etc. So, in any non-trivial DAG, the parent/polling process spends most of its time awaiting futures which are running in the child/forked processes, and those are in turn spending most of their time awaiting the results of HTTP requests to the REST server.

  • Those futures resolve within the parent process’s calls to resolved(). The only communication between the child processes and the parent process is via (a) the return value from the future, and (b) stdout prints by the child. Crucially, prints to stderr are completely lost, and prints to stdout do not go directly to stdout – they only become available once the future is resolved and the parent process receives all of the child process’s stdout lines in one go via self$future_result$stdout (see the sketch below).

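  • A minimal sketch of that parent/child split, using the future package directly rather than this package’s internals (the plan, node body, and sleep interval are illustrative):
    library(future)
    plan(multisession)                    # each future runs in its own R process

    f <- future({
      cat('hello from the child\n')       # captured, not shown immediately
      9
    })

    while (!resolved(f)) Sys.sleep(0.1)   # the parent polls, much as dag$poll() does
    value(f)                              # 9; the child's captured stdout is relayed now
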
Call flow

  • Nodes are defined by calls to the delayed factory function, with edges via their args. Note that the DAG is at this point implicit in these nodes and edges.
    a <- delayed(function()  {   9  },                 name='a')
    b <- delayed(function(x) { 10*x }, args=list(a),   name='b')
    o <- compute(b, namespace='namespace_to_charge')
  • The terminal node’s compute() function instantiates a DAG object and stashes it as a slot within that node, for easy reference later. Nodes and edges are already in existence; the DAG’s purpose is to check for cycles, identify initial nodes, etc.

  • The DAG is poll-driven: its poll() method calls every node’s poll() method until computation is complete, or has failed at a node. In the latter case a stop() aborts the DAG poll and returns control to the user. (A sketch of this loop follows this list.)

  • Calling compute() again on the terminal node will leave computed nodes computed, and running nodes running, but will retry the failed nodes.

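  • A rough sketch of the loop described above (poll_dag(), the node objects, and their poll() return values are hypothetical stand-ins, not this package’s actual classes):
    poll_dag <- function(nodes) {
      repeat {
        states <- vapply(nodes, function(node) node$poll(), character(1))
        if (any(states == 'failed')) {
          stop('DAG poll aborted; failed node(s): ',
               paste(names(nodes)[states == 'failed'], collapse=', '))
        }
        if (all(states == 'done')) return(invisible(nodes))
        Sys.sleep(0.1)   # back off before the next polling pass
      }
    }
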
Debugging

  • For light debugging, just have R print/cat/etc. within the node functions – their stdout will appear once the future completes.

  • For harder debugging, and when you need information from child processes that are still running: I’ve used R’s browser() to debug the parent process, but I don’t know how to make that work for child processes. The best approach I can suggest is writing to a disk file from within the child process and then tailing that file from a separate terminal (see the sketch at the end of this section).

  • DAG invocation: More detail in the user-level vignette, but the essentials are:

    • a <- delayed(function() { 9 }, name='a')
    • b <- delayed(function(x) { 10*x }, args=list(a), name='b')
    • Simplest:
      • o <- compute(b, namespace='namespace_to_charge')
      • show(o$dag_for_terminal)
      • str(o$dag_for_terminal)
      • o <- compute(b, namespace='namespace_to_charge') # retry failed nodes
    • More detail:
      • o <- compute(b, namespace='namespace_to_charge', verbose=TRUE)
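
  • A minimal sketch of the write-to-disk trick mentioned above (the path '/tmp/dag_debug.log' is just an example; tail -f it from a separate terminal while the DAG runs):
    a <- delayed(function() {
      cat(format(Sys.time()), 'node a starting\n',
          file='/tmp/dag_debug.log', append=TRUE)
      9
    }, name='a')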