# Source code for fsspec.compression

"""Helper functions for a standard streaming compression API"""
from bz2 import BZ2File
from zipfile import ZipFile

import fsspec.utils
from fsspec.spec import AbstractBufferedFile

def noop_file(file, mode, **kwargs):
    """Return ``file`` unchanged.

    Has the same ``(infile, mode, **kwargs)`` signature as the other
    compression callbacks; ``mode`` and ``kwargs`` are accepted and ignored.
    """
    return file

# TODO: files should also be available as contexts
# should be functions of the form func(infile, mode=, **kwargs) -> file-like
# Registry mapping a compression name to the callable that wraps a file
# object. The ``None`` key is the "no compression" entry, whose callback
# returns the file unchanged.
compr = {None: noop_file}

def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any files
    ending with the given extensions.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or list of file
            extensions for which to infer this compression scheme. Eg. "gz".
        force: (bool) Force re-registration of compression type or extensions.

    Raises:
        ValueError: If name or extensions already registered, and not force.

    """
    # A bare string is a single extension, not an iterable of characters.
    if isinstance(extensions, str):
        extensions = [extensions]

    # Validate registration before mutating either registry, so that a
    # rejected call leaves both of them untouched.
    if name in compr and not force:
        raise ValueError("Duplicate compression registration: %s" % name)

    for ext in extensions:
        if ext in fsspec.utils.compressions and not force:
            raise ValueError(
                "Duplicate compression file extension: %s (%s)" % (ext, name)
            )

    compr[name] = callback

    for ext in extensions:
        fsspec.utils.compressions[ext] = name

def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open a single member of a ZIP archive as a file-like object.

    Args:
        infile: File-like object holding (or receiving) the ZIP archive.
        mode: Target mode. Any mode without "r" in it is treated as writing.
        filename: Name of the archive member. When writing it defaults to
            "file"; when reading it defaults to the first name in the archive.
        **kwargs: Passed to ``ZipFile`` when writing, and to ``ZipFile.open``
            when reading.

    Returns:
        The file-like object returned by ``ZipFile.open`` for the member.
    """
    if "r" not in mode:
        filename = filename or "file"
        z = ZipFile(infile, mode="w", **kwargs)
        fo = z.open(filename, mode="w")
        # The caller only ever sees the member handle, so closing it must
        # also close the archive; ``closer()`` returns None, which makes the
        # ``or`` fall through to ``z.close()``.
        fo.close = lambda closer=fo.close: closer() or z.close()
        return fo
    z = ZipFile(infile)
    if filename is None:
        filename = z.namelist()[0]
    return z.open(filename, mode="r", **kwargs)

# Register the codecs whose callbacks come from the standard-library imports
# at the top of the file; each name doubles as its inferable file extension.
register_compression("zip", unzip, "zip")
register_compression("bz2", BZ2File, "bz2")

try:  # pragma: no cover
    # Use the isal implementation of gzip when that package is importable.
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        """Wrap ``infile`` in an ``igzip.IGzipFile`` opened with ``mode``."""
        return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", isal, "gz")
except ImportError:
    # Fall back to the standard-library gzip implementation.
    from gzip import GzipFile

    register_compression(
        "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz"
    )

try:
    from lzma import LZMAFile

    register_compression("lzma", LZMAFile, "xz")
    # force=True: the "xz" extension was already claimed by "lzma" above.
    register_compression("xz", LZMAFile, "xz", force=True)
except ImportError:
    pass

try:
    import lzmaffi

    # force=True: when lzmaffi is importable it replaces any earlier
    # "lzma"/"xz" registration.
    register_compression("lzma", lzmaffi.LZMAFile, "xz", force=True)
    register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
except ImportError:
    pass

class SnappyFile(AbstractBufferedFile):
    """Non-seekable buffered file that (de)compresses a stream with snappy.

    Reads pull raw bytes from ``infile`` and pass them through a snappy
    stream decompressor; writes pass buffered bytes through a snappy stream
    compressor and write the result to ``infile``.
    """

    def __init__(self, infile, mode, **kwargs):
        """Wrap ``infile`` for reading ("r" in ``mode``) or writing.

        Args:
            infile: Underlying file-like object holding the compressed data.
            mode: File mode; it is normalised to binary before being passed
                to the base class.
            **kwargs: Forwarded to ``AbstractBufferedFile.__init__``.
        """
        # Imported lazily so the module still imports without python-snappy.
        import snappy

        # NOTE(review): the fixed ``size`` looks like a placeholder because
        # the decompressed length is not known up front - confirm.
        super().__init__(
            fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
        )
        self.infile = infile
        if "r" in mode:
            self.codec = snappy.StreamDecompressor()
        else:
            self.codec = snappy.StreamCompressor()

    def _upload_chunk(self, final=False):
        """Compress the current write buffer and write it to ``infile``."""
        self.buffer.seek(0)
        out = self.codec.add_chunk(self.buffer.read())
        self.infile.write(out)
        return True

    def seek(self, loc, whence=0):
        """Always raise: the stream cannot be repositioned."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Return False; see :meth:`seek`."""
        return False

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        # Only the length of the range is used: the next ``end - start``
        # compressed bytes are read from the current position of ``infile``.
        data = self.infile.read(end - start)
        return self.codec.decompress(data)


try:
    import snappy

    # Snappy may use the .sz file extension, but this is not part of the
    # standard implementation.
    register_compression("snappy", SnappyFile, [])

except (ImportError, NameError):
    pass

try:
    import lz4.frame

    register_compression("lz4", lz4.frame.open, "lz4")
except ImportError:
    pass

try:
    import zstandard as zstd

    def zstandard_file(infile, mode="rb"):
        """Return a zstandard stream reader or writer wrapped around ``infile``.

        A mode containing "r" yields a decompressing reader; any other mode
        yields a compressing writer created at compression level 10.
        """
        if "r" in mode:
            cctx = zstd.ZstdDecompressor()
            return cctx.stream_reader(infile)
        else:
            cctx = zstd.ZstdCompressor(level=10)
            return cctx.stream_writer(infile)

    register_compression("zstd", zstandard_file, "zst")
except ImportError:
    pass

def available_compressions():
    """Return a list of the implemented compressions."""
    return list(compr)