The delayed function constructs a “node” object for a
task graph. It’s essentially an R function, to be executed later, with a
few more items:
namelocal=TRUE to execute the function on the
current host – the default is to execute on TileDB Cloud.args=... in delayed, or, via a separate call
to delayed_args.compute function executes the delayed
functions.a <- delayed(function() { 9 }, name='a', local=TRUE)
b <- delayed(function(x) { 10*x }, args=list(a), name='b', local=TRUE)
c <- delayed(function(x) { 100*x }, name='c', local=TRUE)
delayed_args(c) <- list(b)
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d', local=TRUE)
o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)
with output
[1] 990
This is a simple convenience wrapper connecting delayed,
as above, and TileDB Cloud SQL queries as described in the SQL
vignette.
a <- delayed_sql(
namespace='your-namespace',
query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
name="rows-query")
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)
with output
avg_a rows
1 2.5000 1
2 6.5000 2
3 10.5000 3
4 14.5000 4
This too is a simple convenience wrapper connecting
delayed and generic UDFs as described in the UDFs
vignette.
a <- delayed_generic_udf(
namespace='your-namespace',
udf=function(vec, exponent) {
sum(vec ** exponent)
},
name='my-generic'
)
delayed_args(a) <- list(vec=1:10, exponent=3)
print(compute(a, namespace=namespaceToCharge))
with output
[1]
3025
This is another simple convenience wrapper connecting
delayed and array UDFs as described in the UDFs
vignette.
a <- delayed_array_udf(
namespace='your-namespace'
array="TileDB-Inc/quickstart_dense",
udf=function(df) {
vec <- as.vector(df[["a"]])
list(min=min(vec), med=median(vec), max=max(vec))
},
selectedRanges=list(cbind(1,2), cbind(1,2)),
attrs=c("a")
)
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)
with output
$min
[1] 1
$med
[1] 3.5
$max
[1] 6
Nodes, be they from delayed, delayed_sql,
delayed_array_udf, or delayed_multi_array_udf,
can be connected together into directed acyclic graphs. (If you
construct a graph with a cyclic dependency, you’ll get an error message
promptly.)
Here’s an example:
# Build several delayed objects to define a graph.
# Locally executed; simple enough.
local = delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)
# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives.
array_apply <- delayed_array_udf(
namespace=namespace, # namespace to charge
array="TileDB-Inc/quickstart_dense",
udf=function(df) { sum(as.vector(df[["a"]])) },
selectedRanges=list(cbind(1,4), cbind(1,4)),
attrs=c("a")
)
# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later.
sql = delayed_sql(
namespace=namespace,
"select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
name="sql"
)
# Custom function for averaging all the results we are passing in
ourmean <- function(local, array_apply, sql) {
mean(c(local, array_apply, sql))
}
# This is essentially a task graph that looks like
# ourmean
# / | \
# / | \
# local array_apply sql
#
# The `local`, `array_apply` and `sql` tasks will computed first,
# and once all three are finished, `ourmean` will computed on their results.
# Note here we slot out the ansswer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]])))
print(compute(res, namespace=namespace, verbose=TRUE))
with output
[1] 168
In the ideal case, we connect together some computations and they run
correctly the first time. In other cases, we need to inspect a bit. The
TileDB-Cloud-R package offers a few different ways to do
this. Let’s revisit the simple “diamond” example from above, a few
different ways.
namespace <- 'your-namespace'
a <- delayed(function() { 9 })
b <- delayed(function(x) { 10*x }, args=list(a))
c <- delayed(function(x) { 100*x }, args=list(a))
d <- delayed(function(...) { sum(...)}, args=list(b,c))
o <- compute(d, namespace=namespace)
print(o)
This is as above, except now we execute remotely.
This outputs
[1] 990
For more detail we can add display names to each node (the default
naming is things like n000005, which is less intuitive),
and also use the verbose option to compute. In
an R CLI session, this will live-update as the DAG runs; in a notebook,
you’ll see output when the DAG completes.
namespace <- 'your-namespace'
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
c <- delayed(function(x) { 100*x }, args=list(a), name='c')
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')
o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)
with output like
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=NOT_STARTED
b args_ready=FALSE status=NOT_STARTED
c args_ready=FALSE status=NOT_STARTED
d args_ready=FALSE status=NOT_STARTED
1644869786 2022-02-14 15:16:26 START a
1644869786 2022-02-14 15:16:26 launch remote compute a
1644869789 2022-02-14 15:16:29 finish remote compute a
1644869789 2022-02-14 15:16:29 END a
1644869789 2022-02-14 15:16:29 START c
1644869789 2022-02-14 15:16:29 START b
1644869789 2022-02-14 15:16:29 launch remote compute c
1644869791 2022-02-14 15:16:31 finish remote compute c
1644869791 2022-02-14 15:16:31 END c
1644869789 2022-02-14 15:16:29 launch remote compute b
1644869791 2022-02-14 15:16:31 finish remote compute b
1644869791 2022-02-14 15:16:31 END b
1644869791 2022-02-14 15:16:31 START d
1644869791 2022-02-14 15:16:31 launch remote compute d
1644869794 2022-02-14 15:16:34 finish remote compute d
1644869794 2022-02-14 15:16:34 END d
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
[1] 990
Here we have a print of the DAG; then launch/finish of nodes in dependency order; then another print of the DAG; then finally the output as before.
Here the nodes run locally. Print statements within the nodes go to the terminal.
a <- delayed(function() { cat("NODE A\n"); 9 }, name='a')
b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b')
c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')
o <- compute(d, namespace=namespace, verbose=TRUE, force_all_local=TRUE)
print(o)
with output
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=NOT_STARTED
b args_ready=FALSE status=NOT_STARTED
c args_ready=FALSE status=NOT_STARTED
d args_ready=FALSE status=NOT_STARTED
1644869845 2022-02-14 15:17:25 START a
1644869845 2022-02-14 15:17:25 launch local compute a
NODE A
1644869845 2022-02-14 15:17:25 finish local compute a
1644869845 2022-02-14 15:17:25 END a
1644869845 2022-02-14 15:17:25 START b
1644869845 2022-02-14 15:17:25 START c
1644869845 2022-02-14 15:17:25 launch local compute b
NODE B
1644869845 2022-02-14 15:17:25 finish local compute b
1644869845 2022-02-14 15:17:25 END b
1644869845 2022-02-14 15:17:25 launch local compute c
NODE C
1644869845 2022-02-14 15:17:25 finish local compute c
1644869845 2022-02-14 15:17:25 END c
1644869845 2022-02-14 15:17:25 START d
1644869845 2022-02-14 15:17:25 launch local compute d
NODE D
1644869845 2022-02-14 15:17:25 finish local compute d
1644869845 2022-02-14 15:17:25 END d
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
> print(o)
[1] 990
Here the nodes run not only locally, but with no concurrency, and all prints go directly to the terminal where you can see them right away:
a <- delayed(function() { cat("NODE A\n"); 9 }, name='a')
b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b')
c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')
o <- d$compute_sequentially()
print(o)
NODE A
NODE B
NODE C
NODE D
[1] 990
In this (admittedly artificial) example we show partial progress through a task graph, then a retry and continue.
> a <- delayed(function() { 9 }, name='a')
> b <- delayed(function(x) { stop("the train!") }, args=list(a), name='b') # Intentional error
> c <- delayed(function(x) { 100*x }, args=list(a), name='c')
> d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')
> o <- d$compute(namespace=namespace)
Error in arg$poll(namespace = namespace, verbose = verbose, force_local = force_local) :
node failed: b: tiledbcloud: received error response: Server returned 500 response status code. Message: Error message: received an error from the container: docker container exited with non-zero code: 1
Docker logs:
Arguments file name: /dev/shm/tiledb_da86fa44-c1d7-4831-828d-0790844da970/args
Error in (function (x) : the train!
Calls: main -> compute_result -> do.call -> <Anonymous>
Execution halted
> show(d)
node=d,nargs=2Error in self$args_ready() : dependency has failed
> show(d$dag_for_terminal)
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=FAILED
c args_ready=TRUE status=RUNNING
d Error in self$args_ready() : dependency has failed
Fix the artificial error:
> b$func <- function(x) { 10*x }
> o <- d$compute(namespace=namespace)
> print(o)
[1] 990
> show(d$dag_for_terminal)
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
>