Apache Arrow R 6.0.0 Release
Published 08 Nov 2021
By Nic Crane, Jonathan Keane, Neal Richardson
We are excited to announce the recent release of version 6.0.0 of the Arrow R package on CRAN. While we usually don’t write a dedicated release blog post for the R package, this one is special. There are a number of major new features in this version, some of which we’ve been building up to for several years.
More dplyr support
In version 0.16.0 (February 2020), we released the first version of the Dataset feature, which allowed you to query multi-file datasets using dplyr::select() and filter(). These tools allowed you to find a slice of data in a large dataset that may not fit into memory and pull it into R for further analysis. In version 4.0.0 earlier this year, we added support for mutate() and a number of other dplyr verbs, and all year we’ve been adding hundreds of functions you can use to transform and filter data in Datasets. However, to aggregate, you’d still need to pull the data into R.
Grouped aggregation
With arrow 6.0.0, you can now summarise() on Arrow data, with or without group_by(). This works both on in-memory Arrow tables and across partitioned datasets. Most common aggregation functions are supported: n(), n_distinct(), min(), max(), sum(), mean(), var(), sd(), any(), and all(). median() and quantile() with one probability are also supported and currently return approximate results using the t-digest algorithm.
As usual, Arrow will read and process data in chunks and in parallel when possible to produce results much faster than one could by loading it all into memory then processing. This allows for operations that wouldn’t fit into memory on a single machine. For example, using the 1.5-billion row NYC Taxi dataset we use for examples in the package vignette, we can aggregate over the whole dataset even on a laptop:
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
ds %>%
  filter(
    passenger_count > 0,
    passenger_count < 6,
    grepl("csh", payment_type, ignore.case = TRUE)
  ) %>%
  group_by(passenger_count) %>%
  summarize(
    avg = mean(total_amount, na.rm = TRUE),
    count = n()
  ) %>%
  arrange(desc(count)) %>%
  collect()
#> # A tibble: 5 × 3
#>   passenger_count   avg     count
#>             <int> <dbl>     <int>
#> 1               1  11.1 257738064
#> 2               2  12.1  58824482
#> 3               5  11.4  26056438
#> 4               3  12.0  18852606
#> 5               4  12.3  10081632
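The approximate median() and quantile() support described above can be tried on a small in-memory table. This is a minimal sketch, assuming the arrow and dplyr packages are installed; mtcars (which ships with R) stands in for a real dataset, and the column names in the result are illustrative. Because the values come from the t-digest algorithm, they may differ slightly from what base R would return.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# mtcars ships with R; arrow_table() turns it into an in-memory Arrow Table
approx_stats <- arrow_table(mtcars) %>%
  group_by(cyl) %>%
  summarise(
    n = n(),
    mpg_median = median(mpg),             # approximate (t-digest)
    mpg_p90 = quantile(mpg, probs = 0.9)  # a single probability only
  ) %>%
  arrange(cyl) %>%
  collect()

approx_stats
```

Arrow may emit a warning the first time median() or quantile() is used, as a reminder that the result is approximate.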
Joins
In addition to aggregation, Arrow also supports all of dplyr’s mutating joins (inner, left, right, and full) and filtering joins (semi and anti).
Suppose I want to get a table of all the flights from JFK to Las Vegas Airport on 9th October 2013, with the full name of the airline included.
arrow_table(nycflights13::flights) %>%
  filter(
    year == 2013,
    month == 10,
    day == 9,
    origin == "JFK",
    dest == "LAS"
  ) %>%
  select(dep_time, arr_time, carrier) %>%
  # carrier is the only column the two tables share
  left_join(
    arrow_table(nycflights13::airlines),
    by = "carrier"
  ) %>%
  collect()
#> # A tibble: 12 × 4
#>    dep_time arr_time carrier name
#>       <int>    <int> <chr>   <chr>
#>  1      637      853 B6      JetBlue Airways
#>  2      648      912 AA      American Airlines Inc.
#>  3      812     1029 DL      Delta Air Lines Inc.
#>  4      945     1206 VX      Virgin America
#>  5      955     1219 B6      JetBlue Airways
#>  6     1018     1231 DL      Delta Air Lines Inc.
#>  7     1120     1338 B6      JetBlue Airways
#>  8     1451     1705 DL      Delta Air Lines Inc.
#>  9     1656     1915 AA      American Airlines Inc.
#> 10     1755     2001 DL      Delta Air Lines Inc.
#> 11     1827     2049 B6      JetBlue Airways
#> 12     1917     2126 DL      Delta Air Lines Inc.
In this example, we’re working on an in-memory table, so you wouldn’t need arrow to do this, but the same code would work on a larger-than-memory dataset backed by thousands of Parquet files.
Under the hood
To support these features, we’ve made some internal changes to how queries are built up and, importantly, when they are evaluated. As a result, there are some changes in behavior compared to past versions of arrow.
First, calls to summarise(), head(), and tail() are no longer evaluated eagerly: to see the results, you need to call either compute() (to evaluate the query and produce an Arrow Table) or collect() (to evaluate it and pull the Table into an R data.frame).
Second, the order of rows in a dataset query is no longer deterministic due to the way the parallelization of work happens in the C++ library. This means that you can’t assume that the results of a query will be in the same order as the rows of data in the files on disk. If you do need a stable sort order, call arrange() to specify ordering.
While these changes are a break from past arrow behavior, they are consistent with many dbplyr backends and are needed to allow queries to scale beyond data-frame workflows that can fit into memory.
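The two behavior changes above can be seen on a small in-memory table. This is a minimal sketch, assuming the arrow and dplyr packages are installed; mtcars stands in for a real dataset, and the object names (query, result_table, result_df) are only illustrative.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# A small in-memory Arrow Table built from a data set that ships with R
tbl <- arrow_table(mtcars)

# summarise() only builds up a query; nothing has been evaluated yet
query <- tbl %>%
  group_by(cyl) %>%
  summarise(avg_mpg = mean(mpg), n = n())

# compute() evaluates the query and keeps the result in Arrow
result_table <- compute(query)

# collect() evaluates the query and returns an R data frame;
# arrange() makes the row order explicit rather than incidental
result_df <- query %>%
  arrange(cyl) %>%
  collect()

result_df
```

Without the arrange() call, the three rows could come back in any order from one run to the next.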
Integration with DuckDB
The Arrow engine is not the only new way to query Arrow Datasets in this release. If you have the duckdb package installed, you can hand off an Arrow Dataset or query object to DuckDB for further querying using the to_duckdb() function. This allows you to use duckdb’s dbplyr methods, as well as its SQL interface, to aggregate data. DuckDB supports filter pushdown, so you can take advantage of Arrow Datasets and Arrow-based optimizations even within a DuckDB SQL query with a where clause. Filtering and column projection specified before the to_duckdb() call in a pipeline are evaluated in Arrow; this can be helpful in some circumstances like complicated dbplyr pipelines. You can also hand off DuckDB data (or the result of a query) to arrow with the to_arrow() call.
In the example below, we are looking at flights between NYC and Chicago, and want to avoid the worst-of-the-worst delays. To do this, we can use percent_rank(); however, that requires a window function, which isn’t yet available in Arrow, so let’s try sending the data to DuckDB to do that, then pull it back into Arrow:
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
flights_filtered <- arrow_table(nycflights13::flights) %>%
  select(carrier, origin, dest, arr_delay) %>%
  # arriving early doesn't matter, so call negative delays 0
  mutate(arr_delay = pmax(arr_delay, 0)) %>%
  to_duckdb() %>%
  # for each carrier-origin-dest, take the worst 5% of delays
  group_by(carrier, origin, dest) %>%
  mutate(arr_delay_rank = percent_rank(arr_delay)) %>%
  filter(arr_delay_rank > 0.95)
head(flights_filtered)
#> # Source:   lazy query [?? x 5]
#> # Database: duckdb_connection
#> # Groups:   carrier, origin, dest
#>   carrier origin dest  arr_delay arr_delay_rank
#>   <chr>   <chr>  <chr>     <dbl>          <dbl>
#> 1 9E      JFK    RIC         119          0.952
#> 2 9E      JFK    RIC         125          0.956
#> 3 9E      JFK    RIC         137          0.960
#> 4 9E      JFK    RIC         137          0.960
#> 5 9E      JFK    RIC         158          0.968
#> 6 9E      JFK    RIC         163          0.972
Now we have all of the flights filtered to those that are the worst-of-the-worst, and stored as a dbplyr lazy tbl with our DuckDB connection. This is an example of using Arrow -> DuckDB.
But we can do more: we can then bring that data back into Arrow just as easily. For the rest of our analysis, we pick up where we left off with the tbl referring to the DuckDB query:
# pull data back into arrow to complete analysis
flights_filtered %>%
  to_arrow() %>%
  # now summarise to get mean/min
  group_by(carrier, origin, dest) %>%
  summarise(
    arr_delay_mean = mean(arr_delay),
    arr_delay_min = min(arr_delay),
    num_flights = n()
  ) %>%
  filter(dest %in% c("ORD", "MDW")) %>%
  arrange(desc(arr_delay_mean)) %>%
  collect()
#> # A tibble: 10 × 6
#> # Groups:   carrier, origin [10]
#>    carrier origin dest  arr_delay_mean arr_delay_min num_flights
#>    <chr>   <chr>  <chr>          <dbl>         <dbl>       <int>
#>  1 MQ      EWR    ORD             190.           103         113
#>  2 9E      JFK    ORD             185.           134          52
#>  3 UA      LGA    ORD             179.           101         157
#>  4 WN      LGA    MDW             178.           107         103
#>  5 AA      JFK    ORD             178.           133          19
#>  6 B6      JFK    ORD             174.           129          46
#>  7 WN      EWR    MDW             167.           107         103
#>  8 UA      EWR    ORD             149.            87         189
#>  9 AA      LGA    ORD             135.            78         280
#> 10 EV      EWR    ORD              35             35           1
And just like that, we’ve passed data back and forth between Arrow and DuckDB without having to write a single file to disk!
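The SQL interface mentioned earlier can be reached in much the same way. This is a minimal sketch, assuming the duckdb and DBI packages are installed alongside arrow and dplyr; mtcars stands in for a real dataset, and the table name cars_arrow is made up for illustration. Passing our own connection to to_duckdb() lets us send plain SQL to the same DuckDB instance that holds the registered Arrow data.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Open our own DuckDB connection so we can also send SQL to it
con <- DBI::dbConnect(duckdb::duckdb())

# Filter in Arrow first, then register the result with DuckDB under a
# name of our choosing. auto_disconnect = FALSE means we clean up
# the registration ourselves below.
cars_duck <- arrow_table(mtcars) %>%
  filter(mpg > 20) %>%
  to_duckdb(con = con, table_name = "cars_arrow", auto_disconnect = FALSE)

# Query the registered table with plain SQL
by_cyl <- DBI::dbGetQuery(
  con,
  "SELECT cyl, COUNT(*) AS n FROM cars_arrow GROUP BY cyl ORDER BY cyl"
)

# Unregister the Arrow data and close the connection
duckdb::duckdb_unregister_arrow(con, "cars_arrow")
DBI::dbDisconnect(con, shutdown = TRUE)

by_cyl
```

The object cars_duck is still an ordinary dbplyr lazy tbl, so dplyr verbs and SQL can be mixed against the same registered table while the connection is open.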
Expanded use of ALTREP
We are continuing our use of R’s ALTREP where possible. In 5.0.0 there was a limited set of circumstances that took advantage of ALTREP, but in 6.0.0 we have expanded the supported types to include strings, as well as vectors with NAs.
library(microbenchmark)
library(arrow)
tbl <-
  arrow_table(data.frame(
    x = rnorm(10000000),
    y = sample(c(letters, NA), 10000000, replace = TRUE)
  ))
with_altrep <- function(data){
  options(arrow.use_altrep = TRUE)
  as.data.frame(data)
}
without_altrep <- function(data){
  options(arrow.use_altrep = FALSE)
  as.data.frame(data)
}
microbenchmark(
  without_altrep(tbl),
  with_altrep(tbl)
)
#> Unit: milliseconds
#>                 expr      min        lq      mean    median        uq      max neval
#>  without_altrep(tbl) 191.0788 213.82235 249.65076 225.52120 244.26977 512.1652   100
#>     with_altrep(tbl)  48.7152  50.97269  65.56832  52.93795  55.24505 338.4602   100
Airgapped installation on Linux
With every release, we continue to improve the installation experience on Linux. CRAN hosts binary packages for macOS and Windows but not for Linux, so unless you’re using a service like RStudio Package Manager that hosts binaries, you have to build arrow from source. Because Arrow involves a large C++ project, this can be slow and sensitive to differences in build environments. To ensure a reliable installation experience, we work hard to test on a wide range of platforms and configurations and eagerly seek to simplify the process so that install.packages("arrow") just works and you don’t have to think about it.
A big improvement in 6.0.0 is that arrow can now install in a fully offline mode. The R package now includes the C++ source, so it does not need to be downloaded at build time. This does not include optional dependencies like compression libraries, the AWS SDK for accessing data in S3, and more. For folks who need to install Arrow on an airgapped server with all of those features, we have included a helper function to download and assemble a “fat” package that contains everything that would be downloaded lazily at build time.
The function create_package_with_all_dependencies() can be run from a computer that does have access to the internet, and creates a fat-source package which can then be transferred and installed on a server without connectivity. This helper is also available on GitHub without installing the arrow package. For more on installation, see the docs.
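The two-machine workflow could look roughly like the sketch below. The wrapper functions build_fat_package() and install_fat_package() and the file name my_arrow_pkg.tar.gz are hypothetical, introduced only so that each step is clearly separated and nothing runs until it is called. It also assumes that the R packages arrow depends on are already available to the offline machine.

```r
# Step 1: call this on a machine WITH internet access.
# It downloads the arrow source package plus the optional C++
# dependencies and bundles them into a single tarball.
build_fat_package <- function(dest_file = "my_arrow_pkg.tar.gz") {
  arrow::create_package_with_all_dependencies(dest_file)
}

# Step 2: copy the tarball to the offline server, then call this there.
# The R package dependencies of arrow must already be installed or
# reachable from a local repository (assumption).
install_fat_package <- function(pkg_file = "my_arrow_pkg.tar.gz") {
  install.packages(
    pkg_file,
    dependencies = c("Depends", "Imports", "LinkingTo")
  )
}
```

On the connected machine you would call build_fat_package(), transfer the resulting file, and then call install_fat_package() on the server.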
Another installation change is that we’ve changed the source build to fail cleanly if the C++ library is not found or cannot be built. Previously, if the C++ library failed to build, you would get a successful R package installation, but the package wouldn’t do anything useful; it would just tell you to reinstall. This was helpful back in the early days of the package when we weren’t confident it would build everywhere that CRAN checked, but we now have much more experience (and extensive testing). In recent months this failure mode caused more confusion than it was worth, and it led many people to think that after you install arrow, you always have to run install_arrow() again.
Thanks
This is a significant milestone for Arrow, and the R package specifically, and there is much gratitude to go around. In the 6.0.0 release, there were 77 individuals who contributed to Arrow, many of whom did the heavy lifting in the C++ library to make the new dataset query features a reality. Specifically in the R package, we want to acknowledge Phillip Cloud, Dewey Dunnington, Dragoș Moldovan-Grünfeld, Matt Peterson, and Percy Camilo Triveño Aucahuasi for their first contributions to the R package. And a special thanks goes to Karl Dunkle Werner for the hard work on the offline package build!
We also want to thank you in advance for your help. For this release of the Arrow query engine, we’ve focused our effort on getting the core functionality implemented. (In fact, this first release is something of an R-exclusive: bindings for these features haven’t yet been added to pyarrow, the Python Arrow library!) Focusing on the essentials means that there are a number of performance optimizations we plan to do but didn’t have time for in this release, and there are surely more issues to fix that we don’t yet know about. We are eager for your feedback: please let us know of any issues you encounter so that we can address them in our next release.