
@@ -21,7 +21,7 @@
 
 
 # Add a single observation to benchmark
-observation <- function(bench, mode, format_id, data_id, compression, size, time, orig_size) {
+observation <- function(bench, mode, format_id, data_id, compression, size, time, nr_of_rows, orig_size) {
   rbindlist(list(bench, data.table(
     Mode = mode,
     ID = format_id,
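
The extra nr_of_rows argument is recorded as a NrOfRows column in the results table. A minimal sketch, outside this diff, of how observation() accumulates rows, assuming data.table is attached and observation() is defined as above; all values are made up for illustration:

library(data.table)

# rbindlist() ignores NULL elements, so the first call can start from an
# empty benchmark; "fst" and the numbers below are illustrative only
# (microbenchmark reports time in nanoseconds)
bench <- observation(NULL, "write", "fst", "integers", 50, 1e6, 1.2e8, 1e5, 4e6)
bench <- observation(bench, "read", "fst", "integers", 50, 1e6, 0.8e8, 1e5, 4e6)
print(bench)  # one row per recorded measurement, now including NrOfRows
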
@@ -29,26 +29,27 @@
     Compression = compression,
     Size = size,
     Time = time,
+    NrOfRows = nr_of_rows,
     OrigSize = orig_size)))
 }
 
 
-#' Runs benchmarks
+#' Run serialization benchmarks
 #'
+#' @param generator function f(nr_of_rows) that generates the data.frame
+#' @param table_streamers a single tablestreamer object generated with table_streamer().
+#' Could also be a list of tablestreamer objects to benchmark various streamers.
+#' @param nr_of_rows vector of row counts to use in the benchmark
 #' @param nr_of_runs number of times to repeat the benchmark for statistics
 #' @param cycle_size create cycle_size files before overwriting
-#' @param generator function f(nr_of_rows) that generates the data.frame
-#' @param nr_of_rows number of rows to use in the benchmark
 #' @param compression vector of compression values to use for benchmarking
 #' @param result_folder folder to use for temporary storage of results
 #' @param bench_id 
-#' @param table_streamers a single tablestreamer object generated with table_streamer().
-#' Could also be a list of tablestreamer objects to benchmark various streamers.
 #'
 #' @return benchmark results
 #' @export
 synthetic_bench <- function(generator, table_streamers, nr_of_rows,
-  compression, nr_of_runs = 100, cycle_size = 10, result_folder = "results") {
+  nr_of_runs = 10, cycle_size = 10, compression = NULL, result_folder = "results", progress = TRUE) {
 
   # verify table streamers
   if (class(table_streamers) == "tablestreamer") {
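
With the new signature, compression and the progress bar are optional and several row counts can be benchmarked in one call. A hypothetical usage sketch: the streamer and generator below are hand-built stand-ins shaped after the fields synthetic_bench() accesses ($id, $variable_compression, $table_writer, $table_reader on the streamer; $id, $generator on the generator); in practice the streamer would come from table_streamer():

# hand-built stand-in for a table_streamer() result; RDS is written here
# without variable compression, so variable_compression is FALSE
rds_streamer <- structure(list(
  id = "rds",
  variable_compression = FALSE,
  table_writer = function(x, file_name, compression) saveRDS(x, file_name),
  table_reader = function(file_name) readRDS(file_name)
), class = "tablestreamer")

# generator object with the $id and $generator fields used in the main loop
int_generator <- list(
  id = "integers",
  generator = function(nr_of_rows) data.frame(X = sample.int(100L, nr_of_rows, replace = TRUE))
)

results <- synthetic_bench(int_generator, rds_streamer,
  nr_of_rows = c(1e4, 1e5), nr_of_runs = 2, cycle_size = 2,
  result_folder = tempdir())
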
@@ -65,69 +66,101 @@
 
   results <- NULL
 
-  for (compress_count in 1:length(compression)) {
+  # define progress bar
+  if (progress) {
 
-    cat("\ncompression", compression[compress_count])
+    compression_steps <- length(table_streamers)
 
-    for (run_id in 1:nr_of_runs) {
+    if (!is.null(compression)) {
+      compressors <- 0
+      for (table_streamer in table_streamers) {
+        if (table_streamer$variable_compression) compressors <- compressors + 1
+      }
 
-      cat("\nwriting ...")
+      compression_steps <- compressors * length(compression) + length(table_streamers) - compressors
+    }
 
-      # write to disk
+    nr_of_measurements <- 2 * compression_steps * nr_of_runs * cycle_size * length(nr_of_rows)
+    row_weights <- length(nr_of_rows) * nr_of_rows / sum(nr_of_rows)
+    measurement_count <- 0
 
-      # write cycle_size files
-      for (id in 1:cycle_size) {
+    pb <- progress_bar$new("[:bar] :percent :eta", total = 100)
+  }
 
-        cat(".")
+  # use a length 1 marker vector when no compression values were given
+  if (is.null(compression)) {
+    compression <- -1
+  }
 
-        # generate dataset once for all generators
-        x <- generator$generator(nr_of_rows)
+  for (nr_of_rows_index in 1:length(nr_of_rows)) {
 
-        # disk warmup (to avoid a sleeping disk after data creation)
-        saveRDS("warmup disk", paste0(result_folder, "/", "warmup.rds"))
+    cur_nr_of_rows <- nr_of_rows[nr_of_rows_index]
 
-        # iterate
-        for (table_streamer in table_streamers[sample(1:length(table_streamers))]) {
+    for (compress_count in 1:length(compression)) {
 
-          # don't repeat identical measurements
-          if (!table_streamer$variable_compression && compress_count > 1) next
+      write_compression <- compression[compress_count]
+      if (write_compression == -1) write_compression <- NULL
 
-          file_name <- paste0(result_folder, "/", "dataset_", table_streamer$id, "_", id)
+      for (run_id in 1:nr_of_runs) {
 
-          # Only a single iteration is used to avoid disk caching effects
-          # Due to caching measured speeds are higher and create a unrealistic benchmark
-          res <- microbenchmark({
-            table_streamer$table_writer(x, file_name, compression[compress_count])
-          },
-          times = 1)
+        # write cycle_size files
+        for (id in 1:cycle_size) {
 
-          results <- observation(results, "write", table_streamer$id, generator$id,
-            compression[compress_count], file.info(file_name)$size, res$time, object.size(x))
-        }
-      }
+          # generate the dataset once for all streamers
+          x <- generator$generator(cur_nr_of_rows)
 
-      # read from disk
-      cat("   reading ...")
+          # disk warmup (to avoid a sleeping disk after data creation)
+          saveRDS("warmup disk", paste0(result_folder, "/", "warmup.rds"))
 
-      for (id in 1:cycle_size) {
+          # iterate over the streamers in random order
+          for (table_streamer in table_streamers[sample(1:length(table_streamers))]) {
 
-        cat(".")
+            # don't repeat identical measurements
+            if (!table_streamer$variable_compression && compress_count > 1) next
 
-        # iterate
-        for (table_streamer in table_streamers[sample(1:length(table_streamers))]) {
+            file_name <- paste0(result_folder, "/", "dataset_", table_streamer$id, "_", id)
 
-          # don't repeat identical measurements
-          if (!table_streamer$variable_compression && compress_count > 1) next
-
-          file_name <- paste0(result_folder, "/", "dataset_", table_streamer$id, "_", id)
-
-          res <- microbenchmark({
-              y <- table_streamer$table_reader(file_name)
+            # Only a single iteration is used to avoid disk caching effects;
+            # due to caching, measured speeds are higher and create an unrealistic benchmark
+            res <- microbenchmark({
+              table_streamer$table_writer(x, file_name, write_compression)
             },
             times = 1)
 
-          results <- observation(results, "read", table_streamer$id, generator$id,
-            compression[compress_count], file.info(file_name)$size, res$time, object.size(y))
+            results <- observation(results, "write", table_streamer$id, generator$id,
+              write_compression, file.info(file_name)$size, res$time, cur_nr_of_rows, object.size(x))
+
+            if (progress) {
+              measurement_count <- measurement_count + row_weights[nr_of_rows_index]
+              pb$update(measurement_count / nr_of_measurements)
+            }
+          }
+        }
+
+        # read from disk
+        for (id in 1:cycle_size) {
+
+          # iterate over the streamers in random order
+          for (table_streamer in table_streamers[sample(1:length(table_streamers))]) {
+
+            # don't repeat identical measurements
+            if (!table_streamer$variable_compression && compress_count > 1) next
+
+            file_name <- paste0(result_folder, "/", "dataset_", table_streamer$id, "_", id)
+
+            res <- microbenchmark({
+                y <- table_streamer$table_reader(file_name)
+              },
+              times = 1)
+
+            results <- observation(results, "read", table_streamer$id, generator$id,
+              write_compression, file.info(file_name)$size, res$time, cur_nr_of_rows, object.size(y))
+
+            if (progress) {
+              measurement_count <- measurement_count + row_weights[nr_of_rows_index]
+              pb$update(measurement_count / nr_of_measurements)
+            }
+          }
         }
       }
     }
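
The progress bar bookkeeping deserves a closer look: only streamers with variable_compression repeat per compression value, and each measurement advances the bar by a weight proportional to its row count, so larger (slower) datasets move the bar further per step. A worked example of the counting, using assumed values:

# suppose 3 streamers, 2 of which support variable compression,
# benchmarked at compression = c(0, 50, 100)
compressors <- 2
compression <- c(0, 50, 100)
streamer_count <- 3
compression_steps <- compressors * length(compression) + streamer_count - compressors
# 2 * 3 + (3 - 2) = 7 distinct (streamer, compression) combinations

nr_of_runs <- 10
cycle_size <- 10
nr_of_rows <- c(1e4, 1e5)
nr_of_measurements <- 2 * compression_steps * nr_of_runs * cycle_size * length(nr_of_rows)
# 2 * 7 * 10 * 10 * 2 = 2800 write and read measurements in total

row_weights <- length(nr_of_rows) * nr_of_rows / sum(nr_of_rows)
row_weights
# [1] 0.1818182 1.8181818
# each measurement on the 1e5-row set advances the bar 10x as far as one on
# the 1e4-row set; the weights average to 1, so the bar still ends at 100%
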