Task graphs

Delayed functions

The delayed function constructs a “node” object for a task graph. It’s essentially an R function, to be executed later, with a few more items:

  • Optionally a friendly display name in name
  • Optionally local=TRUE to execute the function on the current host – the default is to execute on TileDB Cloud.
  • If the function takes arguments, they can be specified by args=... in delayed, or via a separate call to delayed_args.

The compute function executes the delayed functions:

a <- delayed(function()    {    9  },                    name='a', local=TRUE)
b <- delayed(function(x)   {  10*x },    args=list(a),   name='b', local=TRUE)
c <- delayed(function(x)   { 100*x },                    name='c', local=TRUE)
delayed_args(c) <- list(a)
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d', local=TRUE)

o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)

with output

[1] 990
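
To make the idea of deferred execution concrete, here is a minimal base-R sketch of a node that stores a function and its arguments and only runs them on request. It is a toy under stated assumptions, not how the package is implemented: toy_node and toy_compute are hypothetical names, everything runs locally, and a shared dependency is simply re-evaluated each time it is needed.

```r
# Toy stand-in for a delayed node: a function plus its (possibly unset) arguments.
# toy_node and toy_compute are hypothetical names, not part of the package.
toy_node <- function(func, args = list()) {
  node <- new.env()
  node$func <- func
  node$args <- args
  node
}

# Resolve any arguments that are themselves nodes, then call the function.
toy_compute <- function(node) {
  resolved <- lapply(node$args, function(arg) {
    if (is.environment(arg)) toy_compute(arg) else arg
  })
  do.call(node$func, resolved)
}

# Nothing runs while the nodes are being built.
a <- toy_node(function() { 9 })
b <- toy_node(function(x) { 10 * x }, args = list(a))
c <- toy_node(function(x) { 100 * x })
c$args <- list(a)   # arguments supplied after construction
d <- toy_node(function(...) { sum(...) }, args = list(b, c))

# Work happens only here: 9 feeds b (90) and c (900), and d sums them.
result <- toy_compute(d)
print(result)
```

The point of the sketch is only the separation between describing the work and running it; scheduling, remote execution, and status tracking are left out.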

Delayed SQL queries

This is a simple convenience wrapper connecting delayed, as above, and TileDB Cloud SQL queries as described in the SQL vignette.

a <- delayed_sql(
    namespace='your-namespace',
    query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
    name="rows-query")
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)

with output

    avg_a rows
1  2.5000    1
2  6.5000    2
3 10.5000    3
4 14.5000    4

Delayed generic UDFs

This too is a simple convenience wrapper connecting delayed and generic UDFs as described in the UDFs vignette.

a <- delayed_generic_udf(
  namespace='your-namespace',
  udf=function(vec, exponent) {
    sum(vec ^ exponent)
  },
  name='my-generic'
)
delayed_args(a) <- list(vec=1:10, exponent=3)
print(compute(a, namespace=namespace))

with output

[1] 3025
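
Since a generic UDF is an ordinary R function, it can be sanity-checked locally before it is sent anywhere. The sketch below assumes the named list is matched to the function's parameters by name, which is how base R's do.call behaves. It does not call TileDB Cloud at all.

```r
# The same function body as the UDF above, run locally with base R only.
udf <- function(vec, exponent) {
  sum(vec ^ exponent)
}

# A named list maps onto the function's parameters by name, so order is irrelevant.
args <- list(vec = 1:10, exponent = 3)
result <- do.call(udf, args)
reordered <- do.call(udf, list(exponent = 3, vec = 1:10))

print(result)   # [1] 3025
```

A local check like this catches mistakes in the function body cheaply; it says nothing about packages or data that may differ in the remote environment.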

Delayed array UDFs

This is another simple convenience wrapper connecting delayed and array UDFs as described in the UDFs vignette.

a <- delayed_array_udf(
  namespace='your-namespace',
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) {
    vec <- as.vector(df[["a"]])
    list(min=min(vec), med=median(vec), max=max(vec))
  },
  selectedRanges=list(cbind(1,2), cbind(1,2)),
  attrs=c("a")
)
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)

with output

$min
[1] 1

$med
[1] 3.5

$max
[1] 6
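
The numbers above can be reproduced locally with a stand-in data frame. This sketch assumes the array is a 4x4 grid whose attribute a runs from 1 to 16 row by row, which is consistent with the per-row averages shown in the SQL example. The dimension name cols is also an assumption, and the subsetting line only imitates what selectedRanges does.

```r
# Assumed contents of the array: a 4x4 grid with attribute a running 1..16 row by row.
grid <- expand.grid(cols = 1:4, rows = 1:4)
grid$a <- 1:16

# Cross-check the assumption against the per-row averages from the SQL example.
row_means <- tapply(grid$a, grid$rows, mean)

# Imitate selectedRanges: rows 1..2 and cols 1..2.
df <- grid[grid$rows >= 1 & grid$rows <= 2 & grid$cols >= 1 & grid$cols <= 2, ]

# Same function body as the array UDF above.
udf <- function(df) {
  vec <- as.vector(df[["a"]])
  list(min = min(vec), med = median(vec), max = max(vec))
}

result <- udf(df)
str(result)
```

Under that assumption the selected cells hold 1, 2, 5, and 6, which gives the minimum, median, and maximum printed above.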

Composing tasks into task graphs

Nodes, be they from delayed, delayed_sql, delayed_generic_udf, or delayed_array_udf, can be connected together into directed acyclic graphs. (If you construct a graph with a cyclic dependency, you’ll get an error message promptly.)
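
For intuition about why a cycle can be reported promptly, here is a small base-R sketch that orders nodes by repeatedly picking those whose dependencies are already placed. It is one common way to detect a cycle, not necessarily what the package does; topo_order is a hypothetical helper, and the dependency lists are written by hand.

```r
# Dependencies as a named list: each node maps to the nodes it depends on.
# topo_order is a hypothetical helper, not a package function.
topo_order <- function(deps) {
  remaining <- deps
  placed <- character(0)
  while (length(remaining) > 0) {
    # A node is ready once all of its dependencies have been placed.
    is_ready <- vapply(remaining, function(d) all(d %in% placed), logical(1))
    ready <- names(remaining)[is_ready]
    if (length(ready) == 0) {
      # Nothing can make progress, so the leftover nodes depend on each other.
      stop("cyclic dependency among: ", paste(names(remaining), collapse = ", "))
    }
    placed <- c(placed, ready)
    remaining <- remaining[setdiff(names(remaining), ready)]
  }
  placed
}

dag <- list(a = character(0), b = "a", c = "a", d = c("b", "c"))
print(topo_order(dag))   # [1] "a" "b" "c" "d"

cyclic <- list(a = "c", b = "a", c = "b")
outcome <- tryCatch(topo_order(cyclic), error = function(e) conditionMessage(e))
print(outcome)
```

If a pass finds no ready node while some remain, no execution order exists, so the check can fail before any task is launched.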

Here’s an example that combines a local function, an array UDF, and a SQL query:

# Build several delayed objects to define a graph.

# Locally executed; simple enough.
local = delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)

# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives.
array_apply <- delayed_array_udf(
  namespace=namespace, # namespace to charge
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) { sum(as.vector(df[["a"]])) },
  selectedRanges=list(cbind(1,4), cbind(1,4)),
  attrs=c("a")
)

# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later.
sql = delayed_sql(
  namespace=namespace,
  "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
  name="sql"
)

# Custom function for averaging all the results we are passing in
ourmean <- function(local, array_apply, sql) {
    mean(c(local, array_apply, sql))
}

# This is essentially a task graph that looks like
#               ourmean
#          /       |      \
#         /        |       \    
#      local  array_apply  sql 
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
# Note that here we extract the answer from the SQL dataframe using `[[...]]`,
# and also cast it to numeric.
res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]])))
print(compute(res, namespace=namespace, verbose=TRUE))

with output

[1] 157.3333
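
The role of the numeric cast is easy to check locally. The sketch below uses stand-in values rather than real results: it assumes the array attribute holds 1 to 16 (consistent with the per-row averages shown earlier) and that the SQL result arrives as a one-row data frame of strings, as the comments in the example describe.

```r
# Stand-ins for the three upstream results (assumed values, see above).
local_result <- 100 * 2        # the locally executed node
array_result <- sum(1:16)      # array UDF summed over the full 4x4 range
sql_result   <- data.frame(a = "136", stringsAsFactors = FALSE)  # strings from SQL

ourmean <- function(local, array_apply, sql) {
  mean(c(local, array_apply, sql))
}

# Without the cast, c() would coerce everything to character and mean() would
# return NA with a warning. as.numeric() on the extracted column avoids that.
result <- ourmean(local_result, array_result, as.numeric(sql_result[["a"]]))
print(result)   # (200 + 136 + 136) / 3
```

With these stand-ins the mean is 472 / 3, roughly 157.33.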

Examining task graphs

In the ideal case, we connect together some computations and they run correctly the first time. In other cases, we need to inspect what happened along the way. The TileDB-Cloud-R package offers a few different ways to do this. Let’s revisit the simple “diamond” example from above in a few different ways.

Run as-is

namespace <- 'your-namespace'
a <- delayed(function()    {    9  })
b <- delayed(function(x)   {  10*x },   args=list(a))
c <- delayed(function(x)   { 100*x },   args=list(a))
d <- delayed(function(...) { sum(...)}, args=list(b,c))

o <- compute(d, namespace=namespace)
print(o)

This is as above, except now we execute remotely.

This outputs

[1] 990

Run with verbosity

For more detail, we can give each node a display name (default names look like n000005, which is less intuitive) and pass the verbose option to compute. In an R CLI session, the output live-updates as the DAG runs; in a notebook, you’ll see the output when the DAG completes.

namespace <- 'your-namespace'

a <- delayed(function()    {    9  },                    name='a')
b <- delayed(function(x)   {  10*x },    args=list(a),   name='b')
c <- delayed(function(x)   { 100*x },    args=list(a),   name='c')
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')

o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)

with output like

All      nodes:    (4) a, b, c, d
Initial  nodes:    (1) a
Terminal node:     (1) d
Dependencies:
  a (0)
  b (1) a
  c (1) a
  d (2) b, c
Statuses:
  a  args_ready=TRUE status=NOT_STARTED
  b  args_ready=FALSE status=NOT_STARTED
  c  args_ready=FALSE status=NOT_STARTED
  d  args_ready=FALSE status=NOT_STARTED
1644869786 2022-02-14 15:16:26 START a
1644869786 2022-02-14 15:16:26 launch remote compute   a
1644869789 2022-02-14 15:16:29 finish remote compute   a
1644869789 2022-02-14 15:16:29 END   a
1644869789 2022-02-14 15:16:29 START c
1644869789 2022-02-14 15:16:29 START b
1644869789 2022-02-14 15:16:29 launch remote compute   c
1644869791 2022-02-14 15:16:31 finish remote compute   c
1644869791 2022-02-14 15:16:31 END   c
1644869789 2022-02-14 15:16:29 launch remote compute   b
1644869791 2022-02-14 15:16:31 finish remote compute   b
1644869791 2022-02-14 15:16:31 END   b
1644869791 2022-02-14 15:16:31 START d
1644869791 2022-02-14 15:16:31 launch remote compute   d
1644869794 2022-02-14 15:16:34 finish remote compute   d
1644869794 2022-02-14 15:16:34 END   d
All      nodes:    (4) a, b, c, d
Initial  nodes:    (1) a
Terminal node:     (1) d
Dependencies:
  a (0)
  b (1) a
  c (1) a
  d (2) b, c
Statuses:
  a  args_ready=TRUE status=COMPLETED
  b  args_ready=TRUE status=COMPLETED
  c  args_ready=TRUE status=COMPLETED
  d  args_ready=TRUE status=COMPLETED
[1] 990

Here we have a print of the DAG; then launch/finish of nodes in dependency order; then another print of the DAG; then finally the output as before.
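
The summary lines in that printout can be derived from the dependency list alone. The sketch below does so in base R for the diamond graph; the deps list is typed in by hand from the Dependencies block, and args_ready here is a hypothetical stand-in function, not the package's method.

```r
# Dependency list copied by hand from the Dependencies block of the printout.
deps <- list(a = character(0), b = "a", c = "a", d = c("b", "c"))

# Initial nodes have no dependencies; terminal nodes are nobody's dependency.
initial  <- names(deps)[lengths(deps) == 0]
terminal <- setdiff(names(deps), unlist(deps))

# A node's arguments are ready once every dependency has completed.
args_ready <- function(deps, completed) {
  vapply(deps, function(d) all(d %in% completed), logical(1))
}

print(initial)    # [1] "a"
print(terminal)   # [1] "d"
before  <- args_ready(deps, character(0))   # only a is ready before anything runs
after_a <- args_ready(deps, "a")            # b and c become ready once a completes
print(before)
print(after_a)
```

This matches the first Statuses block, where only a has args_ready=TRUE, and explains why b and c both start right after a ends.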

Compute locally

Here the nodes run locally. Print statements within the nodes go to the terminal.

a <- delayed(function()    { cat("NODE A\n");    9  },                    name='a')
b <- delayed(function(x)   { cat("NODE B\n");  10*x },    args=list(a),   name='b')
c <- delayed(function(x)   { cat("NODE C\n"); 100*x },    args=list(a),   name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')

o <- compute(d, namespace=namespace, verbose=TRUE, force_all_local=TRUE)
print(o)

with output

All      nodes:    (4) a, b, c, d
Initial  nodes:    (1) a
Terminal node:     (1) d
Dependencies:
  a (0)
  b (1) a
  c (1) a
  d (2) b, c
Statuses:
  a  args_ready=TRUE status=NOT_STARTED
  b  args_ready=FALSE status=NOT_STARTED
  c  args_ready=FALSE status=NOT_STARTED
  d  args_ready=FALSE status=NOT_STARTED
1644869845 2022-02-14 15:17:25 START a
1644869845 2022-02-14 15:17:25 launch local compute   a
NODE A
1644869845 2022-02-14 15:17:25 finish local compute   a
1644869845 2022-02-14 15:17:25 END   a
1644869845 2022-02-14 15:17:25 START b
1644869845 2022-02-14 15:17:25 START c
1644869845 2022-02-14 15:17:25 launch local compute   b
NODE B
1644869845 2022-02-14 15:17:25 finish local compute   b
1644869845 2022-02-14 15:17:25 END   b
1644869845 2022-02-14 15:17:25 launch local compute   c
NODE C
1644869845 2022-02-14 15:17:25 finish local compute   c
1644869845 2022-02-14 15:17:25 END   c
1644869845 2022-02-14 15:17:25 START d
1644869845 2022-02-14 15:17:25 launch local compute   d
NODE D
1644869845 2022-02-14 15:17:25 finish local compute   d
1644869845 2022-02-14 15:17:25 END   d
All      nodes:    (4) a, b, c, d
Initial  nodes:    (1) a
Terminal node:     (1) d
Dependencies:
  a (0)
  b (1) a
  c (1) a
  d (2) b, c
Statuses:
  a  args_ready=TRUE status=COMPLETED
  b  args_ready=TRUE status=COMPLETED
  c  args_ready=TRUE status=COMPLETED
  d  args_ready=TRUE status=COMPLETED
[1] 990

Compute locally and sequentially

Here the nodes run not only locally, but with no concurrency, and all prints go directly to the terminal where you can see them right away:

a <- delayed(function()    { cat("NODE A\n");    9  },                    name='a')
b <- delayed(function(x)   { cat("NODE B\n");  10*x },    args=list(a),   name='b')
c <- delayed(function(x)   { cat("NODE C\n"); 100*x },    args=list(a),   name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')

o <- d$compute_sequentially()
print(o)

with output

NODE A
NODE B
NODE C
NODE D
[1] 990

Continue from error

In this (admittedly artificial) example we show partial progress through a task graph, then a retry and continue.

> a <- delayed(function()    {  9                 },                    name='a')
> b <- delayed(function(x)   { stop("the train!") },    args=list(a),   name='b') # Intentional error
> c <- delayed(function(x)   { 100*x              },    args=list(a),   name='c')
> d <- delayed(function(...) { sum(...)           }, args=list(b,c), name='d')

> o <- d$compute(namespace=namespace)
Error in arg$poll(namespace = namespace, verbose = verbose, force_local = force_local) :
  node failed: b: tiledbcloud: received error response: Server returned 500 response status code. Message:  Error message: received an error from the container: docker container exited with non-zero code: 1

Docker logs:
Arguments file name:  /dev/shm/tiledb_da86fa44-c1d7-4831-828d-0790844da970/args
Error in (function (x)  : the train!
Calls: main -> compute_result -> do.call -> <Anonymous>
Execution halted
> show(d)
node=d,nargs=2Error in self$args_ready() : dependency has failed

> show(d$dag_for_terminal)
All      nodes:    (4) a, b, c, d
Initial  nodes:    (1) a
Terminal node:     (1) d
Dependencies:
  a (0)
  b (1) a
  c (1) a
  d (2) b, c
Statuses:
  a  args_ready=TRUE status=COMPLETED
  b  args_ready=TRUE status=FAILED
  c  args_ready=TRUE status=RUNNING
  d Error in self$args_ready() : dependency has failed

Fix the artificial error:

> b$func <- function(x) { 10*x }

> o <- d$compute(namespace=namespace)
> print(o)
[1] 990

> show(d$dag_for_terminal)
All      nodes:    (4) a, b, c, d
Initial  nodes:    (1) a
Terminal node:     (1) d
Dependencies:
  a (0)
  b (1) a
  c (1) a
  d (2) b, c
Statuses:
  a  args_ready=TRUE status=COMPLETED
  b  args_ready=TRUE status=COMPLETED
  c  args_ready=TRUE status=COMPLETED
  d  args_ready=TRUE status=COMPLETED
>
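
The retry works because completed nodes keep their results, so a second compute only has to redo what failed or never ran. The base-R sketch below imitates that behavior with a cached result and a status field. It is a local, sequential toy with hypothetical names (toy_node, toy_compute, runs), not the package's implementation.

```r
# Toy node with a cached result and a status, loosely mirroring the statuses above.
# All names here are hypothetical and not the package's API.
toy_node <- function(name, func, args = list()) {
  node <- new.env()
  node$name <- name
  node$func <- func
  node$args <- args
  node$status <- "NOT_STARTED"
  node$result <- NULL
  node$runs <- 0   # how many times this node's function was invoked
  node
}

toy_compute <- function(node) {
  if (node$status == "COMPLETED") return(node$result)   # reuse earlier work
  resolved <- lapply(node$args, function(arg) {
    if (is.environment(arg)) toy_compute(arg) else arg
  })
  node$runs <- node$runs + 1
  node$status <- "FAILED"   # stays FAILED if func errors, overwritten on success
  node$result <- do.call(node$func, resolved)
  node$status <- "COMPLETED"
  node$result
}

a <- toy_node("a", function() { 9 })
b <- toy_node("b", function(x) { stop("the train!") }, args = list(a))  # intentional error
c <- toy_node("c", function(x) { 100 * x }, args = list(a))
d <- toy_node("d", function(...) { sum(...) }, args = list(b, c))

first <- tryCatch(toy_compute(d), error = function(e) conditionMessage(e))
status_after_failure <- b$status

b$func <- function(x) { 10 * x }   # fix the failing node
second <- toy_compute(d)           # a is not recomputed, b is retried
print(second)
```

After the second call, a has run once and b twice, which is the same pattern as the transcript: a stays COMPLETED across the failure and only the failed and downstream nodes do new work.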