Source code for fsspec.callbacks

[docs]class Callback: """ Base class and interface for callback mechanism This class can be used directly for monitoring file transfers by providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument, below), or subclassed for more specialised behaviour. Parameters ---------- size: int (optional) Nominal quantity for the value that corresponds to a complete transfer, e.g., total number of tiles or total number of bytes value: int (0) Starting internal counter value hooks: dict or None A dict of named functions to be called on each update. The signature of these must be ``f(size, value, **kwargs)`` """ def __init__(self, size=None, value=0, hooks=None, **kwargs): self.size = size self.value = value self.hooks = hooks or {} self.kw = kwargs
[docs] def set_size(self, size): """ Set the internal maximum size attribute Usually called if not initially set at instantiation. Note that this triggers a ``call()``. Parameters ---------- size: int """ self.size = size self.call()
[docs] def absolute_update(self, value): """ Set the internal value state Triggers ``call()`` Parameters ---------- value: int """ self.value = value self.call()
[docs] def relative_update(self, inc=1): """ Delta increment the internal counter Triggers ``call()`` Parameters ---------- inc: int """ self.value += inc self.call()
[docs] def call(self, hook_name=None, **kwargs): """ Execute hook(s) with current state Each function is passed the internal size and current value Parameters ---------- hook_name: str or None If given, execute on this hook kwargs: passed on to (all) hook(s) """ if not self.hooks: return kw = self.kw.copy() kw.update(kwargs) if hook_name: if hook_name not in self.hooks: return return self.hooks[hook_name](self.size, self.value, **kw) for hook in self.hooks.values() or []: hook(self.size, self.value, **kw)
[docs] def wrap(self, iterable): """ Wrap an iterable to call ``relative_update`` on each iterations Parameters ---------- iterable: Iterable The iterable that is being wrapped """ for item in iterable: self.relative_update() yield item
[docs] def branch(self, path_1, path_2, kwargs): """ Set callbacks for child transfers If this callback is operating at a higher level, e.g., put, which may trigger transfers that can also be monitored. The passed kwargs are to be *mutated* to add ``callback=``, if this class supports branching to children. Parameters ---------- path_1: str Child's source path path_2: str Child's destination path kwargs: dict arguments passed to child method, e.g., put_file. Returns ------- """ return None
def no_op(self, *_, **__): pass def __getattr__(self, item): """ If undefined methods are called on this class, nothing happens """ return self.no_op
[docs] @classmethod def as_callback(cls, maybe_callback=None): """Transform callback=... into Callback instance For the special value of ``None``, return the global instance of ``NoOpCallback``. This is an alternative to including ``callback=_DEFAULT_CALLBACK`` directly in a method signature. """ if maybe_callback is None: return _DEFAULT_CALLBACK return maybe_callback
[docs]class NoOpCallback(Callback): """ This implementation of Callback does exactly nothing """
[docs] def call(self, *args, **kwargs): return None
[docs]class DotPrinterCallback(Callback): """ Simple example Callback implementation Almost identical to Callback with a hook that prints a char; here we demonstrate how the outer layer may print "#" and the inner layer "." """ def __init__(self, chr_to_print="#", **kwargs): self.chr = chr_to_print super().__init__(**kwargs)
[docs] def branch(self, path_1, path_2, kwargs): """Mutate kwargs to add new instance with different print char""" kwargs["callback"] = DotPrinterCallback(".")
[docs] def call(self, **kwargs): """Just outputs a character""" print(self.chr, end="")
[docs]class TqdmCallback(Callback): """ A callback to display a progress bar using tqdm Parameters ---------- tqdm_kwargs : dict, (optional) Any argument accepted by the tqdm constructor. See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_. Will be forwarded to tqdm. Examples -------- >>> import fsspec >>> from fsspec.callbacks import TqdmCallback >>> fs = fsspec.filesystem("memory") >>> path2distant_data = "/your-path" >>> fs.upload( ".", path2distant_data, recursive=True, callback=TqdmCallback(), ) You can forward args to tqdm using the ``tqdm_kwargs`` parameter. >>> fs.upload( ".", path2distant_data, recursive=True, callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}), ) """ def __init__(self, tqdm_kwargs=None, *args, **kwargs): try: import tqdm self._tqdm = tqdm except ImportError as exce: raise ImportError( "Using TqdmCallback requires tqdm to be installed" ) from exce self._tqdm_kwargs = tqdm_kwargs or {} super().__init__(*args, **kwargs)
[docs] def set_size(self, size): self.tqdm = self._tqdm.tqdm(total=size, **self._tqdm_kwargs)
[docs] def relative_update(self, inc=1): self.tqdm.update(inc)
def __del__(self): self.tqdm.close() self.tqdm = None
_DEFAULT_CALLBACK = NoOpCallback()