Wednesday, December 04, 2013

streaming merge of sorted objects

A lot of software still seems to rely on being able to read big-ish data into memory. This is not possible (or at least not desirable) for much of the data that I work with. There are very nice tools in python to allow operating on chunks of data at a time. When combined with a decent data-layout, this can be very powerful, and simpler even than reading everything into memory. This can change working on big(ish) data into something like working on small data. The output of a tool that I'm using is a file of genomic positions and a value. Something like:
chr1 1234 0.9
chr1 1239 0.12
chr1 1249 0.12
That file is for a single sample and may contain about 10 million lines. That's not too much, but with 60 samples, this can become a problem. In addition, another sample may have sites that are not in that first sample:
chr1 1221 0.91
chr1 1239 0.13
chr1 1259 0.22
Many softwares will take a matrix with rows of genomic positions and 1 column per sample (e.g. R's limma, python's scikit-learn [after transpose of that shape]). Since we know the data is sorted we can use some of python's machinery to simplify creating that matrix. We could use merge in R or in pandas (both of those can operate on chunks but I think it's fair to say those faculties are not used much and would be difficult for this use-case), but this is a way to do it with very little memory use in a way that streams the input and the output so that it can be used immediately. Specifically, we will use heapq.merge. In this case, we need to make our data comparable using a class with a __cmp__ method
class Row(object):
    __slots__ = ('chrom', 'start', 'end', 'value', 'source')

    def __init__(self, toks, val_col=4, source=None):
        self.chrom = toks[0]
        self.start = int(toks[1])
        self.value = toks[val_col - 1]
        self.source = source

    def __cmp__(self, other):
        return cmp(self.chrom, other.chrom) or cmp(self.start, other.start)
That turns a each line from the file above into something order-able by chromosome and position. Since our data in each file is already sorted, this will be used to compare data across files. For each file we wish to sort, we create a lazy iterable like this:
def gen_iterable(fname, val_col):
    for toks in (x.rstrip("\r\n").split("\t") for x in xopen(fname)):
        yield Row(toks, val_col, source=fname)

and then a list of iterables as:
  
iterables = [gen_iterable(f, value_col) for f in bed_files]
where value_col would be 3 for the example data above (gets converted to a 0-based index). Since we want to have 1 output line for any position observed in any input, we can use itertools.groupby on the merged objects:
for loc, cgs in groupby(heapq.merge(*iterables),
                        lambda cg: (cg.chrom, cg.start)):

This groups the output of all the files by the location. We can know which files a represented in the cgs group by their .source attribute. In this case, I fill in empty values with 0 (this may be better set as NaN or some other value in some cases):
        present = dict((c.source, c) for c in cgs)

        # if a file doesn't have a record for here, just append 0
        values = [(present[s].value if s in present else '0') for s in bed_files]
        yield cgs[0].chrom, cgs[0].start, values
Where I have defined the source as the file where the value came. This is to ensure that the values are always in the same order regardless of which are missing. The yield sends back the chromosome, start and the list of values to the caller so that they can be manipulated as needed. At this point, that is small data. We have a single site (in this case a CpG site with methylation values) that we can operate on. This all happens on streaming data so the memory use is negligible. Similar ideas exist in BEDTools for sorted data (though it is more complicated to handle sorted interval data as opposed to here where we consider only points) but this is a simple way to build a custom setup for streaming arbitrary sorted data. The code the full code for this example is here: https://github.com/brentp/bio-playground/blob/master/methstuffs/bed-merge.py