1
#' Process the EHR data in table format
#'
#' ccRecord data are re-arranged into tables where the columns stand for
#' data fields (e.g. heart rate, blood pressure) and the rows stand for
#' each data record within a unique cadence. See ccTable_create_cctable.
#' ccTable is the data processing platform. It stores both the original data
#' and the processed data, together with the processing details. It also
#' contains various commonly used data filters.
#' @field record ccRecord.
#' @field conf the YAML style configuration.
#' @field torigin the original data table.
#' @field tclean the data table after cleaning processes.
#' @field dfilter list contains data filtering information.
#' @field dquality list contains data quality.
#' @field summary list
#' @field base_cadence the base cadence is specified in hours
#' @include ccRecord.R
#' @examples
#' rec <- ccRecord()
#' cctable <- create_cctable(rec, freq=1)
#' cctable$clean()
#' #table <- cctable$tclean
#' @exportClass ccTable
ccTable <- setRefClass(
    "ccTable",
    fields = c(
        record = "ccRecord",
        conf = "list",
        torigin = "data.table",
        tclean = "data.table",
        dfilter = "list",
        dquality = "list",
        summary = "list",
        base_cadence = "numeric",
        # per-entry mask: one logical column per item, TRUE = keep the value
        .rindex = "data.table",
        # per-episode mask: (site, episode_id, index), TRUE = keep the episode
        .epindex = "data.table",
        # item names taken from names(conf) when the table is created
        items = "character"
    )
)
37
ccTable$methods(
    show = function() {
        # Print a short summary of the original table: its number of rows,
        # the number of distinct (episode_id, site) pairs and the cadence.
        origin <- .self$torigin
        cat("$tclean", "\n")
        cat("Data entry (origin) = ", nrow(origin), "\n")
        # grouping by episode and site yields one row per unique episode
        episodes <- origin[, 1, by=c("episode_id", "site")]
        cat("Episode number (origin) = ", nrow(episodes), "\n")
        cat("The base cadence is ", .self$base_cadence, " hour.\n")
    })
46

47
#' Create a ccTable object
#'
#' Re-arrange the ccRecord object to table format where each column stands
#' for a variable and each row for a record data point. The number of rows
#' depends on the sampling frequency set in this function. If the original
#' data has a higher recording frequency than the set frequency (freq), the
#' closest data point will be taken. It is suggested that `freq` should not
#' be set lower than the maximum recording frequency in the original dataset.
#' @param rec ccRecord
#' @param conf either the path of a YAML configuration file or the already
#' parsed configuration list. When NULL, ITEM_REF is used.
#' @param freq a unique sampling frequency (in hours) for all variables. e.g.
#' if freq is set to 1, each row in ccTable will represent a record of one
#' hour.
#' @return ccTable
#' @export
create_cctable <- function(rec, conf=NULL, freq=1) {
    # Resolve the configuration: default reference, YAML file, or as given.
    if (is.null(conf)) {
        conf <- ITEM_REF
    } else if (is.character(conf)) {
        conf <- yaml.load_file(conf)
    }

    table <- ccTable(record=rec, conf=conf)
    table$create_table(freq)
    table
}
73

74
#' Create a ccTable object
75
#'
76
#' This is a member function of ccTable-class. Using create_cctable is a safer and 
77
#' easier way to create the ccTable. See create_cctable. 
78
#' @name ccTable_create_cctable
79
NULL
80
ccTable$methods(
    create_table = function(freq){
        "Create a table containing the items selected in the conf at a given frequency (in hours)"
        .self$items <- names(.self$conf)
        .self$torigin <- ccd_select_table(record=.self$record,
                                          items_opt=.self$items, freq=freq)
        # Record the cadence even when no episode produced any row, so that
        # show() and later calls always see the requested frequency.
        .self$base_cadence <- freq

        if (nrow(.self$torigin) == 0) {
            # Nothing selected: fall back to an empty table that still has
            # the key columns, and keep tclean consistent with it.
            .self$torigin <- data.table(site=character(),
                                        episode_id=character(),
                                        time=integer())
            .self$tclean <- .self$torigin
        } else {
            setkey(.self$torigin, "site", "episode_id")
            .self$tclean <- .self$torigin

            # Entry mask: same layout as torigin, every item column set to
            # TRUE (keep) until a filter drops entries.
            .self$.rindex <- .self$torigin
            for (i in .self$items) .self$.rindex[[i]] <- TRUE

            # Episode mask: one row per (site, episode_id), initially TRUE.
            .self$.epindex <- .self$torigin[, TRUE, by=c("site", "episode_id")]
            setnames(.self$.epindex, c("site", "episode_id", "index"))
        }
    })
102

103
ccTable$methods(
    update.entry = function(){
        # Blank out (set to NA) every value in tclean whose entry mask in
        # .rindex is FALSE, item by item.
        for (item in .self$items) {
            dropped <- !.self$.rindex[[item]]
            .self$tclean[[item]][dropped] <- NA
        }
    })
108

109
ccTable$methods(
    update.episode = function(){
        # Keep only the episodes still flagged TRUE in .epindex: merging on
        # (site, episode_id) removes the rows of every other episode.
        kept <- .self$.epindex[index==TRUE]
        .self$tclean <- merge(.self$tclean, kept,
                              by=c("site", "episode_id"))
        # the helper flag column brought in by the merge is not data
        .self$tclean[["index"]] <- NULL
    })
115

116
#' Apply all the setup filters. 
117
#'
118
#' Once filters are applied, the processed data will be stored in tclean. Note, 
119
#' running filtering function before apply_filters is necessary. This function 
120
#' will have no effect on tclean if no filter is ran prior.
121
#' Filters will decide to preserve or remove particular entries or episodes. 
122
#' @param warnings logical value indicating whether to emit a warning for
#' each filter that is specified in the configuration but has not been run.
#' The default value is TRUE.
124
#' @name ccTable_apply_filters
125
#' @examples
126
#' \dontrun{
127
#' tb <- create_cctable(ccd, conf, 1)
128
#' tb$filter_range()
#' tb$apply_filters() # apply only the range filter regardless of the conf.
130
#' }
131
NULL
132
ccTable$methods(
    apply_filters = function(warnings=TRUE) {
        "Apply all filters specified in the configuration to update the clean
                    table (tclean)"

        # Translate the `apply` setting of one item/filter pair in the
        # configuration into the method that carries it out.
        resolve_action <- function(item.name, filter.name) {
            action <- .self$conf[[item.name]][[filter.name]]$apply
            action <- as.character(as.vector(action))
            switch(action,
                   "drop_entry"=.self$drop_entry,
                   "drop_episode"=.self$drop_episode,
                   "NA"=function(nmitem, dq){},
                   "NULL"=function(nmitem, dq){},
                   stop("functions for applying filters can only be 'drop_entry' or 'drop_episode'. "))
        }

        # Flattened configuration names look like "<item>.<filter>.apply";
        # splitting on "." gives the item and filter of every apply entry.
        flat_names <- names(unlist(.self$conf))
        apply_paths <- grep('apply', flat_names, value=TRUE)

        for (parts in strsplit(apply_paths, "[.]")) {
            item <- parts[1]
            filter <- parts[2]
            tryCatch(
                resolve_action(item, filter)(item, .self$dfilter[[filter]]),
                error = function(e) {
                    if (!is.null(.self$dfilter[[filter]])) {
                        # the filter was run, so this is a genuine failure
                        cat(paste(item, filter, "\n"))
                        stop(e)
                    }
                    else {
                        # the filter was configured but never run
                        if (warnings)
                            warning(paste(item, "filter", filter,
                                          "has been specified in the configuration but has not been ran."))
                    }
                })
        }

        # push the accumulated masks into tclean
        .self$update.entry()
        .self$update.episode()
    })
169

170
ccTable$methods(
    drop_entry = function(nmitem, dq){
        # Combine the current entry mask of `nmitem` with the entry-level
        # result of a filter (dq$entry); an entry stays only if both agree.
        still_kept <- .self$.rindex[[nmitem]] & dq$entry[[nmitem]]
        .self$.rindex[[nmitem]] <- still_kept
    })
175

176
ccTable$methods(
    drop_episode = function(nmitem, dq){
        # Combine the episode mask with the episode-level result of a
        # filter (dq$episode$select_index). `nmitem` is not used here; it is
        # part of the signature shared with drop_entry.
        still_kept <- .self$.epindex[["index"]] & dq$episode[["select_index"]]
        .self$.epindex[["index"]] <- still_kept
    })
181

182
#' Reload the YAML configuration file
183
#'
184
#' Note, this function will also reset all the operations and 
185
#' remove the tclean. 
186
#' @name ccTable_reload_conf
187
#' @param conf full path of the YAML configuration file or the parsed config list. 
188
#' @examples 
189
#' \dontrun{
190
#' tb$reload_conf("REF.yaml")
191
#' }
192
NULL
193
ccTable$methods(
    reload_conf = function(conf) {
        "reload yaml configuration."
        # Accept either a path to a YAML file or an already parsed list.
        # The parsed value is validated (not the raw argument), so a file
        # path no longer trips the list check below.
        new_conf <- if (is.character(conf)) yaml.load_file(conf) else conf
        if (!is.list(new_conf))
            stop("conf must be a list or the full path to a YAML file.")
        .self$conf <- new_conf
        # a new configuration invalidates previous filter results
        .self$reset()
    })
204

205
#' Reset the ccTable
206
#' 
207
#' Restore the object to its initial status. All the filters, quality and the 
208
#' cleaned table will be removed.
209
#' @name ccTable_reset
210
NULL
211
ccTable$methods(
    reset = function() {
        "Restore the table to its initial, unfiltered state."
        .self$dfilter <- list()
        .self$dquality <- list()
        .self$tclean <- .self$torigin

        # Also clear the entry/episode masks; otherwise drops recorded by
        # earlier filters would be re-applied by the next apply_filters().
        # The masks are rebuilt the same way create_table builds them and
        # only exist for a non-empty original table.
        if (isTRUE(nrow(.self$torigin) > 0)) {
            .self$.rindex <- .self$torigin
            for (i in .self$items) .self$.rindex[[i]] <- TRUE

            .self$.epindex <- .self$torigin[, TRUE, by=c("site", "episode_id")]
            setnames(.self$.epindex, c("site", "episode_id", "index"))
        }
    })
217

218
#' Export the clean table as a CSV file
219
#' 
220
#' Export tclean as a CSV file.
221
#' @name ccTable_export_csv
222
#' @param file the full path of the output CSV file. 
223
NULL
224
ccTable$methods(
225
                export_csv = function(file=NULL) {
226 0
                    "Export the cleaned table to a CSV file."
227 0
                    if (is.null(file))
228 0
                        return(.self$tclean)
229

230 0
                    write.csv(.self$tclean, file=file)
231
                })
232

233
#' Apply all the filters
234
#'
235
#' All the filters in configuration will be applied to create the 
236
#' clean dataset. The filters include range, categories, missingness, 
237
#' no_data. 
238
#' @name ccTable_clean
239
#' @examples 
240
#' \dontrun{
241
#' tb <- create_cctable(ccd, conf, 1)
242
#' tb$clean()
243
#' }
244
NULL
245
ccTable$methods(
246
                clean = function() {
247 1
                    if (nrow(.self$torigin) != 0 ) {
248 1
                        .self$filter_range()
249 1
                        .self$filter_categories()
250 1
                        .self$filter_missingness()
251 1
                        .self$filter_nodata()
252 1
                        .self$apply_filters()
253
                    }
254
                    else 
255 0
                        warning("The original table is NULL, hence no cleaning process has been performed.")
256
                })
257

258
itemsToDataFrame <- function(ep, items, period_length, freq) {
    # Build a named list of equally long columns for one episode: a `time`
    # grid from 0 to period_length in steps of freq, plus one column per
    # requested item (and an "<item>.meta" column where meta data exist).
    time <- seq(0, period_length, freq)
    n_steps <- length(time)
    columns <- list(time=time)

    for (code in items) {
        value <- ep@data[[code]]
        if (length(value) == 1) {
            # single value: repeat it on every time step
            columns[[code]] <- rep(value, n_steps)
        } else if ("time" %in% names(value)) {
            # time series: align it to the grid
            aligned <- reallocateTime(value, period_length, freq)
            columns[[code]] <- aligned$val
            if ("meta" %in% names(value))
                columns[[paste(code, "meta", sep=".")]] <- aligned$meta
        } else {
            # item absent (or not usable): fill with the "NA" placeholder
            columns[[code]] <- rep("NA", n_steps)
        }
    }
    columns
}
281

282
#' Create the table for ccTable from ccRecord
#'
#' @param record ccRecord
#' @param items_opt character vector. Items (HIC code) selected in items_opt
#' are optional items, which will be automatically filled when the item is
#' missing.
#' @param items_obg character vector of obligatory items. Any episode that
#' does not contain every item in this vector will be removed.
#' @param freq numeric cadence in hour.
#' @param return_list logical if TRUE return the per-episode tables as a list.
#' @return data.table, or a list of per-episode tables when return_list is
#' TRUE.
ccd_select_table <- function(record, items_opt=NULL, items_obg=NULL, freq,
                             return_list=FALSE) {

    all_items <- c(items_opt, items_obg)
    if (is.null(all_items))
        stop('both items_opt and items_obg are NULL')

    # The episode callback appends to `tables` through this environment.
    fn_env <- environment()
    tables <- list()
    # Redundant check whose only purpose is to reference fn_env here, which
    # avoids a note about the variable being assigned but not used.
    stopifnot(is.list(fn_env$tables))

    for_each_episode(record, function(ep) {
        if (all(items_obg %in% names(ep@data))) {
            # getEpisodePeriod returns NULL when no 2D data have been found.
            period_length <- getEpisodePeriod(ep)
            if (!is.null(period_length) && period_length > 0) {
                columns <- c(list(),
                             itemsToDataFrame(ep, all_items,
                                              period_length, freq))
                n_rows <- length(columns[["time"]])
                columns[["site"]] <- rep(ep@site_id, n_rows)
                columns[["episode_id"]] <- rep(ep@episode_id, n_rows)
                fn_env$tables[[length(tables) + 1]] <-
                    .simple.data.frame(columns)
            }
        }
    })

    if (return_list)
        return(tables)

    # fill is TRUE because meta data columns can be missing.
    dt <- rbindlist(tables, fill=TRUE)

    # Add the missing meta columns to keep the 2d wide table consistent.
    code.has.meta <- names(unlist(sapply(ITEM_REF, function(x) x$NHICmetaCode)))
    for (code in all_items) {
        meta.code <- paste(code, "meta", sep=".")
        expects_meta <- code %in% code.has.meta
        has_meta_col <- meta.code %in% names(dt)
        if (expects_meta && !has_meta_col) {
            dt[[meta.code]] <- rep("NA", nrow(dt))
        }
    }

    # Convert each item column to the data type registered for the item.
    for (code in all_items) {
        dt[[code]] <-
            suppressWarnings(.which.datatype(code)(as.character(dt[[code]])))
    }

    dt
}
347

348
#' Clean table - low memory
#'
#' The cleaning process is specified by the YAML configuration. All the
#' filters presented in the configuration will be applied. It returns only
#' the cleaned data; all the data quality information is lost. This function
#' is useful when there is not enough memory to hold all the information.
#' @param record ccRecord
#' @param config the full path of the YAML configuration file, or the
#' already parsed configuration list.
#' @param freq table cadence
#' @param nchunks integer number of chunks the record is split into; it must
#' be between 1 and the number of episodes. The larger the nchunks the less
#' memory is required.
#' @return A cleaned 2d wide table
#' @export create2dclean
create2dclean <- function(record, config, freq=1, nchunks=1) {
    # Build, filter and clean the table of one chunk; only tclean is kept.
    clean_chunk <- function(chunk) {
        cct <- create_cctable(rec=chunk, conf=config, freq=freq)
        cct$filter_range()
        cct$filter_categories()
        cct$filter_missingness()
        cct$filter_nodata()
        cct$apply_filters()
        cct$tclean
    }

    # Parse the configuration once instead of once per chunk.
    if (is.character(config))
        config <- yaml.load_file(config)

    # One chunk per episode is the finest possible split, so nchunks may be
    # equal to (but not larger than) the number of episodes.
    stopifnot(nchunks > 0, nchunks <= record@nepisodes)

    if (nchunks == 1)
        return(clean_chunk(record))

    # Chunk boundaries: chunk i covers episodes op.seq[i] .. op.seq[i+1] - 1.
    op.seq <- round(seq(1, record@nepisodes + 1, length.out=nchunks + 1))

    cleaned <- vector("list", length(op.seq) - 1)
    for (i in seq_len(length(op.seq) - 1)) {
        chunk <- record[seq(op.seq[i], op.seq[i + 1] - 1)]
        cleaned[[i]] <- clean_chunk(chunk)
        # release the memory of the processed chunk before the next one
        gc()
    }

    rbindlist(cleaned, fill=TRUE)
}
395

396

397

398
#' @importFrom Rcpp evalCpp
399
#' @useDynLib cleanEHR 
400
reallocateTime <- function(d, t_discharge, frequency) {
    # Validate the item data before handing it to reallocateTime_: it must
    # have `time` and `item2d` entries and `time` must be of class numeric.
    stopifnot(any(names(d) == "time"),
              any(names(d) == "item2d"),
              class(d$time) == "numeric")
    reallocateTime_(d, t_discharge, frequency)
}
407

408

409
findMaxTime <- function(episode) {
    # Latest time stamp found in the episode's time-series items, or NULL
    # when the episode holds no item with more than one element.
    latest_per_item <- lapply(episode@data, function(item) {
        # items of length <= 1 carry no time column and are skipped
        if (length(item) <= 1)
            return(NULL)
        if (!is.numeric(item$time))
            item$time <- as.numeric(as.character(item$time))
        max(item$time)
    })

    latest <- unlist(latest_per_item)
    if (is.null(latest)) NULL else max(latest)
}
430

431

432
#' Get the length of stay based on the first and the last data point.
#'
#' @param e ccEpisode object.
#' @param unit character string. Units in which the results are desired. Can
#' be abbreviated.
#' @return length of stay. A zero length is reported as 1. NULL is returned,
#' with a warning, when no period can be determined.
#' @export getEpisodePeriod
getEpisodePeriod <- function (e, unit="hours") {
    # pseudo delta period, see addPseudoTime()
    if (class(e@t_discharge)[1] == "numeric")
        return(e@t_discharge)

    # Use the slot as is when it already is a POSIXct, otherwise convert it.
    to_posix <- function(stamp) {
        if (class(stamp)[1] != "POSIXct")
            xmlTime2POSIX(as.character(stamp), allow=TRUE)
        else
            stamp
    }
    tadm <- to_posix(e@t_admission)
    tdisc <- to_posix(e@t_discharge)

    # The failure of POSIX conversion indicates that this episode is either
    # anonymised or has a missing or incorrect value of discharge or
    # admission time; the latest data time stamp is used instead.
    if (is.na(tadm) || is.na(tdisc)) {
        period_length <- findMaxTime(e)
    } else if (any(is.null(tdisc), is.null(tadm))) {
        period_length <- NULL
    } else {
        period_length <- as.numeric(tdisc - tadm, units=unit)
    }

    if (is.null(period_length)) {
        warning("This episode does not have any time series data: ",
                " episode_id = ", e@episode_id,
                " nhs_number = ", e@nhs_number,
                " pas_number = ", e@pas_number,
                " period_length = ", period_length, "\n")
    } else if (period_length == 0) {
        # in cases that tdisc == tadm
        period_length <- period_length + 1
    }

    period_length
}
480

481
#' Propagate a numerical delta time interval record.
#'
#' @param record ccRecord
#' @param delta time frequency in hours
#' @details when discharge time and admission time are missing, the latest
#' data time stamp will be used instead. Episodes without any time series
#' data are passed through unchanged.
reallocateTimeRecord <- function(record, delta=0.5) {
    reallocate.episode <- function(e) {
        # make sure admission and discharge time are correct
        period_length <- getEpisodePeriod(e)
        # getEpisodePeriod returns NULL for episodes without time series
        # data; isTRUE() keeps the check from failing on a zero-length
        # condition in that case.
        if (isTRUE(period_length < 0))
            warning("period length < 0")

        # call reallocateTime on every time-series item; single-valued
        # items are returned untouched
        new.episode(lapply(e@data, function(d) {
            if (length(d) > 1)
                reallocateTime(d, period_length, delta)
            else
                d
        }))
    }

    newdata <- for_each_episode(record, reallocate.episode)
    ccRecord() + newdata
}

Read our documentation on viewing source code .

Loading