Source code for fsspec.implementations.data

import base64
import io
from urllib.parse import unquote, unquote_to_bytes

from fsspec import AbstractFileSystem


class DataFileSystem(AbstractFileSystem):
    """A handy decoder for data-URLs

    The part of the URL before the first comma is treated as the prefix
    (scheme, mimetype and optional ``base64`` marker); everything after it
    is the payload. The payload is base64-decoded when the prefix ends with
    ``base64`` and percent-decoded otherwise.

    Example
    -------
    >>> with fsspec.open("data:,Hello%2C%20World%21") as f:
    ...     print(f.read())
    b"Hello, World!"
    """

    protocol = "data"

    def __init__(self, **kwargs):
        """No parameters for this filesystem"""
        super().__init__(**kwargs)

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Decode the payload of a data-URL and return it as bytes.

        Parameters
        ----------
        path : str
            The data-URL; it must contain a comma separating the prefix
            from the payload.
        start, end : int or None
            Slice bounds applied to the decoded bytes.
        **kwargs
            Accepted and ignored.

        Returns
        -------
        bytes
            The decoded payload, sliced as ``[start:end]``.

        Raises
        ------
        ValueError
            If ``path`` contains no comma.
        """
        pref, data = path.split(",", 1)
        if pref.endswith("base64"):
            decoded = base64.b64decode(data)
        else:
            # Percent-escapes represent raw bytes, not necessarily UTF-8
            # text. Decoding via str (unquote + encode) would turn any
            # escape sequence that is not valid UTF-8 into U+FFFD and
            # corrupt binary payloads; unquote_to_bytes keeps the bytes
            # intact and still UTF-8-encodes literal non-ASCII characters.
            decoded = unquote_to_bytes(data)
        return decoded[start:end]

    def info(self, path, **kwargs):
        """Describe the data-URL as a single file entry.

        Parameters
        ----------
        path : str
            The data-URL, including the scheme (``data:...``).
        **kwargs
            Accepted and ignored.

        Returns
        -------
        dict
            With keys ``name`` (the raw, still-encoded payload), ``size``
            (length of the decoded payload in bytes), ``type`` (always
            ``"file"``) and ``mimetype`` (the text between the first ``:``
            and the first ``;`` of the prefix; may be empty).

        Raises
        ------
        ValueError
            If ``path`` contains no comma.
        IndexError
            If the prefix before the comma contains no ``:``.
        """
        pref, name = path.split(",", 1)
        data = self.cat_file(path)
        # Prefix looks like "data:<mimetype>[;param...]": drop the scheme,
        # then keep only what precedes the first ";".
        mime = pref.split(":", 1)[1].split(";", 1)[0]
        return {"name": name, "size": len(data), "type": "file", "mimetype": mime}

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return an in-memory binary file over the decoded payload.

        Only ``path`` and ``mode`` are used; the remaining parameters are
        accepted and ignored.

        Raises
        ------
        ValueError
            If ``mode`` does not contain ``"r"``.
        """
        if "r" not in mode:
            raise ValueError("Read only filesystem")
        return io.BytesIO(self.cat_file(path))