import dask
from distributed.client import Client, _get_global_client
from distributed.worker import Worker
from fsspec import filesystem
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
from fsspec.utils import infer_storage_options
def _get_client(client):
    """Turn *client* into a client object.

    * a ``Client`` instance is returned unchanged;
    * ``None`` yields whatever ``_get_global_client()`` returns;
    * anything else (e.g., a connection string) is passed to the ``Client``
      constructor and the new instance is returned.
    """
    if isinstance(client, Client):
        return client
    if client is None:
        return _get_global_client()
    # e.g., connection string
    return Client(client)
def _in_worker():
    """Return True when ``Worker._instances`` is non-empty, else False."""
    return True if Worker._instances else False
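# [docs] -- appears to be a documentation "view source" link marker left over from HTML extraction; not code.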
class DaskWorkerFileSystem(AbstractFileSystem):
    """View files accessible to a worker as any other remote file-system

    When instances are run on the worker, uses the real filesystem. When
    run on the client, they call the worker to provide information or data:
    each operation is wrapped with ``dask.delayed`` and computed.

    **Warning** this implementation is experimental, and read-only for now.
    """

    def __init__(
        self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs
    ):
        """Configure the wrapper from a filesystem instance or a protocol.

        Parameters
        ----------
        target_protocol:
            Protocol handed to ``filesystem(...)`` to build the wrapped
            filesystem when running in a worker and ``fs`` was not given.
        target_options:
            Keyword arguments for that ``filesystem(...)`` call; ``None`` is
            treated as no options.
        fs:
            Ready-made filesystem instance to wrap.
        client:
            ``Client`` instance, something accepted by ``Client(...)`` (e.g.
            a connection string), or ``None``; resolved by ``_get_client``
            when not running in a worker.
        **kwargs:
            Forwarded to the parent class constructor.

        Raises
        ------
        ValueError
            If ``fs`` and ``target_protocol`` are both given or both omitted.
        """
        super().__init__(**kwargs)
        has_fs = fs is not None
        has_protocol = target_protocol is not None
        # Exactly one of the two must be supplied.
        if has_fs == has_protocol:
            raise ValueError(
                "Please provide one of filesystem instance (fs) or"
                " target_protocol, not both"
            )
        self.target_protocol = target_protocol
        self.target_options = target_options
        self.fs = fs
        self.client = client
        self.worker = None
        self._determine_worker()

    @staticmethod
    def _get_kwargs_from_urls(path):
        """Derive a ``client`` keyword (``"host:port"``) from a URL, if any.

        Returns an empty dict unless the parsed storage options contain
        both a ``host`` and a ``port`` entry.
        """
        parsed = infer_storage_options(path)
        if "host" not in parsed or "port" not in parsed:
            return {}
        return {"client": f"{parsed['host']}:{parsed['port']}"}

    def _determine_worker(self):
        """Record whether we run in a worker and prepare the matching backend.

        In a worker, make sure ``self.fs`` exists (building it from the
        target protocol/options when needed). Otherwise resolve the client
        and keep a ``dask.delayed`` wrapper of this instance in ``self.rfs``.
        """
        self.worker = _in_worker()
        if not self.worker:
            self.client = _get_client(self.client)
            self.rfs = dask.delayed(self)
            return
        if self.fs is None:
            options = self.target_options or {}
            self.fs = filesystem(self.target_protocol, **options)

    def mkdir(self, *args, **kwargs):
        """Call ``mkdir`` on the wrapped filesystem, or via the delayed proxy."""
        if not self.worker:
            task = self.rfs.mkdir(*args, **kwargs)
            task.compute()
            return
        self.fs.mkdir(*args, **kwargs)

    def rm(self, *args, **kwargs):
        """Call ``rm`` on the wrapped filesystem, or via the delayed proxy."""
        if not self.worker:
            task = self.rfs.rm(*args, **kwargs)
            task.compute()
            return
        self.fs.rm(*args, **kwargs)

    def copy(self, *args, **kwargs):
        """Call ``copy`` on the wrapped filesystem, or via the delayed proxy."""
        if not self.worker:
            task = self.rfs.copy(*args, **kwargs)
            task.compute()
            return
        self.fs.copy(*args, **kwargs)

    def mv(self, *args, **kwargs):
        """Call ``mv`` on the wrapped filesystem, or via the delayed proxy."""
        if not self.worker:
            task = self.rfs.mv(*args, **kwargs)
            task.compute()
            return
        self.fs.mv(*args, **kwargs)

    def ls(self, *args, **kwargs):
        """Return the ``ls`` result from the wrapped filesystem or the proxy."""
        if self.worker:
            return self.fs.ls(*args, **kwargs)
        listing = self.rfs.ls(*args, **kwargs)
        return listing.compute()

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open *path*.

        In a worker this delegates to the wrapped filesystem's ``_open``;
        otherwise it returns a ``DaskFile`` bound to this filesystem. The
        same keyword arguments are forwarded in both cases.
        """
        open_kwargs = dict(
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_options=cache_options,
            **kwargs,
        )
        if self.worker:
            return self.fs._open(path, **open_kwargs)
        return DaskFile(fs=self, path=path, **open_kwargs)

    def fetch_range(self, path, mode, start, end):
        """Read ``end - start`` bytes of *path* beginning at offset *start*.

        In a worker the file is opened, positioned and read directly;
        otherwise the same call is made through the delayed proxy and
        computed.
        """
        if not self.worker:
            return self.rfs.fetch_range(path, mode, start, end).compute()
        with self._open(path, mode) as handle:
            handle.seek(start)
            return handle.read(end - start)
class DaskFile(AbstractBufferedFile):
    """Buffered file whose byte ranges are fetched through its filesystem.

    Only ``"rb"`` mode is accepted; the upload hooks are no-ops.
    """

    def __init__(self, mode="rb", **kwargs):
        """Reject any mode other than ``"rb"``, then defer to the parent.

        ``mode`` is checked here and not forwarded; all other keyword
        arguments go to the parent constructor unchanged.

        Raises
        ------
        ValueError
            If ``mode`` is not ``"rb"``.
        """
        is_binary_read = mode == "rb"
        if not is_binary_read:
            raise ValueError('Remote dask files can only be opened in "rb" mode')
        super().__init__(**kwargs)

    def _upload_chunk(self, final=False):
        """No-op upload hook; nothing is written."""
        return None

    def _initiate_upload(self):
        """Create remote file/upload (no-op here)."""
        return None

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        owner = self.fs
        return owner.fetch_range(self.path, self.mode, start, end)