The delayed function constructs a “node” object for a task graph. It’s essentially an R function, to be executed later, with a few more items: an optional name, used for display; local=TRUE, to execute the function on the current host (the default is to execute on TileDB Cloud); and its arguments, supplied either as args=... in delayed or via a separate call to delayed_args. The compute function then executes the delayed functions.
namespace <- 'your-namespace'
a <- delayed(function() { 9 }, name='a', local=TRUE)
b <- delayed(function(x) { 10*x }, args=list(a), name='b', local=TRUE)
c <- delayed(function(x) { 100*x }, name='c', local=TRUE)
delayed_args(c) <- list(a)
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d', local=TRUE)
o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)
with output
[1] 990
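To make “an R function, to be executed later” concrete, here is a minimal base-R sketch of deferred nodes. It is not how the tiledbcloud package implements delayed; node() and evaluate() are hypothetical helpers for illustration only. Each node stores a function and its arguments, and evaluate() resolves any argument that is itself a node before calling the function, caching results so a shared dependency such as a runs only once.

```r
# Hypothetical helper (not the tiledbcloud API): a deferred node holding a
# function, its arguments and a display name. An environment is used so the
# cached result can be stored on the node itself.
node <- function(func, args = list(), name = NULL) {
  n <- new.env()
  n$func <- func
  n$args <- args
  n$name <- name
  n$done <- FALSE
  n$result <- NULL
  class(n) <- "sketch_node"
  n
}

# Hypothetical evaluator: evaluate node arguments first, then call the node's
# function on the resolved values. Results are cached on each node, so a node
# shared by several parents is computed only once.
evaluate <- function(n) {
  if (!n$done) {
    resolved <- lapply(n$args, function(arg) {
      if (inherits(arg, "sketch_node")) evaluate(arg) else arg
    })
    n$result <- do.call(n$func, resolved)
    n$done <- TRUE
  }
  n$result
}

# The diamond: a feeds b and c, which both feed d.
a <- node(function() { 9 }, name = "a")
b <- node(function(x) { 10 * x }, args = list(a), name = "b")
c <- node(function(x) { 100 * x }, args = list(a), name = "c")
d <- node(function(...) { sum(...) }, args = list(b, c), name = "d")
print(evaluate(d))  # 990
```

Nothing runs until evaluate(d) is called, which mirrors the split between building nodes with delayed and running them with compute.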
The delayed_sql function is a simple convenience wrapper connecting delayed, as above, with TileDB Cloud SQL queries as described in the SQL vignette.
a <- delayed_sql(
namespace='your-namespace',
query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
name="rows-query")
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)
with output
avg_a rows
1 2.5000 1
2 6.5000 2
3 10.5000 3
4 14.5000 4
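The query computes the mean of attribute a for each value of the rows dimension. To see that aggregation in isolation, here is a base-R equivalent on a small local stand-in. The data frame is an assumption: it supposes quickstart_dense is a 4x4 array whose attribute a holds 1 through 16 filled row by row, which matches the averages printed above but is not read from the array itself.

```r
# Hypothetical local stand-in for quickstart_dense: a 4x4 grid of cells
# indexed by `rows` and `cols`, with attribute `a` filled row by row
# (expand.grid varies its first argument fastest).
cells <- expand.grid(cols = 1:4, rows = 1:4)
cells$a <- seq_len(nrow(cells))

# Base-R counterpart of: SELECT rows, AVG(a) AS avg_a ... GROUP BY rows
avg_by_row <- aggregate(a ~ rows, data = cells, FUN = mean)
names(avg_by_row)[names(avg_by_row) == "a"] <- "avg_a"
print(avg_by_row)
```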
The delayed_generic_udf function is likewise a simple convenience wrapper connecting delayed with generic UDFs, as described in the UDFs vignette.
a <- delayed_generic_udf(
namespace='your-namespace',
udf=function(vec, exponent) {
sum(vec^exponent)
},
name='my-generic'
)
delayed_args(a) <- list(vec=1:10, exponent=3)
print(compute(a, namespace=namespace))
with output
[1] 3025
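Because the UDF is an ordinary R function, one way to sanity-check it before sending it to TileDB Cloud is to call it locally with the same named arguments, mirroring list(vec=1:10, exponent=3) above. This only exercises the function body; it says nothing about the remote environment. The name my_udf is illustrative.

```r
# The same function body as the UDF above, kept in a local variable.
my_udf <- function(vec, exponent) {
  sum(vec^exponent)
}

# do.call() matches list names to parameter names.
result <- do.call(my_udf, list(vec = 1:10, exponent = 3))
print(result)  # sum of cubes of 1..10, i.e. (10 * 11 / 2)^2 = 3025
```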
The delayed_array_udf function is another simple convenience wrapper, connecting delayed with array UDFs as described in the UDFs vignette.
a <- delayed_array_udf(
namespace='your-namespace',
array="TileDB-Inc/quickstart_dense",
udf=function(df) {
vec <- as.vector(df[["a"]])
list(min=min(vec), med=median(vec), max=max(vec))
},
selectedRanges=list(cbind(1,2), cbind(1,2)),
attrs=c("a")
)
o <- compute(a, namespace=namespace, verbose=TRUE)
print(o)
with output
$min
[1] 1
$med
[1] 3.5
$max
[1] 6
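Likewise, the array UDF can be tried locally on a hand-built data frame. The values are an assumption: they suppose the selected ranges (rows 1 to 2, columns 1 to 2) pick out cells holding 1, 2, 5 and 6, which is consistent with the output above. On TileDB Cloud the udf receives its real data from the array; slice is a hypothetical stand-in.

```r
# Same function body as the array UDF above.
udf <- function(df) {
  vec <- as.vector(df[["a"]])
  list(min = min(vec), med = median(vec), max = max(vec))
}

# Hypothetical stand-in for the 2x2 slice the server would pass in.
slice <- data.frame(rows = c(1, 1, 2, 2),
                    cols = c(1, 2, 1, 2),
                    a    = c(1, 2, 5, 6))
str(udf(slice))
```

The median of an even number of values is the mean of the middle two, here (2 + 5) / 2 = 3.5.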
Nodes, be they from delayed, delayed_sql, delayed_array_udf, or delayed_multi_array_udf, can be connected together into directed acyclic graphs. (If you construct a graph with a cyclic dependency, you’ll get an error message promptly.)
Here’s an example:
# Build several delayed objects to define a graph.
# Locally executed; simple enough.
local <- delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)
# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives.
array_apply <- delayed_array_udf(
namespace=namespace, # namespace to charge
array="TileDB-Inc/quickstart_dense",
udf=function(df) { sum(as.vector(df[["a"]])) },
selectedRanges=list(cbind(1,4), cbind(1,4)),
attrs=c("a")
)
# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later.
sql <- delayed_sql(
namespace=namespace,
"select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
name="sql"
)
# Custom function for averaging all the results we are passing in
ourmean <- function(local, array_apply, sql) {
mean(c(local, array_apply, sql))
}
# This is essentially a task graph that looks like
# ourmean
# / | \
# / | \
# local array_apply sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
# Note here we slot out the answer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]])))
print(compute(res, namespace=namespace, verbose=TRUE))
with output
[1] 168
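Reporting a cyclic dependency promptly amounts to checking the dependency graph before anything runs. A standard way to do this is a depth-first search that flags a node reached again while it is still on the current path. The sketch below is a generic illustration with a hypothetical has_cycle() helper, not the package’s own check; the graph is a named list mapping each node to the nodes it depends on, using the names from the example above.

```r
# Hypothetical helper: detect a cycle in a dependency graph given as a named
# list, where deps[[x]] holds the names of the nodes that x depends on.
has_cycle <- function(deps) {
  state <- rep("unvisited", length(deps))
  names(state) <- names(deps)
  visit <- function(x) {
    if (state[[x]] == "on_path") return(TRUE)   # back edge: cycle found
    if (state[[x]] == "done") return(FALSE)     # already fully explored
    state[[x]] <<- "on_path"
    for (dep in deps[[x]]) {
      if (visit(dep)) return(TRUE)
    }
    state[[x]] <<- "done"
    FALSE
  }
  for (x in names(deps)) {
    if (visit(x)) return(TRUE)
  }
  FALSE
}

# The graph from the example above is acyclic.
graph <- list(local = character(0), array_apply = character(0),
              sql = character(0), ourmean = c("local", "array_apply", "sql"))
print(has_cycle(graph))

# Making `sql` depend on `ourmean` closes a loop.
cyclic <- graph
cyclic$sql <- "ourmean"
print(has_cycle(cyclic))
```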
In the ideal case, we connect some computations together and they run correctly the first time. Otherwise, we need to inspect what happened. The TileDB-Cloud-R package offers several ways to do this; let’s revisit the simple “diamond” example from above using each of them in turn.
namespace <- 'your-namespace'
a <- delayed(function() { 9 })
b <- delayed(function(x) { 10*x }, args=list(a))
c <- delayed(function(x) { 100*x }, args=list(a))
d <- delayed(function(...) { sum(...)}, args=list(b,c))
o <- compute(d, namespace=namespace)
print(o)
This is the diamond from above, except that no node sets local=TRUE, so the nodes now execute remotely on TileDB Cloud. This outputs
[1] 990
For more detail we can add display names to each node (default names look like n000005, which is less intuitive), and also pass the verbose option to compute. In an R CLI session, this output will live-update as the DAG runs; in a notebook, you’ll see it when the DAG completes.
namespace <- 'your-namespace'
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
c <- delayed(function(x) { 100*x }, args=list(a), name='c')
d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')
o <- compute(d, namespace=namespace, verbose=TRUE)
print(o)
with output like
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=NOT_STARTED
b args_ready=FALSE status=NOT_STARTED
c args_ready=FALSE status=NOT_STARTED
d args_ready=FALSE status=NOT_STARTED
1644869786 2022-02-14 15:16:26 START a
1644869786 2022-02-14 15:16:26 launch remote compute a
1644869789 2022-02-14 15:16:29 finish remote compute a
1644869789 2022-02-14 15:16:29 END a
1644869789 2022-02-14 15:16:29 START c
1644869789 2022-02-14 15:16:29 START b
1644869789 2022-02-14 15:16:29 launch remote compute c
1644869791 2022-02-14 15:16:31 finish remote compute c
1644869791 2022-02-14 15:16:31 END c
1644869789 2022-02-14 15:16:29 launch remote compute b
1644869791 2022-02-14 15:16:31 finish remote compute b
1644869791 2022-02-14 15:16:31 END b
1644869791 2022-02-14 15:16:31 START d
1644869791 2022-02-14 15:16:31 launch remote compute d
1644869794 2022-02-14 15:16:34 finish remote compute d
1644869794 2022-02-14 15:16:34 END d
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
[1] 990
Here we see a printout of the DAG; then the start and finish of each node in dependency order, with b and c launched together once a is done; then another printout of the DAG with every node completed; and finally the output as before.
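The log reflects a simple rule: a node can start once everything it depends on has finished, which the Statuses block reports as args_ready. As a rough illustration, and not the package’s scheduler, the hypothetical ready_waves() helper below groups nodes into “waves” by repeatedly collecting every remaining node whose dependencies are all complete; nodes in the same wave could run concurrently.

```r
# Hypothetical helper: group nodes into waves. Every node in a wave has all
# of its dependencies in earlier waves. deps[[x]] lists what x depends on.
ready_waves <- function(deps) {
  completed <- character(0)
  waves <- list()
  remaining <- names(deps)
  while (length(remaining) > 0) {
    # A node is ready once all of its dependencies have completed.
    is_ready <- vapply(remaining,
                       function(x) all(deps[[x]] %in% completed),
                       logical(1))
    ready <- remaining[is_ready]
    if (length(ready) == 0) {
      stop("no node is ready: the graph has a cycle or a missing dependency")
    }
    waves[[length(waves) + 1]] <- ready
    completed <- c(completed, ready)
    remaining <- setdiff(remaining, ready)
  }
  waves
}

diamond <- list(a = character(0), b = "a", c = "a", d = c("b", "c"))
str(ready_waves(diamond))
```

For the diamond this yields a, then b and c together, then d, matching the order of the START lines in the log.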
Here, force_all_local=TRUE makes the nodes run locally, so print statements within the nodes go to the terminal.
a <- delayed(function() { cat("NODE A\n"); 9 }, name='a')
b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b')
c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')
o <- compute(d, namespace=namespace, verbose=TRUE, force_all_local=TRUE)
print(o)
with output
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=NOT_STARTED
b args_ready=FALSE status=NOT_STARTED
c args_ready=FALSE status=NOT_STARTED
d args_ready=FALSE status=NOT_STARTED
1644869845 2022-02-14 15:17:25 START a
1644869845 2022-02-14 15:17:25 launch local compute a
NODE A
1644869845 2022-02-14 15:17:25 finish local compute a
1644869845 2022-02-14 15:17:25 END a
1644869845 2022-02-14 15:17:25 START b
1644869845 2022-02-14 15:17:25 START c
1644869845 2022-02-14 15:17:25 launch local compute b
NODE B
1644869845 2022-02-14 15:17:25 finish local compute b
1644869845 2022-02-14 15:17:25 END b
1644869845 2022-02-14 15:17:25 launch local compute c
NODE C
1644869845 2022-02-14 15:17:25 finish local compute c
1644869845 2022-02-14 15:17:25 END c
1644869845 2022-02-14 15:17:25 START d
1644869845 2022-02-14 15:17:25 launch local compute d
NODE D
1644869845 2022-02-14 15:17:25 finish local compute d
1644869845 2022-02-14 15:17:25 END d
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
> print(o)
[1] 990
Here the nodes run not only locally, but with no concurrency, and all prints go directly to the terminal where you can see them right away:
a <- delayed(function() { cat("NODE A\n"); 9 }, name='a')
b <- delayed(function(x) { cat("NODE B\n"); 10*x }, args=list(a), name='b')
c <- delayed(function(x) { cat("NODE C\n"); 100*x }, args=list(a), name='c')
d <- delayed(function(...) { cat("NODE D\n"); sum(...) }, args=list(b,c), name='d')
o <- d$compute_sequentially()
print(o)
NODE A
NODE B
NODE C
NODE D
[1] 990
In this (admittedly artificial) example we show partial progress through a task graph, then a fix, and a retry that continues from there.
> a <- delayed(function() { 9 }, name='a')
> b <- delayed(function(x) { stop("the train!") }, args=list(a), name='b') # Intentional error
> c <- delayed(function(x) { 100*x }, args=list(a), name='c')
> d <- delayed(function(...) { sum(...) }, args=list(b,c), name='d')
> o <- d$compute(namespace=namespace)
Error in arg$poll(namespace = namespace, verbose = verbose, force_local = force_local) :
node failed: b: tiledbcloud: received error response: Server returned 500 response status code. Message: Error message: received an error from the container: docker container exited with non-zero code: 1
Docker logs:
Arguments file name: /dev/shm/tiledb_da86fa44-c1d7-4831-828d-0790844da970/args
Error in (function (x) : the train!
Calls: main -> compute_result -> do.call -> <Anonymous>
Execution halted
> show(d)
node=d,nargs=2Error in self$args_ready() : dependency has failed
> show(d$dag_for_terminal)
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=FAILED
c args_ready=TRUE status=RUNNING
d Error in self$args_ready() : dependency has failed
Fix the artificial error:
> b$func <- function(x) { 10*x }
> o <- d$compute(namespace=namespace)
> print(o)
[1] 990
> show(d$dag_for_terminal)
All nodes: (4) a, b, c, d
Initial nodes: (1) a
Terminal node: (1) d
Dependencies:
a (0)
b (1) a
c (1) a
d (2) b, c
Statuses:
a args_ready=TRUE status=COMPLETED
b args_ready=TRUE status=COMPLETED
c args_ready=TRUE status=COMPLETED
d args_ready=TRUE status=COMPLETED
>
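The retry works because each node keeps a status: a rerun can skip nodes that already COMPLETED and retry only those that FAILED or never started. The sketch below illustrates that idea in base R with hypothetical names (funcs, deps, status, run_graph()); it is not the package’s retry logic, and it runs nodes one at a time in a fixed dependency order.

```r
# Hypothetical graph: node functions, dependencies, and per-node state.
funcs <- list(
  a = function() 9,
  b = function(x) stop("the train!"),   # intentional error, as above
  c = function(x) 100 * x,
  d = function(...) sum(...)
)
deps <- list(a = character(0), b = "a", c = "a", d = c("b", "c"))
status <- rep("NOT_STARTED", length(funcs)); names(status) <- names(funcs)
runs <- integer(length(funcs)); names(runs) <- names(funcs)  # run counts
results <- list()

run_graph <- function() {
  for (x in c("a", "b", "c", "d")) {             # a valid dependency order
    if (status[[x]] == "COMPLETED") next          # keep earlier results
    if (!all(status[deps[[x]]] == "COMPLETED")) next  # a dependency failed
    runs[[x]] <<- runs[[x]] + 1L
    out <- tryCatch(do.call(funcs[[x]], unname(results[deps[[x]]])),
                    error = function(e) e)
    if (inherits(out, "error")) {
      status[[x]] <<- "FAILED"
    } else {
      results[[x]] <<- out
      status[[x]] <<- "COMPLETED"
    }
  }
  invisible(status)
}

run_graph()                      # a and c complete, b fails, d is skipped
funcs$b <- function(x) 10 * x    # fix the artificial error
run_graph()                      # only b and d run this time
print(results$d)                 # 990
print(runs)
```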