import asyncio
import logging
import weakref
from datetime import datetime
from urllib.parse import quote
import aiohttp
import yarl
from fsspec import FSTimeoutError
from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.implementations.http import HTTPFile, HTTPStreamFile, get_client
from fsspec.utils import DEFAULT_BLOCK_SIZE
from urlpath import URL
# Logger for this module.
logger = logging.getLogger(__name__)

# Translation of the ``fileType`` values returned by the dCache API into the
# type names used by fsspec; values not listed here are reported as "other"
# by ``_get_details``.
DCACHE_FILE_TYPES = dict(REGULAR="file", DIR="directory")
def _from_milliseconds(timestamp):
    """Convert a timestamp in milliseconds into a (naive, local) datetime.

    :param timestamp: (int, float or None) milliseconds since the epoch
    :return: (datetime.datetime or None) None if the timestamp is missing
    """
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp / 1000.0)


def _get_details(path, data):
    """Extract details from the metadata returned by the dCache API.

    :param path: (str) file or directory path. If ``data`` contains a
        ``fileName`` entry, ``path`` is treated as its parent directory
    :param data: (dict) metadata as provided by the API
    :return: (dict) parsed metadata with keys ``name``, ``size``, ``type``
        ("file", "directory" or "other"), ``created`` and ``modified``
        (datetime.datetime, or None when the API omits the timestamp)
    """
    url = URL(path)
    file_name = data.get("fileName")  # fileName might be missing
    name = (url / file_name if file_name is not None else url).path
    element_type = DCACHE_FILE_TYPES.get(data.get("fileType"), "other")
    return dict(
        name=name,
        size=data.get("size"),
        type=element_type,
        # timestamps are given in ms; like fileName they may be absent, in
        # which case ``None / 1000.0`` would otherwise raise a TypeError
        created=_from_milliseconds(data.get("creationTime")),
        modified=_from_milliseconds(data.get("mtime")),
    )
def _encode(path):
return quote(path, safe="")
class dCacheFileSystem(AsyncFileSystem):  # noqa: N801
    """File system interface for a dCache storage instance.

    Inspired by the fsspec HTTPFileSystem implementation, specific methods
    interact with the dCache system either via its API or via the WebDAV
    protocol.

    :param api_url: (str, optional) dCache API URL
    :param webdav_url: (str, optional) WebDAV door URL
    :param username: (str, optional) username for basic authentication
    :param password: (str, optional) password for basic authentication
    :param token: (str, optional) token for bearer-token authentication
    :param client_kwargs: (dict, optional) keyword arguments passed on to
        `aiohttp.ClientSession`, see
        https://docs.aiohttp.org/en/stable/client_reference.html .
        For example, `{'auth': aiohttp.BasicAuth('user', 'pass')}`. The
        dictionary (and its "headers" entry, if a token is given) is copied,
        so the caller's object is not modified
    :param request_kwargs: (dict, optional) keyword arguments passed on to the
        `request` method of `aiohttp.ClientSession` (also see `client_kwargs`)
    :param block_size: (int, optional) when creating a file-like object, this
        is the buffer size (in bytes) for reading and writing. When reading,
        this is also the size downloaded in one request. If 0, will default to
        raw requests file-like objects
    :param asynchronous: (bool, optional) use in asynchronous mode
    :param loop: (optional) if asynchronous, event loop where to run coroutines
    :param batch_size: (int, optional) if asynchronous, number of coroutines to
        submit/wait on simultaneously
    :param encoded: (bool, optional) use encoded strings when formatting URLs
    :param storage_options: (dict, optional) keyword arguments passed on to the
        super-class
    :raises ValueError: if only one of username/password is given, or if a
        token is given together with username/password
    """

    def __init__(
        self,
        api_url=None,
        webdav_url=None,
        username=None,
        password=None,
        token=None,
        client_kwargs=None,
        request_kwargs=None,
        block_size=None,
        asynchronous=False,
        loop=None,
        batch_size=None,
        encoded=True,
        **storage_options,
    ):
        # ``super().__init__`` is already bound to this instance: ``self``
        # must not be passed again as an extra positional argument.
        super().__init__(
            asynchronous=asynchronous,
            loop=loop,
            batch_size=batch_size,
            **storage_options,
        )
        self.api_url = api_url
        self.webdav_url = webdav_url
        # Copy, so that adding credentials below does not modify the
        # dictionary provided by the caller.
        self.client_kwargs = {} if client_kwargs is None else dict(client_kwargs)
        self.request_kwargs = {} if request_kwargs is None else request_kwargs
        self.encoded = encoded
        if (username is None) ^ (password is None):
            raise ValueError("Username or password not provided")
        if (username is not None) and (password is not None):
            self.client_kwargs.update(auth=aiohttp.BasicAuth(username, password))
        if token is not None:
            if password is not None:
                raise ValueError("Provide either token or username/password")
            # Copy the headers too, so the caller's headers are left untouched.
            headers = self.client_kwargs.get("headers", {}).copy()
            headers.update(Authorization=f"Bearer {token}")
            self.client_kwargs.update(headers=headers)
        block_size = DEFAULT_BLOCK_SIZE if block_size is None else block_size
        self.block_size = block_size
        self._session = None
        if not asynchronous:
            # In synchronous mode, create the client session right away on
            # the file-system event loop.
            sync(self.loop, self.set_session)

    @staticmethod
    def close_session(loop, session):
        """Close the client session.

        Tries a graceful close on the running loop (0.1 s timeout); otherwise
        falls back to closing the session connector directly.
        """
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    def encode_url(self, url):
        """Build URL, optionally encoded."""
        return yarl.URL(url, encoded=self.encoded)

    async def set_session(self):
        """Set the client session, compatibly with both async/sync execution.

        :return: (aiohttp.ClientSession) the (possibly newly created) session
        """
        if self._session is None:
            self._session = await get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                # close_session is a staticmethod, so the finalizer does not
                # keep a reference to ``self`` alive.
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session

    @property
    def api_url(self):
        """dCache API URL.""" # noqa: D403
        if self._api_url is None:
            raise ValueError("dCache API URL not set!")
        return self._api_url

    @api_url.setter
    def api_url(self, api_url):
        self._api_url = api_url

    @property
    def webdav_url(self):
        """WebDAV door URL."""
        if self._webdav_url is None:
            raise ValueError("WebDAV door not set!")
        return self._webdav_url

    @webdav_url.setter
    def webdav_url(self, webdav_url):
        self._webdav_url = webdav_url

    @classmethod
    def _strip_protocol(cls, path):
        """Turn path from fully-qualified to file-system-specific.

        :param path: (str or list) target path(s)
        :return: (str or list) target path(s) stripped from protocol and WebDAV
            door
        """
        if isinstance(path, list):
            return [cls._strip_protocol(p) for p in path]
        url = URL(path)
        return url.path if "http" in url.scheme else path.split("://")[-1]

    @classmethod
    def _get_kwargs_from_urls(cls, path):
        """Extract keyword arguments encoded in the urlpath.

        :param path: (str) target path
        :return: (dict) arguments include the WebDAV door URL, if part of the
            input target path
        """
        webdav_url = cls._get_webdav_url(path)
        return {"webdav_url": webdav_url} if webdav_url is not None else {}

    @classmethod
    def _get_webdav_url(cls, path):
        """Extract WebDAV access point from the urlpath(s).

        :param path: (str or list) target path(s). If list, extract the URL
            from the first element
        :return: (str) WebDAV door URL, or None for a non-HTTP(S) path
        """
        if isinstance(path, list):
            return cls._get_webdav_url(path[0])
        url = URL(path)
        return url.drive if "http" in url.scheme else None

    async def _get_info(self, path, children=False, limit=None, **kwargs):
        """Request file or directory metadata to the API.

        :param path: (str) target path
        :param children: (bool, optional) if True, return metadata of the
            children paths as well
        :param limit: (int, optional) if provided and children is True, set
            limit to the number of children returned
        :param kwargs: (dict, optional) arguments passed on to requests
        :return: (dict) path metadata
        :raises FileNotFoundError: if the API answers with status 404
        """
        url = URL(self.api_url) / "namespace" / _encode(path)
        url = url.with_query(children=children)
        if limit is not None and children:
            url = url.add_query(limit=f"{limit}")
        url = url.as_uri()
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        session = await self.set_session()
        async with session.get(url, **request_kwargs) as r:
            if r.status == 404:
                raise FileNotFoundError(url)
            r.raise_for_status()
            return await r.json()

    async def _ls(self, path, detail=True, limit=None, **kwargs):
        """List path content.

        :param path: (str) target path (file or directory)
        :param detail: (bool, optional) if True, return a list of dictionaries
            with the (children) path(s) info. If False, return a list of paths
        :param limit: (int, optional) set the maximum number of children paths
            returned to this value
        :param kwargs: (dict, optional) arguments passed on to requests
        :return: (list) if detail is True, list of dictionaries. List of
            strings otherwise
        """
        path = self._strip_protocol(path)
        info = await self._get_info(path, children=True, limit=limit, **kwargs)
        details = _get_details(path, info)
        if details["type"] == "directory":
            elements = info.get("children") or []
            details = [_get_details(path, el) for el in elements]
        else:
            details = [details]
        if detail:
            return details
        else:
            return [d.get("name") for d in details]

    ls = sync_wrapper(_ls)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """Get the content of a file.

        :param path: (str) target file path
        :param start: (int, optional) first byte for file read using range
            request
        :param end: (int, optional) end (exclusive) of the byte range for file
            read using range request
        :param kwargs: (dict, optional) arguments passed on to requests
        :return: (bytes) file content, or the requested byte range
        :raises ValueError: if only one of start/end is given
        :raises FileNotFoundError: if the remote file does not exist
        """
        webdav_url = self._get_webdav_url(path) or self.webdav_url
        path = self._strip_protocol(path)
        url = URL(webdav_url) / path
        url = url.as_uri()
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        if (start is None) ^ (end is None):
            raise ValueError("Give start and end or neither")
        if start is not None:
            if start == end:
                # "bytes=N-(N-1)" is not a valid range: per RFC 7233 servers
                # ignore it and send the whole file, while the expected result
                # of an empty range is empty.
                return b""
            headers = request_kwargs.pop("headers", {}).copy()
            headers["Range"] = f"bytes={start:d}-{end - 1:d}"
            request_kwargs["headers"] = headers
        session = await self.set_session()
        async with session.get(url, **request_kwargs) as r:
            if r.status == 404:
                raise FileNotFoundError(url)
            r.raise_for_status()
            out = await r.read()
        return out

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs
    ):
        """Copy file to local.

        :param rpath: (str) remote target file path
        :param lpath: (str) local file path where to copy the target file
        :param chunk_size: (int, optional) number of bytes read in memory at
            once
        :param callback: (fsspec.callbacks.Callback, optional) progress
            callback; its size is set from the response Content-Length (None
            if absent) and it is advanced by the number of bytes written
        :param kwargs: (dict, optional) arguments passed on to requests
        :raises FileNotFoundError: if the remote file does not exist
        """
        webdav_url = self._get_webdav_url(rpath) or self.webdav_url
        path = self._strip_protocol(rpath)
        url = URL(webdav_url) / path
        url = url.as_uri()
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        session = await self.set_session()
        async with session.get(url, **request_kwargs) as r:
            if r.status == 404:
                raise FileNotFoundError(rpath)
            r.raise_for_status()
            callback.set_size(r.content_length)
            with open(lpath, "wb") as fd:
                while True:
                    chunk = await r.content.read(chunk_size)
                    if not chunk:
                        break
                    fd.write(chunk)
                    callback.relative_update(len(chunk))

    async def _put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs):
        """Copy file from local.

        :param lpath: (str) local source file path
        :param rpath: (str) remote file path where to copy the source file
        :param callback: accepted for compatibility with the fsspec API;
            upload progress is not reported
        :param kwargs: (dict, optional) arguments passed on to requests
        """
        webdav_url = self._get_webdav_url(rpath) or self.webdav_url
        path = self._strip_protocol(rpath)
        url = URL(webdav_url) / path
        url = url.as_uri()
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        session = await self.set_session()
        with open(lpath, "rb") as fd:
            # Use the response as a context manager so that the connection is
            # released once the upload has been acknowledged.
            async with session.put(url, data=fd, **request_kwargs) as r:
                r.raise_for_status()

    async def _cp_file(self, path1, path2, **kwargs):
        """Copy a remote file (not supported)."""
        raise NotImplementedError

    async def _pipe_file(self, path, value, **kwargs):
        """Write data into a remote file.

        :param path: (str) target file path
        :param value: dict, list of tuples, bytes or file-like object to write
        :param kwargs: (dict, optional) arguments passed on to requests
        """
        webdav_url = self._get_webdav_url(path) or self.webdav_url
        path = self._strip_protocol(path)
        url = URL(webdav_url) / path
        url = url.as_uri()
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        session = await self.set_session()
        async with session.put(url, data=value, **request_kwargs) as r:
            r.raise_for_status()

    async def _mv(self, path1, path2, **kwargs):
        """Rename path1 to path2.

        :param path1: (str) source path
        :param path2: (str) destination path
        :param kwargs: (dict, optional) arguments passed on to requests
        :return: JSON response of the API
        :raises FileNotFoundError: if the API answers with status 404
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        url = URL(self.api_url) / "namespace" / _encode(path1)
        url = url.as_uri()
        data = dict(action="mv", destination=path2)
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        session = await self.set_session()
        async with session.post(url, json=data, **request_kwargs) as r:
            if r.status == 404:
                raise FileNotFoundError(url)
            r.raise_for_status()
            return await r.json()

    mv = sync_wrapper(_mv)

    async def _rm_file(self, path, **kwargs):
        """Remove file or directory (must be empty).

        :param path: (str) target path, optionally including protocol and
            WebDAV door
        :param kwargs: (dict, optional) arguments passed on to requests
        :raises FileNotFoundError: if the API answers with status 404
        """
        # Like the other API-based methods, accept fully-qualified paths.
        path = self._strip_protocol(path)
        url = URL(self.api_url) / "namespace" / _encode(path)
        url = url.as_uri()
        request_kwargs = self.request_kwargs.copy()
        request_kwargs.update(kwargs)
        session = await self.set_session()
        async with session.delete(url, **request_kwargs) as r:
            if r.status == 404:
                raise FileNotFoundError(url)
            r.raise_for_status()

    async def _rm(self, path, recursive=False, **kwargs):
        """Remove file or directory tree.

        :param path: (str) target path
        :param recursive: (bool, optional) if True, and the target path is a
            directory, remove all subdirectories and their files
        :param kwargs: (dict, optional) arguments passed on to requests
        """
        paths = await self._expand_path(path, recursive=recursive)
        # Delete elements from branches towards root, one at a time: a
        # directory can only be removed once it is empty.
        for p in reversed(paths):
            await self._rm_file(p, **kwargs)

    rm = sync_wrapper(_rm)

    async def _info(self, path, **kwargs):
        """Give details about a file or a directory.

        :param path: (str) target path
        :param kwargs: (dict, optional) arguments passed on to requests
        :return: (dict) path metadata
        """
        path = self._strip_protocol(path)
        info = await self._get_info(path, **kwargs)
        return _get_details(path, info)

    info = sync_wrapper(_info)

    def created(self, path):
        """Date and time in which the path was created.

        :param path: (str) target path
        :return: (datetime.datetime) time of creation
        """
        return self.info(path).get("created")

    def modified(self, path):
        """Date and time in which the path was last modified.

        :param path: (str) target path
        :return: (datetime.datetime) time of last modification
        """
        return self.info(path).get("modified")

    def _open(self, path, mode="rb", block_size=None, request_kwargs=None, **kwargs):
        """Create a file-like object.

        :param path: (str) target file path
        :param mode: (string, optional) "rb" or "wb" (text modes are handled
            by the super-class `open`)
        :param block_size: (int, optional) bytes to download in one request;
            use instance value if None. If zero, will return a streaming
            file-like object
        :param request_kwargs: (dict, optional) arguments passed on to requests
        :param kwargs: (dict, optional) keyword arguments passed on to the
            super-class
        :return: (dCacheFile or dCacheStreamFile) file-like object
        :raises NotImplementedError: for any mode other than "rb"/"wb"
        """
        if mode not in {"rb", "wb"}:
            raise NotImplementedError
        block_size = self.block_size if block_size is None else block_size
        rkw = self.request_kwargs.copy()
        request_kwargs = {} if request_kwargs is None else request_kwargs
        rkw.update(request_kwargs)
        session = sync(self.loop, self.set_session)
        if block_size:
            return dCacheFile(
                self,
                path,
                mode=mode,
                block_size=block_size,
                request_kwargs=rkw,
                asynchronous=self.asynchronous,
                session=session,
                loop=self.loop,
                **kwargs,
            )
        else:
            return dCacheStreamFile(
                self,
                path,
                mode=mode,
                request_kwargs=rkw,
                asynchronous=self.asynchronous,
                session=session,
                loop=self.loop,
                **kwargs,
            )

    def open(self, path, mode="rb", **kwargs):
        """Return a file-like object from the filesystem.

        If ``path`` includes a WebDAV door, it becomes this instance's
        ``webdav_url``.

        :param path: (str) target file path
        :param mode: (str, optional) choose between "r", "rb", "w", and "wb"
        :param kwargs: (dict, optional) keyword arguments passed on to the
            super-class
        :return: (dCacheFile or dCacheStreamFile) file-like object
        """
        self.webdav_url = self._get_webdav_url(path) or self.webdav_url
        return super().open(path=path, mode=mode, **kwargs)
class dCacheFile(HTTPFile):  # noqa: N801
    """A buffered file-like object for a file stored on dCache.

    Reads are served with read-ahead of a fixed block size. Writes are kept
    in a local buffer and uploaded with a single request when the object is
    closed, since byte-range writing is not supported.

    :param fs: (dCacheFileSystem) file-system instance creating the file
    :param url: (str) target file path
    :param mode: (str, optional) "rb" or "wb"
    :param block_size: (int, optional) amount of read-ahead, in bytes; if
        None the super-class default applies
    :param request_kwargs: (dict, optional) arguments passed on to requests
    :param asynchronous: (bool, optional) use in asynchronous mode
    :param session: (aiohttp.ClientSession, optional) session used for all
        requests, so that connections are not re-established
    :param loop: (optional) if asynchronous, event loop where to run coroutines
    :param kwargs: (dict, optional) arguments passed on to the super-class
    :raises ValueError: for any mode other than "rb"/"wb"
    """

    def __init__(
        self,
        fs,
        url,
        mode="rb",
        block_size=None,
        request_kwargs=None,
        asynchronous=False,
        session=None,
        loop=None,
        **kwargs,
    ):
        remote_path = fs._strip_protocol(url)
        self.url = (URL(fs.webdav_url) / remote_path).as_uri()
        self.asynchronous = asynchronous
        self.session = session
        self.loop = loop
        if request_kwargs is None:
            request_kwargs = {}
        self.request_kwargs = request_kwargs
        if mode not in {"rb", "wb"}:
            raise ValueError
        # Skip HTTPFile.__init__ and initialise the generic buffered file.
        super(HTTPFile, self).__init__(
            fs=fs, path=remote_path, mode=mode, block_size=block_size, **kwargs
        )

    def flush(self, force=False):
        """Write buffered data to remote file.

        Nothing is sent unless ``force`` is True (as happens on close): the
        whole buffer is then uploaded at once and no further forced flush is
        allowed.

        :param force: (bool, optional) upload the buffer to the remote file
        :raises ValueError: if the file is closed, or on a second forced flush
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if not force:
            # Data stays in the buffer until the final, forced flush.
            return
        if self.forced:
            raise ValueError("Force flush cannot be called more than once")
        self.write_chunked()
        self.forced = True

    async def _write_chunked(self):
        """Upload the entire buffer content to the remote file."""
        self.buffer.seek(0)
        async with await self.session.put(
            self.url, data=self.buffer, **self.request_kwargs
        ) as response:
            response.raise_for_status()

    write_chunked = sync_wrapper(_write_chunked)

    def close(self):
        """Close file. Finalize writes, discard cache."""
        # Bypass HTTPFile.close and use the generic buffered-file logic.
        super(HTTPFile, self).close()
class dCacheStreamFile(HTTPStreamFile):  # noqa: N801
    """A streaming file-like object pointing to a target file on dCache.

    Supports reading and writing by opening request streams to the remote file.

    :param fs: (dCacheFileSystem) file-system instance creating the file
    :param url: (str) target file path
    :param mode: (str, optional) "rb" or "wb"
    :param request_kwargs: (dict, optional) arguments passed on to requests
    :param asynchronous: (bool, optional) use in asynchronous mode
    :param session: (aiohttp.ClientSession, optional) All calls will be made
        within this session, to avoid restarting connections
    :param loop: (optional) if asynchronous, event loop where to run coroutines
    :param kwargs: (dict, optional) arguments passed on to the super-class
    :raises ValueError: for any mode other than "rb"/"wb"
    """

    def __init__(
        self,
        fs,
        url,
        mode="rb",
        request_kwargs=None,
        asynchronous=False,
        session=None,
        loop=None,
        **kwargs,
    ):
        path = fs._strip_protocol(url)
        url = URL(fs.webdav_url) / path
        self.url = url.as_uri()
        self.details = {"name": self.url, "size": None}
        self.request_kwargs = {} if request_kwargs is None else request_kwargs
        self.asynchronous = asynchronous
        self.session = session
        self.loop = loop
        super(HTTPStreamFile, self).__init__(
            fs=fs, path=path, mode=mode, block_size=0, cache_type="none", **kwargs
        )
        if self.mode == "rb":

            async def get():
                # The response is deliberately left open: its body is
                # streamed later (presumably by HTTPStreamFile's read via
                # ``self.r`` - verify against the fsspec version in use).
                r = await self.session.get(self.url, **self.request_kwargs)
                return r

            self.r = sync(self.loop, get)
            self.r.raise_for_status()
        elif self.mode == "wb":
            pass
        else:
            raise ValueError

    def write(self, data):
        """Write data to remote file.

        The data is uploaded with a single PUT request, so this can be called
        only once: consecutive calls overwrite the remote file.

        :param data: dict, list of tuples, bytes or file-like object to write
        :raises ValueError: if the file is not opened in write mode
        """
        if self.mode != "wb":
            raise ValueError("File not in write mode")

        async def put():
            # Use the response as a context manager so that the connection is
            # released once the upload is acknowledged; the (released)
            # response is still kept in ``self.r`` for the status check.
            async with self.session.put(
                self.url, data=data, **self.request_kwargs
            ) as r:
                return r

        self.r = sync(self.loop, put)
        self.r.raise_for_status()

    def read(self, num=-1):
        """Read bytes from file.

        :param num: (int, optional) Read up this many bytes. If negative, read
            all content to end of file.
        :return: bytes read from the target file
        :raises ValueError: if the file is not opened in read mode
        """
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        return super().read(num=num)