Source code for dfcompare

import functools
import heapq
import os.path
import pandas as pd
import shutil
import tempfile

try:
    from itertools import zip_longest
except ImportError:
    from itertools import izip_longest as zip_longest

@functools.total_ordering
class BufferedIterator(object):
    """
    Iterator wrapper that can peek at its next item without consuming it

    Instances compare by their pending item (see :meth:`head`), which lets
    them be stored directly in a :mod:`heapq` heap, and they are truthy for
    as long as at least one item remains.
    """

    def __init__(self, it):
        """
        :param it: the iterator to wrap; nothing is read from it yet
        """
        self._it = it
        self._buffered = False
        self._head = None

    def __iter__(self):
        """Return *self* so instances work in ``for`` loops and ``list()``"""
        return self

    def head(self):
        """
        Return the next item without consuming it

        :raises StopIteration: if the wrapped iterator is exhausted
        """
        if not self._buffered:
            self._head = next(self._it)
            self._buffered = True
        return self._head

    def __next__(self):
        """Return the next item and advance past it"""
        if self._buffered:
            # Hand out the item that an earlier head() call already fetched.
            self._buffered = False
            return self._head
        return next(self._it)

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def __eq__(self, other):
        """Compare the pending items of both iterators for equality"""
        return self.head() == other.head()

    def __lt__(self, other):
        """Order iterators by their pending items"""
        return self.head() < other.head()

    def __bool__(self):
        """Return ``True`` while at least one more item is available"""
        try:
            self.head()
        except StopIteration:
            return False
        return True

    # Python 2 spelling of the truth-value hook.
    __nonzero__ = __bool__

def _external_sort(dfs, chunksize=16):
    """
    Yield the rows of every frame in *dfs* as one merged, sorted stream

    Each frame is sorted by its index and written to its own CSV file in a
    temporary directory. The files are then read back *chunksize* rows at a
    time and merged through a heap keyed on each file's next pending row.
    The temporary directory is removed when the generator finishes or is
    closed.

    :param dfs: iterable of DataFrame
    :param int chunksize: number of rows read from a temporary file at a time
    :return: generator of row tuples as produced by ``DataFrame.itertuples``
    """
    # NOTE(review): rows are re-parsed from CSV, so value types may differ
    # from the input frames -- confirm this is acceptable for callers.
    tmp_dir = tempfile.mkdtemp()
    readers = []
    try:
        heap = []
        for df in dfs:
            if isinstance(df.index, pd.MultiIndex):
                index_col = list(range(df.index.nlevels))
            else:
                index_col = 0
            # Number files by readers opened so far, so a frame that yields
            # no rows never shares its path with the next frame.
            path = os.path.join(tmp_dir, str(len(readers)) + '.csv')
            df.sort_index().to_csv(path)
            reader = pd.read_csv(path, chunksize=chunksize,
                                 index_col=index_col)
            readers.append(reader)
            rows = BufferedIterator(
                row for chunk in reader for row in chunk.itertuples())
            if rows:
                heap.append(rows)
        heapq.heapify(heap)
        while heap:
            # The smallest pending row across all files comes out first.
            rows = heapq.heappop(heap)
            yield next(rows)
            if rows:
                heapq.heappush(heap, rows)
    finally:
        try:
            # Release the file handles before deleting the files themselves.
            for reader in readers:
                reader.close()
        finally:
            shutil.rmtree(tmp_dir)

class Identical(object):
    """
    Indicate that two rows are identical

    :ivar left: left row
    :ivar right: right row
    """

    def __init__(self, l, r):
        self.left, self.right = l, r

    def __repr__(self):
        return '%s(%r, %r)' % (type(self).__name__, self.left, self.right)
class Different(object):
    """
    Indicate that two rows are different

    :ivar left: left row
    :ivar right: right row
    :ivar diff: indices of columns that are different, exclude the index
        (first) column
    """

    def __init__(self, l, r, diff):
        self.left, self.right = l, r
        self.diff = diff

    def __repr__(self):
        return '%s(%r, %r, diff=%r)' % (
            type(self).__name__, self.left, self.right, self.diff)
class Unmatched(object):
    """
    Indicate that this row is in one side only

    :ivar row: row content
    :ivar side: 0 means in left only, 1 means in right only
    """

    def __init__(self, row, side):
        self.row = row
        self.side = side

    def __repr__(self):
        return '%s(%r, side=%r)' % (
            type(self).__name__, self.row, self.side)
def _compare_row(l, r):
    """
    Classify a pair of rows taken from the left and right sides

    :param l: left row, or ``None`` when the row exists on the right only
    :param r: right row, or ``None`` when the row exists on the left only
    :return: :class:`Unmatched` when one side is ``None``; otherwise
        :class:`Different` if any column after the first differs, else
        :class:`Identical`
    """
    if r is None:
        return Unmatched(l, 0)
    if l is None:
        return Unmatched(r, 1)
    assert len(l) == len(r)
    diff = []
    # Position 0 holds the index, so reported column numbers are shifted by 1.
    for i in range(1, len(l)):
        a, b = l[i], r[i]
        # A value that is unequal to itself (NaN, NaT) stands for missing
        # data; two such values in the same column are not a difference.
        if a != b and not (a != a and b != b):
            diff.append(i - 1)
    return Different(l, r, diff) if diff else Identical(l, r)
def compare(left, right, iterator=False, sort=True):
    """
    Compare 2 data sets

    Rows are matched on their index value. When several rows on each side
    share the same index value they are paired in order of appearance, and
    any surplus rows on one side are reported as unmatched.

    :param left: data set to be compared on left side
    :type left: DataFrame or iterable of DataFrame
    :param right: data set to be compared on right side
    :type right: DataFrame or iterable of DataFrame
    :param bool iterator: whether *left* and *right* are iterable or single
        DataFrames
    :param bool sort: whether sort the inputs before comparing
    :return: iterator of diff results, which consist of :class:`Identical`,
        :class:`Different`, or :class:`Unmatched`
    """
    def to_iter(data):
        # Turn one input into a peekable stream of row tuples.
        if iterator:
            if sort:
                return BufferedIterator(_external_sort(data))
            return BufferedIterator(
                row for frame in data for row in frame.itertuples())
        if sort:
            data = data.sort_index()
        return BufferedIterator(data.itertuples())

    left, right = to_iter(left), to_iter(right)
    while left and right:
        # NOTE(review): an index value that is not equal to itself (e.g. a
        # scalar NaN) is never consumed by the two loops below, so this outer
        # loop would not terminate -- confirm inputs cannot contain such keys.
        index = min(left.head()[0], right.head()[0])
        rows_l, rows_r = [], []
        # Collect every row carrying the current index value on each side.
        while left and left.head()[0] == index:
            rows_l.append(next(left))
        while right and right.head()[0] == index:
            rows_r.append(next(right))
        # The shorter side is padded with None, which marks a row as unmatched.
        for l, r in zip_longest(rows_l, rows_r):
            yield _compare_row(l, r)
    # Whatever remains on one side has no counterpart on the other.
    while left:
        yield _compare_row(next(left), None)
    while right:
        yield _compare_row(None, next(right))
# Script entry point: nothing is executed when the module is run directly.
if '__file__' in globals() and __name__ == '__main__':
    pass