Source code for fsspec.implementations.zip

from __future__ import absolute_import, division, print_function

import datetime
import zipfile

import fsspec
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.utils import DEFAULT_BLOCK_SIZE


[docs]class ZipFileSystem(AbstractArchiveFileSystem): """Read contents of ZIP archive as a file-system Keeps file object open while instance lives. This class is pickleable, but not necessarily thread-safe """ root_marker = "" protocol = "zip" cachable = False
[docs] def __init__( self, fo="", mode="r", target_protocol=None, target_options=None, block_size=DEFAULT_BLOCK_SIZE, **kwargs, ): """ Parameters ---------- fo: str or file-like Contains ZIP, and must exist. If a str, will fetch file using :meth:`~fsspec.open_files`, which must return one file exactly. mode: str Currently, only 'r' accepted target_protocol: str (optional) If ``fo`` is a string, this value can be used to override the FS protocol inferred from a URL target_options: dict (optional) Kwargs passed when instantiating the target FS, if ``fo`` is a string. """ super().__init__(self, **kwargs) if isinstance(fo, str): fo = fsspec.open( fo, mode=mode + "b", protocol=target_protocol, **(target_options or {}) ) self.of = fo self.fo = fo.__enter__() # the whole instance is a context self.zip = zipfile.ZipFile(self.fo, mode=mode) self.block_size = block_size self.dir_cache = None
@classmethod def _strip_protocol(cls, path): # zip file paths are always relative to the archive root return super()._strip_protocol(path).lstrip("/") def __del__(self): if hasattr(self, "zip"): self.close() def close(self): "Commits any write changes to the file. Done on ``del`` too." self.zip.close() def _get_dirs(self): if self.dir_cache is None: files = self.zip.infolist() self.dir_cache = { dirname + "/": {"name": dirname + "/", "size": 0, "type": "directory"} for dirname in self._all_dirnames(self.zip.namelist()) } for z in files: f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__} f.update( { "name": z.filename, "size": z.file_size, "type": ("directory" if z.is_dir() else "file"), } ) self.dir_cache[f["name"]] = f def pipe_file(self, path, value, **kwargs): # override upstream, because we know the exact file size in this case info = zipfile.ZipInfo(path, datetime.datetime.now().timetuple()) info.file_size = len(value) with self.zip.open(path, "w") as f: f.write(value) def _open( self, path, mode="rb", block_size=None, autocommit=True, cache_options=None, **kwargs, ): path = self._strip_protocol(path) out = self.zip.open(path, mode.strip("b")) if "r" in mode: info = self.info(path) out.size = info["size"] out.name = info["name"] return out