--- title: "Task graphs" output: github_document vignette: | %\VignetteIndexEntry{TaskGraphs} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ## Delayed functions The `delayed` function constructs a "node" object for a task graph. It's essentially an R function, to be executed later, with a few more items: * Optionally a friendly display name in `name` * Optionally `local=TRUE` to execute the function on the current host -- the default is to execute on TileDB Cloud. * If the function takes arguments, they can be specified by `args=...` in `delayed`, or, via a separate call to `delayed_args`. * The `compute` function executes the delayed functions. ``` a <- delayed(function() { 9 }, name='a', local=TRUE) b <- delayed(function(x) { 10*x }, args=list(a), name='b', local=TRUE) c <- delayed(function(x) { 100*x }, name='c', local=TRUE) delayed_args(c) <- list(b) d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d', local=TRUE) o <- compute(d, namespace=namespace, verbose=TRUE) print(o) ``` with output ``` [1] 990 ``` ## Delayed SQL queries This is a simple convenience wrapper connecting `delayed`, as above, and TileDB Cloud SQL queries as described in the SQL vignette. ``` a <- delayed_sql( namespace='your-namespace', query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`", name="rows-query") o <- compute(a, namespace=namespace, verbose=TRUE) print(o) ``` with output ``` avg_a rows 1 2.5000 1 2 6.5000 2 3 10.5000 3 4 14.5000 4 ``` ## Delayed generic UDFs This too is a simple convenience wrapper connecting `delayed` and generic UDFs as described in the UDFs vignette. ``` a <- delayed_generic_udf( namespace='your-namespace', udf=function(vec, exponent) { sum(vec ** exponent) }, name='my-generic' ) delayed_args(a) <- list(vec=1:10, exponent=3) print(compute(a, namespace=namespaceToCharge)) ``` with output ``` [1] 3025 ``` ## Delayed array UDFs This is another simple convenience wrapper connecting `delayed` and array UDFs as described in the UDFs vignette. ``` a <- delayed_array_udf( namespace='your-namespace' array="TileDB-Inc/quickstart_dense", udf=function(df) { vec <- as.vector(df[["a"]]) list(min=min(vec), med=median(vec), max=max(vec)) }, selectedRanges=list(cbind(1,2), cbind(1,2)), attrs=c("a") ) o <- compute(a, namespace=namespace, verbose=TRUE) print(o) ``` with output ``` $min [1] 1 $med [1] 3.5 $max [1] 6 ``` ## Composing tasks into task graphs Nodes, be they from `delayed`, `delayed_sql`, `delayed_array_udf`, or `delayed_multi_array_udf`, can be connected together into directed acyclic graphs. (If you construct a graph with a cyclic dependency, you'll get an error message promptly.) Here's an example: ``` # Build several delayed objects to define a graph. # Locally executed; simple enough. local = delayed(function(x) { x*2 }, local=TRUE) delayed_args(local) <- list(100) # Array UDF -- we specify selected ranges and attributes, then do some R on the # dataframe which the UDF receives. array_apply <- delayed_array_udf( namespace=namespace, # namespace to charge array="TileDB-Inc/quickstart_dense", udf=function(df) { sum(as.vector(df[["a"]])) }, selectedRanges=list(cbind(1,4), cbind(1,4)), attrs=c("a") ) # SQL -- note the output is a dataframe, and values are all strings (MariaDB # "decimal values") so we'll cast them to numeric later. sql = delayed_sql( namespace=namespace, "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql" ) # Custom function for averaging all the results we are passing in ourmean <- function(local, array_apply, sql) { mean(c(local, array_apply, sql)) } # This is essentially a task graph that looks like # ourmean # / | \ # / | \ # local array_apply sql # # The `local`, `array_apply` and `sql` tasks will computed first, # and once all three are finished, `ourmean` will computed on their results. # Note here we slot out the ansswer from the SQL dataframe using `[[...]]`, # and also cast to numeric. res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]]))) print(compute(res, namespace=namespace, verbose=TRUE)) ``` with output ``` [1] 168 ``` ## Examining task graphs In the ideal case, we connect together some computations and they run correctly the first time. In other cases, we need to inspect a bit. The `TileDB-Cloud-R` package offers a few different ways to do this. Let's revisit the simple "diamond" example from above, a few different ways. ### Run as-is ``` namespace <- 'your-namespace' a <- delayed(function() { 9 }) b <- delayed(function(x) { 10*x }, args=list(a)) c <- delayed(function(x) { 100*x }, args=list(a)) d <- delayed(function(...) { sum(...)}, args=list(b,c)) o <- compute(d, namespace=namespace) print(o) ``` This is as above, except now we execute remotely. This outputs ``` [1] 990 ``` ### Run with verbosity For more detail we can add display names to each node (the default naming is things like `n000005`, which is less intuitive), and also use the `verbose` option to `compute`. In an R CLI session, this will live-update as the DAG runs; in a notebook, you'll see output when the DAG completes. ``` namespace <- 'your-namespace' a <- delayed(function() { 9 }, name='a') b <- delayed(function(x) { 10*x }, args=list(a), name='b') c <- delayed(function(x) { 100*x }, args=list(a), name='c') d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d') o <- compute(d, namespace=namespace, verbose=TRUE) print(o) ``` with output like ``` All nodes: (4) a, b, c, d Initial nodes: (1) a Terminal node: (1) d Dependencies: a (0) b (1) a c (1) a d (2) b, c Statuses: a args_ready=TRUE status=NOT_STARTED b args_ready=FALSE status=NOT_STARTED c args_ready=FALSE status=NOT_STARTED d args_ready=FALSE status=NOT_STARTED 1644869786 2022-02-14 15:16:26 START a 1644869786 2022-02-14 15:16:26 launch remote compute a 1644869789 2022-02-14 15:16:29 finish remote compute a 1644869789 2022-02-14 15:16:29 END a 1644869789 2022-02-14 15:16:29 START c 1644869789 2022-02-14 15:16:29 START b 1644869789 2022-02-14 15:16:29 launch remote compute c 1644869791 2022-02-14 15:16:31 finish remote compute c 1644869791 2022-02-14 15:16:31 END c 1644869789 2022-02-14 15:16:29 launch remote compute b 1644869791 2022-02-14 15:16:31 finish remote compute b 1644869791 2022-02-14 15:16:31 END b 1644869791 2022-02-14 15:16:31 START d 1644869791 2022-02-14 15:16:31 launch remote compute d 1644869794 2022-02-14 15:16:34 finish remote compute d 1644869794 2022-02-14 15:16:34 END d All nodes: (4) a, b, c, d Initial nodes: (1) a Terminal node: (1) d Dependencies: a (0) b (1) a c (1) a d (2) b, c Statuses: a args_ready=TRUE status=COMPLETED b args_ready=TRUE status=COMPLETED c args_ready=TRUE status=COMPLETED d args_ready=TRUE status=COMPLETED [1] 990 ``` Here we have a print of the DAG; then launch/finish of nodes in dependency order; then another print of the DAG; then finally the output as before. ### Compute locally Here the nodes run locally. Print statements within the nodes go to the terminal. ``` a <- delayed(function() { cat("NODE A\n"); 9 }, name='a') b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b') c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c') d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d') o <- compute(d, namespace=namespace, verbose=TRUE, force_all_local=TRUE) print(o) ``` with output ``` All nodes: (4) a, b, c, d Initial nodes: (1) a Terminal node: (1) d Dependencies: a (0) b (1) a c (1) a d (2) b, c Statuses: a args_ready=TRUE status=NOT_STARTED b args_ready=FALSE status=NOT_STARTED c args_ready=FALSE status=NOT_STARTED d args_ready=FALSE status=NOT_STARTED 1644869845 2022-02-14 15:17:25 START a 1644869845 2022-02-14 15:17:25 launch local compute a NODE A 1644869845 2022-02-14 15:17:25 finish local compute a 1644869845 2022-02-14 15:17:25 END a 1644869845 2022-02-14 15:17:25 START b 1644869845 2022-02-14 15:17:25 START c 1644869845 2022-02-14 15:17:25 launch local compute b NODE B 1644869845 2022-02-14 15:17:25 finish local compute b 1644869845 2022-02-14 15:17:25 END b 1644869845 2022-02-14 15:17:25 launch local compute c NODE C 1644869845 2022-02-14 15:17:25 finish local compute c 1644869845 2022-02-14 15:17:25 END c 1644869845 2022-02-14 15:17:25 START d 1644869845 2022-02-14 15:17:25 launch local compute d NODE D 1644869845 2022-02-14 15:17:25 finish local compute d 1644869845 2022-02-14 15:17:25 END d All nodes: (4) a, b, c, d Initial nodes: (1) a Terminal node: (1) d Dependencies: a (0) b (1) a c (1) a d (2) b, c Statuses: a args_ready=TRUE status=COMPLETED b args_ready=TRUE status=COMPLETED c args_ready=TRUE status=COMPLETED d args_ready=TRUE status=COMPLETED > print(o) [1] 990 ``` ### Compute locally and sequentially Here the nodes run not only locally, but with no concurrency, and all prints go directly to the terminal where you can see them right away: ``` a <- delayed(function() { cat("NODE A\n"); 9 }, name='a') b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b') c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c') d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d') o <- d$compute_sequentially() print(o) ``` ``` NODE A NODE B NODE C NODE D [1] 990 ``` ### Continue from error In this (admittedly artificial) example we show partial progress through a task graph, then a retry and continue. ``` > a <- delayed(function() { 9 }, name='a') > b <- delayed(function(x) { stop("the train!") }, args=list(a), name='b') # Intentional error > c <- delayed(function(x) { 100*x }, args=list(a), name='c') > d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d') > o <- d$compute(namespace=namespace) Error in arg$poll(namespace = namespace, verbose = verbose, force_local = force_local) : node failed: b: tiledbcloud: received error response: Server returned 500 response status code. Message: Error message: received an error from the container: docker container exited with non-zero code: 1 Docker logs: Arguments file name: /dev/shm/tiledb_da86fa44-c1d7-4831-828d-0790844da970/args Error in (function (x) : the train! Calls: main -> compute_result -> do.call -> Execution halted ``` ``` > show(d) node=d,nargs=2Error in self$args_ready() : dependency has failed > show(d$dag_for_terminal) All nodes: (4) a, b, c, d Initial nodes: (1) a Terminal node: (1) d Dependencies: a (0) b (1) a c (1) a d (2) b, c Statuses: a args_ready=TRUE status=COMPLETED b args_ready=TRUE status=FAILED c args_ready=TRUE status=RUNNING d Error in self$args_ready() : dependency has failed ``` Fix the artificial error: ``` > b$func <- function(x) { 10*x } > o <- d$compute(namespace=namespace) > print(o) [1] 990 > show(d$dag_for_terminal) All nodes: (4) a, b, c, d Initial nodes: (1) a Terminal node: (1) d Dependencies: a (0) b (1) a c (1) a d (2) b, c Statuses: a args_ready=TRUE status=COMPLETED b args_ready=TRUE status=COMPLETED c args_ready=TRUE status=COMPLETED d args_ready=TRUE status=COMPLETED > ```