Memory consumption with collect() #434

Open
larry77 opened this issue Jan 1, 2025 · 19 comments
Labels
duckdb 🦆 Issues where work in the duckdb package is needed


larry77 commented Jan 1, 2025

Hello,
Please have a look at the reprex at the end of this post.
I have a seasoned laptop with 8 GB of RAM that runs Debian stable.
When I carry out an aggregation and then a left join with arrow, I need to use some swap when I collect the result (a long data frame), but I can run the computation.
If instead I run just the duckplyr part (commented out in the second part of the reprex), memory runs out and the laptop freezes. Can someone look into this? Is memory consumption in duckplyr much higher than in arrow? Thanks a lot.

library(tidyverse)

library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:lubridate':
#> 
#>     duration
#> The following object is masked from 'package:utils':
#> 
#>     timestamp


# Uncomment and run this only once
## dd <- tibble(x=1:100000000, y=rep(LETTERS[1:20], 5000000))


## write_csv(dd, "test.csv")




df2  <- open_dataset("test.csv",
  format = "csv",
  skip_rows = 0)

system.time({
    df_stat2 <- df2 |>
        group_by(y) |> 
        summarise(total=sum(x)) |>
        ungroup()

    df_out2 <- df2 |>
        left_join(y=df_stat2, by=c("y")) |>
        collect()

    
})
#>    user  system elapsed 
#>  36.892   5.439  26.582


df_out2|>glimpse()
#> Rows: 100,000,000
#> Columns: 3
#> $ x     <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1…
#> $ y     <chr> "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M",…
#> $ total <int64> 249999955000000, 249999960000000, 249999965000000, 24999997000…


## uncomment to run --this takes a lot of memory on my system

## library(duckplyr)


## df <- duck_csv("test.csv")

## system.time({
## df_stat <- df |>
##     summarise(total=sum(x), .by = y) 



## df_out <- df |>
##     left_join(y=df_stat,  by=c("y")) |>
##     collect()

## })


sessionInfo()
#> R version 4.4.2 (2024-10-31)
#> Platform: x86_64-pc-linux-gnu
#> Running under: Debian GNU/Linux 12 (bookworm)
#> 
#> Matrix products: default
#> BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.11.0 
#> LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.11.0
#> 
#> locale:
#>  [1] LC_CTYPE=en_GB.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_GB.UTF-8        LC_COLLATE=en_GB.UTF-8    
#>  [5] LC_MONETARY=en_GB.UTF-8    LC_MESSAGES=en_GB.UTF-8   
#>  [7] LC_PAPER=en_GB.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_GB.UTF-8 LC_IDENTIFICATION=C       
#> 
#> time zone: Europe/Brussels
#> tzcode source: system (glibc)
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#>  [1] arrow_15.0.1    lubridate_1.9.3 forcats_1.0.0   stringr_1.5.1  
#>  [5] dplyr_1.1.4     purrr_1.0.2     readr_2.1.5     tidyr_1.3.1    
#>  [9] tibble_3.2.1    ggplot2_3.5.1   tidyverse_2.0.0
#> 
#> loaded via a namespace (and not attached):
#>  [1] bit_4.0.5          gtable_0.3.5       compiler_4.4.2     reprex_2.1.0      
#>  [5] tidyselect_1.2.1   assertthat_0.2.1   scales_1.3.0       yaml_2.3.8        
#>  [9] fastmap_1.1.1      R6_2.5.1           generics_0.1.3     knitr_1.46        
#> [13] munsell_0.5.1      R.cache_0.16.0     pillar_1.10.1.9000 tzdb_0.4.0        
#> [17] R.utils_2.12.3     rlang_1.1.4        stringi_1.8.4      xfun_0.43         
#> [21] fs_1.6.4           bit64_4.0.5        timechange_0.3.0   cli_3.6.3         
#> [25] withr_3.0.0        magrittr_2.0.3     digest_0.6.35      grid_4.4.2        
#> [29] hms_1.1.3          lifecycle_1.0.4    R.methodsS3_1.8.2  R.oo_1.26.0       
#> [33] vctrs_0.6.5        evaluate_0.23      glue_1.7.0         styler_1.10.3     
#> [37] colorspace_2.1-0   rmarkdown_2.26     tools_4.4.2        pkgconfig_2.0.3   
#> [41] htmltools_0.5.8.1

Created on 2025-01-01 with reprex v2.1.0

@krlmlr krlmlr added this to the 1.0.0 milestone Jan 1, 2025
Member

krlmlr commented Jan 1, 2025

Thanks, we need to work on the docs here.

Can you try:

duck_exec("set memory_limit='1GB'")

or a similar value?

Author

larry77 commented Jan 1, 2025

Mmmhhh.... I added that line just after loading the duckplyr library, but I still have the RAM consumption going through the roof of my system.


Member

krlmlr commented Jan 2, 2025

Thanks, confirming that, on a VM with 4 GB with Debian Bookworm running in OrbStack, the following example is killed:

options(conflicts.policy = list(warn = FALSE))
library(dplyr)
library(duckplyr)
library(readr)


if (!file.exists("test.csv")) {
  dd <- tibble(x=1:100000000, y=rep(LETTERS[1:20], 5000000))

  write_csv(dd, "test.csv")
}

duck_exec("set memory_limit='1GB'")

df <- duck_csv("test.csv")

df_stat <- df |>
    summarise(total=sum(x), .by = y)

df_out <- df |>
    left_join(y=df_stat,  by=c("y")) |>
    collect()

df_out

@krlmlr krlmlr changed the title from "Memory Consumption when performing a left_join" to "Memory Consumption when performing a left_join()" Jan 2, 2025
Author

larry77 commented Jan 2, 2025

I see. I have 8 GB in my machine, and I opened the issue because I can run the code with arrow on my machine (with some difficulties), but not with duckplyr. No intention to start a competition between the two tools, but I assumed there may be a memory leak in duckplyr.

Author

larry77 commented Jan 2, 2025

To finish this off on my side: if I kill almost any other process, duckplyr also gets the job done on my machine, but the memory consumption is significantly higher than under arrow.

Member

krlmlr commented Jan 4, 2025

I'm no longer sure that collect() is really the problem here.

The following example works, even with 4 GB:

options(conflicts.policy = list(warn = FALSE))
library(dplyr)
library(duckplyr)
library(readr)


if (!file.exists("test.csv")) {
  dd <- tibble(x=1:100000000, y=rep(LETTERS[1:20], 5000000))

  write_csv(dd, "test.csv")
}

duck_exec("set memory_limit='1GB'")

df <- duck_csv("test.csv")

df_stat <- df |>
    summarise(total=sum(x), .by = y)

df_out <-
  df |>
    left_join(y=df_stat,  by=c("y")) |>
    compute_parquet("test.parquet")

df_out

Perhaps you can also use compute(). #439 might be a problem; this also needs a very recent version of duckdb, which will be on CRAN later this month.

The large memory consumption is still interesting, though.

Author

larry77 commented Jan 4, 2025

Thanks. I now also have a procedure to convert a CSV into a Parquet file without ingesting everything into memory.

@krlmlr krlmlr changed the title from "Memory Consumption when performing a left_join()" to "Memory consumption with collect()" Jan 17, 2025
Member

krlmlr commented Jan 17, 2025

I looked at this a little closer. With collect(), we need twice as much memory as the data size because we're storing the whole data set both in native DuckDB storage in memory and as a data frame. This could be fixed in the duckdb R package.
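
For a rough sense of scale, the size of the collected data frame in the original report can be estimated from R's per-element sizes. This is a back-of-the-envelope sketch only: it assumes the three columns shown in the first reprex (integer x, character y, 64-bit total), ignores vector headers and the handful of shared strings, and `estimate_df_bytes()` is a hypothetical helper, not part of duckplyr or duckdb.

```r
# Approximate bytes per element for R vectors on a 64-bit platform.
# A character vector stores one 8-byte pointer per element; the 20 distinct
# strings are shared and negligible here. integer64 (bit64) uses 8 bytes.
bytes_per_element <- c(integer = 4, character = 8, integer64 = 8)

# Hypothetical helper: rows times the summed per-element sizes of all columns.
estimate_df_bytes <- function(n_rows, col_types) {
  n_rows * sum(bytes_per_element[col_types])
}

n_rows <- 1e8
df_bytes <- estimate_df_bytes(n_rows, c("integer", "character", "integer64"))

df_bytes / 1024^3      # data frame alone, in GiB
2 * df_bytes / 1024^3  # peak if a second full copy were held at the same time
```

Under these assumptions the data frame alone takes roughly 1.9 GiB, so a second full copy would not fit comfortably on a machine with 4 GB of RAM.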

A modified, scaled-down version of the example that works with current duckplyr, to illustrate the overhead:

options(conflicts.policy = list(warn = FALSE))
library(dplyr)
library(duckplyr)
#> ✔ Overwriting dplyr methods with duckplyr methods.
#> ℹ Turn off with `duckplyr::methods_restore()`.
library(readr)

res_size_kb <- function() {
  pid_switch <- if (Sys.info()[["sysname"]] == "Darwin") "-p" else "-q"
  cmd_line <- paste0("ps x -o rss ", pid_switch, " ", Sys.getpid())
  as.numeric(system(cmd_line, intern = TRUE)[[2]])
}

N <- 500000

path <- paste0("test-", N, ".csv")

if (!file.exists(path)) {
  dd <- tibble(x = seq.int(N * 20), y = rep(1:20, N))

  write_csv(dd, path)
}

pillar::num(file.size(path), notation = "si")
#> <pillar_num(si)[1]>
#> [1] 98.9M

db_exec("set memory_limit='1GB'")

df <- read_csv_duckdb(path)

df_stat <- df |>
  summarise(total = sum(x), .by = y)

df_join <- df |>
  left_join(y = df_stat, by = c("y"))

res_size_before <- res_size_kb()

df_out <-
  df_join |>
  collect()

# At the very latest, materialization happens here:
rows <- nrow(df_out)

res_size_after <- res_size_kb()

# Approximation, for speed
obj_size_bytes <- object.size(df_out[rep(NA_integer_, 1000), ]) * rows / 1000
obj_size <- as.numeric(obj_size_bytes) / 1024

pillar::num(rows, notation = "si")
#> <pillar_num(si)[1]>
#> [1] 10M

pillar::num(c(res_size_after, res_size_before, obj_size), notation = "si")
#> <pillar_num(si)[3]>
#> [1] 732.k 267.k 245.k
(res_size_after - res_size_before) / obj_size
#> [1] 1.894857

Created on 2025-01-17 with reprex v2.1.1

@krlmlr krlmlr added the duckdb 🦆 Issues where work in the duckdb package is needed label Jan 17, 2025
@krlmlr krlmlr modified the milestones: 1.0.0, reldf Jan 17, 2025
Contributor

toppyy commented Jan 25, 2025

I did some digging based on @krlmlr's research. To me it seems that we cannot release the query results immediately after materialization. If I've understood correctly, the query result (a set of vectors) is not transformed to its R representation immediately after materialization. Instead, vectors are transformed on demand, i.e. when they are touched.

An example:

    df <- duck_csv("test.csv")
    nrow(df)    # 1. materializes the query, duckdb's result object is now available

    head(df$x)  # 2. transforms x into R-representation (see AltrepVectorWrapper::Dataptr())
                # cannot release the query results as df$y has not been transformed yet

    head(df)    # 3. now all columns have been transformed -> the query result can be released

duckdb/duckdb-r#1027 has a rough draft of a way to reduce the memory consumption by releasing the query results when they are no longer needed. The idea is to keep tabs on how many of the vectors have been transformed and, once all of them are, reset the pointer to the query results.
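
The bookkeeping idea can be illustrated with a toy model in plain R. This is only a sketch of the counting logic, not the code in the PR: nothing here touches DuckDB or ALTREP, and `new_lazy_result()`, `get_column()` and `is_released()` are hypothetical names made up for the illustration.

```r
# Toy model: a "query result" kept in an environment, with columns converted
# to R vectors on first access. Nothing here calls DuckDB.
new_lazy_result <- function(raw_columns) {
  state <- new.env()
  state$raw <- raw_columns    # stands in for the DuckDB query result
  state$converted <- list()   # R representations created so far

  get_column <- function(name) {
    if (is.null(state$converted[[name]])) {
      state$converted[[name]] <- as.vector(state$raw[[name]])
      # Once every column has been converted, the raw result is not needed.
      if (length(state$converted) == length(raw_columns)) {
        state$raw <- NULL
      }
    }
    state$converted[[name]]
  }

  list(
    get_column = get_column,
    is_released = function() is.null(state$raw)
  )
}

res <- new_lazy_result(list(x = 1:5, y = letters[1:5]))
res$get_column("x")
res$is_released()   # FALSE: y has not been converted yet
res$get_column("y")
res$is_released()   # TRUE: all columns converted, raw result dropped
```

In this model the raw result stays alive for as long as a single column remains untouched, which mirrors the situation described for step 2 of the example above.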

An alternative approach would be to expose the possibility to reset the pointer through an R function. This way you could call head(df) within collect() and then release_duckdb_results(df) (or something similar).

Below is an example of the effect of the change made in the PR (the memory consumption goes down). The latter reprex includes the logic outlined above; the former does not.

Query results not released (current status)

options(conflicts.policy = list(warn = FALSE))
library(dplyr)
library(duckplyr)
#> The duckplyr package is configured to fall back to dplyr when it encounters an
#> incompatibility. Fallback events can be collected and uploaded for analysis to
#> guide future development. By default, data will be collected but no data will
#> be uploaded.
#> ℹ Automatic fallback uploading is not controlled and therefore disabled, see
#>   `?duckplyr::fallback()`.
#> ✔ Number of reports ready for upload: 4.
#> → Review with `duckplyr::fallback_review()`, upload with
#>   `duckplyr::fallback_upload()`.
#> ℹ Configure automatic uploading with `duckplyr::fallback_config()`.
#> ✔ Overwriting dplyr methods with duckplyr methods.
#> ℹ Turn off with `duckplyr::methods_restore()`.
library(readr)

res_size_kb <- function() {
  pid_switch <- if (Sys.info()[["sysname"]] == "Darwin") "-p" else "-q"
  cmd_line <- paste0("ps x -o rss ", pid_switch, " ", Sys.getpid())
  as.numeric(system(cmd_line, intern = TRUE)[[2]])
}

N <- 3000000

path <- paste0("test-", N, ".csv")

if (!file.exists(path)) {
  dd <- tibble(x = seq.int(N * 20), y = rep(1:20, N))
  write_csv(dd, path)
}

pillar::num(file.size(path), notation = "si")
#> <pillar_num(si)[1]>
#> [1] 682.M

db_exec("set memory_limit='1GB'")

df <- read_csv_duckdb(path)

df_stat <- df |>
  summarise(total = sum(x), .by = y)

df_join <- df |>
  left_join(y = df_stat, by = c("y"))

res_size_before <- res_size_kb()

df_out <-
  df_join |>
  collect()

# At the very latest, materialization happens here:
rows <- nrow(df_out)

# Touches all columns, so all are now transformed to their R representation; without the change from the PR, DuckDB still holds the query result
head(df_out) 
#> # A tibble: 6 × 3
#>       x     y   total
#>   <dbl> <dbl>   <dbl>
#> 1     1     1 9.00e13
#> 2     2     2 9.00e13
#> 3     3     3 9.00e13
#> 4     4     4 9.00e13
#> 5     5     5 9.00e13
#> 6     6     6 9.00e13

res_size_after <- res_size_kb()

# Approximation, for speed
obj_size_bytes <- object.size(df_out[rep(NA_integer_, 1000), ]) * rows / 1000
obj_size <- as.numeric(obj_size_bytes) / 1024

print(paste(c(res_size_after, res_size_before)/1e6,'Gb'))
#> [1] "3.977176 Gb" "0.686392 Gb"
pillar::num(c(res_size_after, res_size_before, obj_size), notation = "si")
#> <pillar_num(si)[3]>
#> [1]   3.98M 686.  k   1.47M
print((res_size_after - res_size_before) / obj_size)
#> [1] 2.235777

Created on 2025-01-25 with reprex v2.1.1

Query results released

options(conflicts.policy = list(warn = FALSE))
library(dplyr)
library(duckplyr)
#> The duckplyr package is configured to fall back to dplyr when it encounters an
#> incompatibility. Fallback events can be collected and uploaded for analysis to
#> guide future development. By default, data will be collected but no data will
#> be uploaded.
#> ℹ Automatic fallback uploading is not controlled and therefore disabled, see
#>   `?duckplyr::fallback()`.
#> ✔ Number of reports ready for upload: 4.
#> → Review with `duckplyr::fallback_review()`, upload with
#>   `duckplyr::fallback_upload()`.
#> ℹ Configure automatic uploading with `duckplyr::fallback_config()`.
#> ✔ Overwriting dplyr methods with duckplyr methods.
#> ℹ Turn off with `duckplyr::methods_restore()`.
library(readr)

res_size_kb <- function() {
  pid_switch <- if (Sys.info()[["sysname"]] == "Darwin") "-p" else "-q"
  cmd_line <- paste0("ps x -o rss ", pid_switch, " ", Sys.getpid())
  as.numeric(system(cmd_line, intern = TRUE)[[2]])
}

N <- 3000000

path <- paste0("test-", N, ".csv")

if (!file.exists(path)) {
  dd <- tibble(x = seq.int(N * 20), y = rep(1:20, N))
  write_csv(dd, path)
}

pillar::num(file.size(path), notation = "si")
#> <pillar_num(si)[1]>
#> [1] 682.M

db_exec("set memory_limit='1GB'")

df <- read_csv_duckdb(path)

df_stat <- df |>
  summarise(total = sum(x), .by = y)

df_join <- df |>
  left_join(y = df_stat, by = c("y"))

res_size_before <- res_size_kb()

df_out <-
  df_join |>
  collect()

# At the very latest, materialization happens here:
rows <- nrow(df_out)

# Releases the query result at DuckDB's end because all columns are now transformed to their R-representation
head(df_out) 
#> # A tibble: 6 × 3
#>       x     y   total
#>   <dbl> <dbl>   <dbl>
#> 1     1     1 9.00e13
#> 2     2     2 9.00e13
#> 3     3     3 9.00e13
#> 4     4     4 9.00e13
#> 5     5     5 9.00e13
#> 6     6     6 9.00e13

res_size_after <- res_size_kb()

# Approximation, for speed
obj_size_bytes <- object.size(df_out[rep(NA_integer_, 1000), ]) * rows / 1000
obj_size <- as.numeric(obj_size_bytes) / 1024

print(paste(c(res_size_after, res_size_before)/1e6,'Gb'))
#> [1] "2.983336 Gb" "0.6863 Gb"
pillar::num(c(res_size_after, res_size_before, obj_size), notation = "si")
#> <pillar_num(si)[3]>
#> [1]   2.98M 686.  k   1.47M
print((res_size_after - res_size_before) / obj_size)
#> [1] 1.560619

Created on 2025-01-25 with reprex v2.1.1

Author

larry77 commented Jan 26, 2025

This now goes over my head. If under some circumstances collect() doubles the memory usage, then this is a very serious issue.

Member

krlmlr commented Jan 26, 2025

Thanks for the investigation.

Can we somehow "rip apart" a MaterializedQueryResult and assign ownership of the column data to individual ALTREP objects? Then, as the data is requested from R, we would convert on the fly and free up the DuckDB memory.

Contributor

toppyy commented Jan 26, 2025

An interesting idea!

A quick investigation: It seems that MaterializedQueryResult carries the data as data chunks, each of which is a horizontal slice of the table. Each chunk has n vectors representing the columns. An entire column practically always spans multiple chunks, as the default chunk size is 2048 rows.
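
To put a number on this, here is a small arithmetic sketch in plain R. It assumes the 2048-row chunk size mentioned above and, as a simplification, that every chunk except the last is completely full, which a real query result need not satisfy. `chunk_rows()` is a hypothetical helper for illustration.

```r
chunk_size <- 2048   # default rows per data chunk, as stated above
n_rows <- 1e8        # rows in the original example

# Number of chunks that each column of the result is spread across
n_chunks <- ceiling(n_rows / chunk_size)
n_chunks

# Row range covered by chunk i (1-based), to show the horizontal slicing
chunk_rows <- function(i) {
  first <- (i - 1) * chunk_size + 1
  last <- min(i * chunk_size, n_rows)
  c(first = first, last = last)
}

chunk_rows(1)
chunk_rows(n_chunks)
```

With these numbers every column is split into tens of thousands of pieces, so handing a whole column to one owner means tracking a piece of every chunk.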

This makes it harder to assign ownership and free the memory of a specific column. It seems like it would get quite complex. What do you think?

Member

krlmlr commented Jan 29, 2025

The alternative would be to live with the x2 overhead in memory consumption, or to try to be more efficient only when getting data from a table or a Parquet file.

Let me think about it.

Contributor

toppyy commented Jan 29, 2025

Sorry, maybe I was a bit unclear. I don't mean that we can't release the memory, just that it's hard(er) to do column-wise. I was wondering whether the added complexity is worth it compared to the options presented below.

Two options come to mind:
1. Release the memory when all columns have been transformed, i.e. via head(df) or by accessing them one by one (this is what I aimed for with duckdb/duckdb-r#1027)
2. Transform all columns when any of them is touched and release the memory

The downside of these compared to releasing memory per column is that we might transform columns to R-representation that otherwise would never get transformed. This can be somewhat mitigated by instructing users to select only columns needed (which makes sense anyways).

With both one could explore the possibility of releasing the memory consumed by data chunks on the fly. So something like:

for datachunk in queryresult:
    for column in columns:
        duckdb_r_transform(datachunk,column)
    datachunk.reset() # Free memory consumed by chunk

This has the benefit that at no point in time would we have both the entire DuckDB query result and its R representation in memory.
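
The loop can be mimicked in plain R to show the shape of the idea. This is a toy simulation under loud assumptions: the "chunks" are small data frames in a list, nothing here is DuckDB, and whether freed memory is actually returned depends on R's garbage collector.

```r
# Toy stand-in for a query result: a list of small "chunks" (data frames).
n_rows <- 10
chunk_size <- 4
starts <- seq(1, n_rows, by = chunk_size)
chunks <- lapply(starts, function(s) {
  idx <- s:min(s + chunk_size - 1, n_rows)
  data.frame(x = idx, y = idx %% 3)
})

# Preallocate the R-side columns once, then fill them chunk by chunk.
out <- list(x = integer(n_rows), y = numeric(n_rows))
pos <- 1
for (i in seq_along(chunks)) {
  n <- nrow(chunks[[i]])
  rows <- pos:(pos + n - 1)
  for (col in names(out)) {
    out[[col]][rows] <- chunks[[i]][[col]]
  }
  chunks[i] <- list(NULL)  # drop this chunk's data, keep the list slot
  pos <- pos + n
}

result <- as.data.frame(out)
result
```

At any point in the loop only the chunks not yet processed remain alongside the growing R vectors, so the extra memory shrinks as the conversion proceeds.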

Member

krlmlr commented Jan 31, 2025

Thanks. The two options you outlined are a good compromise, but, to me, it looks like peak memory consumption will still be twice the data size.

The package is still useful: memory usage can be controlled with the new concept of prudence (naming TBD, ex funneling). You can stay in DuckDB land for as long as the data is large, and collect() only summaries, where the overhead doesn't matter.
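
A minimal sketch of that workflow, using only functions that already appear in the reprexes in this thread (read_csv_duckdb(), summarise() with .by, collect()). The small temporary file is a stand-in for a CSV that is too large to collect in full, and the exact column types of the result may differ between duckplyr versions.

```r
library(dplyr)
library(duckplyr)

# Small stand-in file; the real use case is a CSV too large to collect in full.
path <- tempfile(fileext = ".csv")
write.csv(data.frame(x = 1:1000, y = rep(1:20, 50)), path, row.names = FALSE)

df <- read_csv_duckdb(path)

# The aggregation runs in DuckDB; only the 20 summary rows become an R data frame.
summary_df <- df |>
  summarise(total = sum(x), .by = y) |>
  collect()

nrow(summary_df)
```

Here the collected object has one row per group, so any conversion overhead applies to a few rows rather than to the full table.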

Member

krlmlr commented Feb 2, 2025

Looks like my claim about the x2 memory consumption is wrong. I need to better understand your PR.

3 participants