R/aggregation.R
54a0f1cc
 #' readPipelineResults
 #'
886ad4f9
 #' @param path The path (e.g. folder or prefix) to the results. Either `path` 
 #' or `resfiles` should be given.
54a0f1cc
 #' @param resfiles A vector of paths to `*.evaluation.rds` files. Either `path` 
 #' or `resfiles` should be given.
 #'
 #' @return A list of results.
 #' @examples
 #' # we produce mock pipeline results:
 #' pip <- mockPipeline()
 #' datasets <- list( ds1=1:3, ds2=c(5,10,15) )
886ad4f9
 #' tmpdir1 <- paste0(tempdir(),'/')
20c06813
 #' res <- runPipeline(datasets, pipelineDef=pip, output.prefix=tmpdir1,
 #'                    alternatives=list() )
54a0f1cc
 #' # we read the evaluation files:
 #' res <- readPipelineResults(tmpdir1)
118eb482
 #' @export
886ad4f9
 readPipelineResults <- function(path = NULL, resfiles = NULL) {
   if ( (!is.null(path) && !is.null(resfiles)) || 
        (is.null(path) && is.null(resfiles)) ) 
4bfc4130
     stop("Exactly one of `path` or `resfiles` should be given.")
886ad4f9
   if (!is.null(path)) {
4bfc4130
     resfiles <- list.files(path, pattern="evaluation\\.rds", full.names=TRUE)
886ad4f9
     if (length(resfiles) == 0) 
       resfiles <- list.files(dirname(path), full.names=TRUE, 
4bfc4130
                              pattern=paste0(gsub("\\.","\\\\.",basename(path)),
886ad4f9
                                             ".*\\.evaluation\\.rds$"))
     if (length(resfiles) == 0) 
       stop("Could not find evaluation files.")
4bfc4130
   }
886ad4f9
   ds <- vapply(strsplit(basename(resfiles), "\\."), FUN=function(x) rev(x)[3], 
                FUN.VALUE=character(1))
   if (any(duplicated(ds))) 
     stop("Some datasets appear to occur in more than one file. If these are ",
           "the result of multiple `runPipeline` calls, please read them ",
           "separately and use `mergePipelineResults()`.")
4bfc4130
   names(resfiles) <- ds
   lapply(resfiles, readRDS)
 }
 
 
 #' aggregatePipelineResults
94d221cd
 #'
886ad4f9
 #' Aggregates the evaluation and running times of `runPipeline` results. 
 #' Results should be indicated either as a `path`` prefix or as a vector of 
4bfc4130
 #' paths to `evaluation\\.rds` files (`resfiles`).
94d221cd
 #'
4bfc4130
 #' @param res A (named) list of results (per dataset), as produced by 
25bb1639
 #' \code{\link{readPipelineResults}} (or `mergePipelineResults`).
 #' @param pipDef An optional \code{\link{PipelineDefinition}} containing the 
 #' aggregation methods. If omitted, that from the results will be used.
94d221cd
 #'
 #' @return A list with a slot for each step for which there is an aggregation 
 #' method, or (if no aggregation method available) a list of the 
 #' `stepIntermediateReturnObjects` of `runPipeline`
4bfc4130
 #'
 #' @import S4Vectors
94d221cd
 #' @export
54a0f1cc
 #' @examples
 #' # we produce mock pipeline results:
 #' pip <- mockPipeline()
 #' datasets <- list( ds1=1:3, ds2=c(5,10,15) )
886ad4f9
 #' tmpdir1 <- paste0(tempdir(),'/')
d29e21a7
 #' res <- runPipeline(datasets, pipelineDef=pip, output.prefix=tmpdir1,
 #'                    alternatives=list() )
54a0f1cc
 #' # we read the evaluation files:
 #' res <- readPipelineResults(tmpdir1)
 #' # we aggregate the results (equivalent to the output of `runPipeline`):
 #' res <- aggregatePipelineResults(res)
886ad4f9
 aggregatePipelineResults <- function(res, pipDef = NULL) {
   if (length(res) == 2 && all(names(res) == c("evaluation", "elapsed"))) 
     stop("`res` appears to be already aggregated, or the results of a single", 
54a0f1cc
          " dataset")
886ad4f9
   .checkRes(res, requirePDidentity = is.null(pipDef))
   if (is.null(pipDef)) 
     pipDef <- metadata(res[[1]])$PipelineDefinition
   if (is.null(pipDef)) 
     stop("No PipelineDefinition found!")
7cc6ac5e
   
4bfc4130
   # aggregating elapsed time
   names(steps) <- steps <- names(res[[1]]$elapsed$stepwise)
886ad4f9
   reso <- SimpleList(
     evaluation=NULL, 
     elapsed=list(stepwise=lapply(steps, FUN=function(step){
         .aggElapsed(lapply(res, FUN = function(x) x$elapsed$stepwise[[step]]))
       }),
       total=.aggElapsed(lapply(res, FUN=function(x) x$elapsed$total)))
   )
4bfc4130
   metadata(reso)$PipelineDefinition <- pipDef
932ec5c4
   
886ad4f9
   isn <- vapply(stepFn(pipDef, type = "aggregation"), is.null, logical(1))
   if (all(isn)) {
     warning("No aggregation defined in the pipelineDefinition; returning only",
     " running times.")
4bfc4130
     return(reso)
94d221cd
   }
4bfc4130
   
94d221cd
   names(isn) <- isn <- names(isn)[!isn]
886ad4f9
   res <- lapply(res, FUN = function(x) x$evaluation)
4bfc4130
   
94d221cd
   fullnames <- parsePipNames(names(res[[1]][[length(res[[1]])]]))
886ad4f9
   reso$evaluation <- lapply(isn, FUN = function(x) {
54a0f1cc
     message("Aggregating evaluation results for: ", x)
886ad4f9
     stepFn(pipDef,type="aggregation")[[x]](lapply(res, FUN=function(y) y[[x]]))
94d221cd
   })
4bfc4130
   reso
 }
 
886ad4f9
 .aggElapsed <- function(res) {
4bfc4130
   el.tot <- parsePipNames(names(res[[1]]))
886ad4f9
   el.tot <- el.tot[rep(seq_len(nrow(el.tot)), length(res)), , drop = FALSE]
4bfc4130
   row.names(el.tot) <- NULL
886ad4f9
   el.tot$dataset <- factor(rep(names(res), each = length(res[[1]])))
4bfc4130
   el.tot$elapsed <- as.numeric(unlist(res))
   el.tot
 }
 
886ad4f9
 .checkRes <- function(res1, res2 = NULL, requirePDidentity = TRUE) {
   res1 <- lapply(res1, FUN = function(x) x$evaluation)
4bfc4130
   # check that all datasets have the same steps and number of analyses
886ad4f9
   if (!all(table(unlist(lapply(res1, names))) == length(res1))) 
4bfc4130
     stop("The different datasets were not produced with the same pipeline!")
886ad4f9
   for (step in names(res1[[1]])) {
     tt <- table(unlist(lapply(res1, FUN = function(x) names(x[[step]]))))
     if (!all(tt == length(res1))) 
       stop("The different datasets do not have the same runs, i.e. they ",
           "include different sets of alternative parameters.")
4bfc4130
   }
886ad4f9
   if (!is.null(res2)) {
4bfc4130
     .checkRes(res2)
886ad4f9
     res2 <- lapply(res2, FUN = function(x) x$evaluation)
     if (names(res1)[[1]] != names(res2)[[1]]) 
4bfc4130
       stop("The two sets of results were not produced with the same pipeline.")
886ad4f9
     if (is(res1,"SimpleList") && !is.null(metadata(res1)$PipelineDefinition) && 
         is(res2,"SimpleList") && !is.null(metadata(res2)$PipelineDefinition)) {
4bfc4130
       # we require that the evaluation functions be the same
       pd1 <- metadata(res1)$PipelineDefinition
       pd2 <- metadata(res2)$PipelineDefinition
886ad4f9
       if (!identical( stepFn(pd1, type = "evaluation"),
                       stepFn(pd2, type = "evaluation") )) {
         msg <- paste("The evaluation functions of the PipelineDefinitions are", 
7cc6ac5e
                      "not identical")
886ad4f9
         if (requirePDidentity) 
           stop(msg)
4bfc4130
         warning(msg)
       }
     }
   }
 }
 
 #' mergePipelineResults
25bb1639
 #' 
 #' Merges the (non-aggregated) results of any number of runs of `runPipeline`
 #' using the same \code{\link{PipelineDefinition}} (but on different datasets 
 #' and/or using different parameters). First read the different sets of results 
 #' using \code{\link{readPipelineResults}}, and pass them to this function.
 #' 
4bfc4130
 #'
25bb1639
 #' @param ... Any number of lists of pipeline results, each as produced by 
 #' \code{\link{readPipelineResults}}
 #' @param rr Alternatively, the pipeline results can be passed as a list (in 
 #' which case `...` is ignored)
 #' @param verbose Whether to print processing information
4bfc4130
 #'
25bb1639
 #' @return A list of merged pipeline results.
54a0f1cc
 #' @examples
 #' # we produce 2 mock pipeline results:
 #' pip <- mockPipeline()
 #' datasets <- list( ds1=1:3, ds2=c(5,10,15) )
886ad4f9
 #' tmpdir1 <- paste0(tempdir(),'/')
d29e21a7
 #' res <- runPipeline(datasets, pipelineDef=pip, output.prefix=tmpdir1,
 #'                    alternatives=list() )
886ad4f9
 #' alternatives <- list(meth1=c('log2','sqrt'), meth2='cumsum')
 #' tmpdir2 <- paste0(tempdir(),'/')
fd64ca31
 #' res <- runPipeline(datasets, alternatives, pip, output.prefix=tmpdir2)
54a0f1cc
 #' # we read the evaluation files:
 #' res1 <- readPipelineResults(tmpdir1)
 #' res2 <- readPipelineResults(tmpdir2)
 #' # we merge them:
 #' res <- mergePipelineResults(res1,res2)
 #' # and we aggregate:
 #' res <- aggregatePipelineResults(res)
118eb482
 #' @export
25bb1639
 mergePipelineResults <- function(..., rr=NULL, verbose=TRUE){
   if(is.null(rr)) rr <- list(...)
   if(any(vapply(rr, FUN.VALUE=logical(1L), 
                 FUN=function(x) is.null(x[[1]]$evaluation))))
     stop("Some of the objects appear not to be pipeline results.")
   if(is.null(names(rr))) names(rr) <- paste0("res",1:length(rr))
   for(ds in rr) .checkSameSteps(rr)
   .checkSameSteps(lapply(rr, FUN=function(x) x[[1]]))
   if(verbose) print(lapply(rr, FUN=function(res){
     vapply(res, FUN.VALUE=integer(1L), 
            FUN=function(x) length(rev(x$evaluation)[[1]]))}))
   
   # concatenate results
   names(ds) <- ds <- unique(unlist(lapply(rr, names)))
   hasdup <- FALSE
   res <- lapply(ds, FUN=function(dataset){
     r2 <- lapply(rr, FUN=function(x) x[[dataset]])
     r2 <- r2[!vapply(r2,is.null,logical(1L))]
     if(length(r2)==1) return(r2[[1]])
     if(!hasdup){
       rn <- unlist(lapply(r2, FUN=function(x) names(rev(x$evaluation)[[1]])))
       if(any(duplicated(rn))) hasdup <- TRUE
     }
     res <- r2[[1]]
     res$errors <- NULL
     for(i in seq(2,length(r2))){
       x <- setdiff(names(r2[[i]]$elapsed$total), names(res$elapsed$total))
       res$elapsed$total <- c(res$elapsed$total, r2[[i]]$elapsed$total[x])
       for(step in names(res$evaluation)){
         x <- setdiff(names(r2[[i]]$evaluation[[step]]), 
                      names(res$evaluation[[step]]))
         res$evaluation[[step]] <- c(res$evaluation[[step]], 
                                     r2[[i]]$evaluation[[step]][x])
         res$elapsed$stepwise[[step]] <- c(res$elapsed$stepwise[[step]], 
                                           r2[[i]]$elapsed$stepwise[[step]][x])
       }
     }
     res
f2eff6f1
   })
25bb1639
   rm(rr)
   if(hasdup && verbose) 
     message("Some analyses were redundant across results sets, ",
             "and only the first given was kept.")
7cc6ac5e
   
25bb1639
   # retain only complete parameter sets
   hasmissing <- FALSE
   for(step in names(res[[1]]$evaluation)){
     a <- table(unlist(lapply(res,FUN=function(x) names(x$evaluation[[step]]))))
     if(length(a)>0){
       if(!all(w <- a==length(res))) hasmissing <- TRUE
       a <- sort(names(a)[w])
       if(length(a)==0) stop("No complete parameter set!")
       res <- lapply(res, FUN=function(x){
         x$evaluation[[step]] <- x$evaluation[[step]][a]
         x$elapsed$stepwise[[step]] <- x$elapsed$stepwise[[step]][a]
         if(step==rev(names(x$evaluation))[1])
           x$elapsed$total <- x$elapsed$total[a]
         x
       })
     }
f2eff6f1
   }
25bb1639
   if(hasmissing && verbose) 
     warning("Some parameter combinations were not available for all datasets ",
             "and were dropped.")
   
   if(verbose) print(list(merged=vapply(res, FUN.VALUE=integer(1L), 
            FUN=function(x) length(rev(x$evaluation)[[1]]))))
f2eff6f1
   res
94d221cd
 }
54a0f1cc
 
25bb1639
 .checkSameSteps <- function(x, stopifnot=TRUE){
   steps <- table(unlist(lapply(x, FUN=function(x) names(x$evaluation))))
   if(all(steps==length(x))) return(TRUE)
   if(stopifnot) stop("Some datasets have different steps - were then run using",
                       " the same PipelineDefinition?")
   FALSE
 }
 
554a0cf9
 .dsMergeResults <- function(res1, res2) {
   names(steps) <- steps <- names(res1$evaluation)
   esteps <- vapply(res1$evaluation, FUN = function(x) {
     !is.null(x) && length(x) > 0
   }, logical(1))
   edif <- setdiff(names(res2$elapsed$total), names(res1$elapsed$total))
   res <- SimpleList(evaluation = lapply(steps[esteps], FUN = function(step) {
     r1 <- res1$evaluation[[step]]
     r2 <- res2$evaluation[[step]]
     dif <- setdiff(names(r2), names(r1))
     c(r1, r2[dif])
   }), elapsed = list(stepwise = lapply(steps, FUN = function(step) {
     r1 <- res1$elapsed$stepwise[[step]]
     r2 <- res2$elapsed$stepwise[[step]]
     dif <- setdiff(names(r2), names(r1))
     c(r1, r2[dif])
   }), total = c(res1$elapsed$total, res2$elapsed$total[edif])))
   if (!is.null(metadata(res1)$PipelineDefinition)) {
     metadata(res)$PipelineDefinition <- metadata(res1)$PipelineDefinition
   } else if (!is.null(metadata(res2)$PipelineDefinition)) {
     metadata(res)$PipelineDefinition < metadata(res2)$PipelineDefinition
   }
   res
 }
54a0f1cc
 
 #' defaultStepAggregation
 #'
a53f1220
 #' @param x A list of results per dataset, each containing a list (1 element 
 #' per combination of parameters) of evaluation metrics (coercible to vectors 
 #' or matrix).
54a0f1cc
 #'
 #' @return A data.frame.
 #' @export
 #' @importFrom dplyr bind_rows
886ad4f9
 defaultStepAggregation <- function(x) {
7558b7b2
   y <- x[[1]][[1]]
886ad4f9
   if (is(y, "list") || is(y, "SimpleList")) {
     res <- lapply(seq_along(y), FUN = function(i) {
       defaultStepAggregation(lapply(x, FUN = function(x) {
         lapply(x, FUN = function(x) x[[i]])
7558b7b2
       }))
     })
     names(res) <- names(y)
     return(res)
   }
886ad4f9
   dplyr::bind_rows(lapply(x, FUN = function(x) {
7cc6ac5e
     x <- cbind(parsePipNames(names(x)), do.call(rbind, x))
     row.names(x) <- NULL
     x
886ad4f9
   }), .id = "dataset")
54a0f1cc
 }