Source code for fsspec.implementations.webhdfs

# https://hadoop.apache.org/docs/r1.0.4/webhdfs.html

import logging
import secrets
import shutil
import uuid
from contextlib import suppress
from urllib.parse import quote

import requests

from ..spec import AbstractBufferedFile, AbstractFileSystem
from ..utils import infer_storage_options

logger = logging.getLogger("webhdfs")


[docs]class WebHDFS(AbstractFileSystem): """ Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways. Three auth mechanisms are supported: insecure: no auth is done, and the user is assumed to be whoever they say they are (parameter `user`), or a predefined value such as "dr.who" if not given spnego: when kerberos authentication is enabled, auth is negotiated by requests_kerberos https://github.com/requests/requests-kerberos . This establishes a session based on existing kinit login and/or specified principal/password; paraneters are passed with ``kerb_kwargs`` token: uses an existing Hadoop delegation token from another secured service. Indeed, this client can also generate such tokens when not insecure. Note that tokens expire, but can be renewed (by a previously specified user) and may allow for proxying. """ tempdir = "/tmp" protocol = "webhdfs", "webHDFS"
[docs] def __init__( self, host, port=50070, kerberos=False, token=None, user=None, proxy_to=None, kerb_kwargs=None, data_proxy=None, use_https=False, **kwargs, ): """ Parameters ---------- host: str Name-node address port: int Port for webHDFS kerberos: bool Whether to authenticate with kerberos for this connection token: str or None If given, use this token on every call to authenticate. A user and user-proxy may be encoded in the token and should not be also given user: str or None If given, assert the user name to connect with proxy_to: str or None If given, the user has the authority to proxy, and this value is the user in who's name actions are taken kerb_kwargs: dict Any extra arguments for HTTPKerberosAuth, see `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_ data_proxy: dict, callable or None If given, map data-node addresses. This can be necessary if the HDFS cluster is behind a proxy, running on Docker or otherwise has a mismatch between the host-names given by the name-node and the address by which to refer to them from the client. If a dict, maps host names `host->data_proxy[host]`; if a callable, full URLs are passed, and function must conform to `url->data_proxy(url)`. use_https: bool Whether to connect to the Name-node using HTTPS instead of HTTP kwargs """ if self._cached: return super().__init__(**kwargs) self.url = "{protocol}://{host}:{port}/webhdfs/v1".format( protocol="https" if use_https else "http", host=host, port=port ) self.kerb = kerberos self.kerb_kwargs = kerb_kwargs or {} self.pars = {} self.proxy = data_proxy or {} if token is not None: if user is not None or proxy_to is not None: raise ValueError( "If passing a delegation token, must not set " "user or proxy_to, as these are encoded in the" " token" ) self.pars["delegation"] = token if user is not None: self.pars["user.name"] = user if proxy_to is not None: self.pars["doas"] = proxy_to if kerberos and user is not None: raise ValueError( "If using Kerberos auth, do not specify the " "user, this is handled by kinit." ) self._connect()
def _connect(self): self.session = requests.Session() if self.kerb: from requests_kerberos import HTTPKerberosAuth self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs) def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs): url = self.url + quote(path or "") args = kwargs.copy() args.update(self.pars) args["op"] = op.upper() logger.debug("sending %s with %s", url, method) out = self.session.request( method=method.upper(), url=url, params=args, data=data, allow_redirects=redirect, ) if out.status_code in [400, 401, 403, 404, 500]: try: err = out.json() msg = err["RemoteException"]["message"] exp = err["RemoteException"]["exception"] except (ValueError, KeyError): pass else: if exp in ["IllegalArgumentException", "UnsupportedOperationException"]: raise ValueError(msg) elif exp in ["SecurityException", "AccessControlException"]: raise PermissionError(msg) elif exp in ["FileNotFoundException"]: raise FileNotFoundError(msg) else: raise RuntimeError(msg) out.raise_for_status() return out def _open( self, path, mode="rb", block_size=None, autocommit=True, replication=None, permissions=None, **kwargs, ): """ Parameters ---------- path: str File location mode: str 'rb', 'wb', etc. block_size: int Client buffer size for read-ahead or write buffer autocommit: bool If False, writes to temporary file that only gets put in final location upon commit replication: int Number of copies of file on the cluster, write mode only permissions: str or int posix permissions, write mode only kwargs Returns ------- WebHDFile instance """ block_size = block_size or self.blocksize return WebHDFile( self, path, mode=mode, block_size=block_size, tempdir=self.tempdir, autocommit=autocommit, replication=replication, permissions=permissions, ) @staticmethod def _process_info(info): info["type"] = info["type"].lower() info["size"] = info["length"] return info @classmethod def _strip_protocol(cls, path): return infer_storage_options(path)["path"] @staticmethod def _get_kwargs_from_urls(urlpath): out = infer_storage_options(urlpath) out.pop("path", None) out.pop("protocol", None) if "username" in out: out["user"] = out.pop("username") return out def info(self, path): out = self._call("GETFILESTATUS", path=path) info = out.json()["FileStatus"] info["name"] = path return self._process_info(info) def ls(self, path, detail=False): out = self._call("LISTSTATUS", path=path) infos = out.json()["FileStatuses"]["FileStatus"] for info in infos: self._process_info(info) info["name"] = path.rstrip("/") + "/" + info["pathSuffix"] if detail: return sorted(infos, key=lambda i: i["name"]) else: return sorted(info["name"] for info in infos) def content_summary(self, path): """Total numbers of files, directories and bytes under path""" out = self._call("GETCONTENTSUMMARY", path=path) return out.json()["ContentSummary"] def ukey(self, path): """Checksum info of file, giving method and result""" out = self._call("GETFILECHECKSUM", path=path, redirect=False) if "Location" in out.headers: location = self._apply_proxy(out.headers["Location"]) out2 = self.session.get(location) out2.raise_for_status() return out2.json()["FileChecksum"] else: out.raise_for_status() return out.json()["FileChecksum"] def home_directory(self): """Get user's home directory""" out = self._call("GETHOMEDIRECTORY") return out.json()["Path"] def get_delegation_token(self, renewer=None): """Retrieve token which can give the same authority to other uses Parameters ---------- renewer: str or None User who may use this token; if None, will be current user """ if renewer: out = self._call("GETDELEGATIONTOKEN", renewer=renewer) else: out = self._call("GETDELEGATIONTOKEN") t = out.json()["Token"] if t is None: raise ValueError("No token available for this user/security context") return t["urlString"] def renew_delegation_token(self, token): """Make token live longer. Returns new expiry time""" out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token) return out.json()["long"] def cancel_delegation_token(self, token): """Stop the token from being useful""" self._call("CANCELDELEGATIONTOKEN", method="put", token=token) def chmod(self, path, mod): """Set the permission at path Parameters ---------- path: str location to set (file or directory) mod: str or int posix epresentation or permission, give as oct string, e.g, '777' or 0o777 """ self._call("SETPERMISSION", method="put", path=path, permission=mod) def chown(self, path, owner=None, group=None): """Change owning user and/or group""" kwargs = {} if owner is not None: kwargs["owner"] = owner if group is not None: kwargs["group"] = group self._call("SETOWNER", method="put", path=path, **kwargs) def set_replication(self, path, replication): """ Set file replication factor Parameters ---------- path: str File location (not for directories) replication: int Number of copies of file on the cluster. Should be smaller than number of data nodes; normally 3 on most systems. """ self._call("SETREPLICATION", path=path, method="put", replication=replication) def mkdir(self, path, **kwargs): self._call("MKDIRS", method="put", path=path) def makedirs(self, path, exist_ok=False): if exist_ok is False and self.exists(path): raise FileExistsError(path) self.mkdir(path) def mv(self, path1, path2, **kwargs): self._call("RENAME", method="put", path=path1, destination=path2) def rm(self, path, recursive=False, **kwargs): self._call( "DELETE", method="delete", path=path, recursive="true" if recursive else "false", ) def rm_file(self, path, **kwargs): self.rm(path) def cp_file(self, lpath, rpath, **kwargs): with self.open(lpath) as lstream: tmp_fname = "/".join([self._parent(rpath), f".tmp.{secrets.token_hex(16)}"]) # Perform an atomic copy (stream to a temporary file and # move it to the actual destination). try: with self.open(tmp_fname, "wb") as rstream: shutil.copyfileobj(lstream, rstream) self.mv(tmp_fname, rpath) except BaseException: # noqa with suppress(FileNotFoundError): self.rm(tmp_fname) raise def _apply_proxy(self, location): if self.proxy and callable(self.proxy): location = self.proxy(location) elif self.proxy: # as a dict for k, v in self.proxy.items(): location = location.replace(k, v, 1) return location
class WebHDFile(AbstractBufferedFile): """A file living in HDFS over webHDFS""" def __init__(self, fs, path, **kwargs): super().__init__(fs, path, **kwargs) kwargs = kwargs.copy() if kwargs.get("permissions", None) is None: kwargs.pop("permissions", None) if kwargs.get("replication", None) is None: kwargs.pop("replication", None) self.permissions = kwargs.pop("permissions", 511) tempdir = kwargs.pop("tempdir") if kwargs.pop("autocommit", False) is False: self.target = self.path self.path = "/".join([tempdir, str(uuid.uuid4())]) def _upload_chunk(self, final=False): """Write one part of a multi-block file upload Parameters ========== final: bool This is the last block, so should complete file, if self.autocommit is True. """ out = self.fs.session.post( self.location, data=self.buffer.getvalue(), headers={"content-type": "application/octet-stream"}, ) out.raise_for_status() return True def _initiate_upload(self): """ Create remote file/upload """ kwargs = self.kwargs.copy() if "a" in self.mode: op, method = "APPEND", "POST" else: op, method = "CREATE", "PUT" kwargs["overwrite"] = "true" out = self.fs._call(op, method, self.path, redirect=False, **kwargs) location = self.fs._apply_proxy(out.headers["Location"]) if "w" in self.mode: # create empty file to append to out2 = self.fs.session.put( location, headers={"content-type": "application/octet-stream"} ) out2.raise_for_status() self.location = location.replace("CREATE", "APPEND") def _fetch_range(self, start, end): start = max(start, 0) end = min(self.size, end) if start >= end or start >= self.size: return b"" out = self.fs._call( "OPEN", path=self.path, offset=start, length=end - start, redirect=False ) out.raise_for_status() if "Location" in out.headers: location = out.headers["Location"] out2 = self.fs.session.get(self.fs._apply_proxy(location)) return out2.content else: return out.content def commit(self): self.fs.mv(self.path, self.target) def discard(self): self.fs.rm(self.path)