Async

fsspec supports asynchronous operations on certain implementations. This allows for concurrent calls within bulk operations such as cat (fetch the contents of many files at once) even from normal code, and for the direct use of fsspec in async code without blocking. Async implementations derive from the class fsspec.asyn.AsyncFileSystem. The class attribute async_impl can be used to test whether an implementation is async or not.

AsyncFileSystem contains async def coroutine versions of the methods of AbstractFileSystem. By convention, these methods are prefixed with “_” to indicate that they are not to be called directly in normal code, only when you know what you are doing. In most cases, the code is identical or only slightly modified, with sync calls replaced by await calls to async functions.

The only async implementation built into fsspec is HTTPFileSystem.

Synchronous API

The methods of AbstractFileSystem are available and can be called from normal code. They call and wait on the corresponding async function. The work is carried out on a separate, IO-dedicated thread: even if many fsspec operations are in flight at once, launched from many threads, they are all processed on that one thread.
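The general idea behind this arrangement can be sketched with the standard library alone. This is an illustration under stated assumptions, not fsspec's implementation. A single event loop runs forever on a hypothetical daemon thread named "io". A hypothetical `sync` helper hands each coroutine to that loop with `asyncio.run_coroutine_threadsafe` and blocks the caller until the result arrives.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# One event loop, running forever on its own daemon thread (a stand-in for
# fsspec's IO-dedicated thread; the names here are illustrative only).
io_loop = asyncio.new_event_loop()
io_thread = threading.Thread(target=io_loop.run_forever, name="io", daemon=True)
io_thread.start()


def sync(coro_func, *args):
    """Run coro_func(*args) on io_loop and block the calling thread until done."""
    future = asyncio.run_coroutine_threadsafe(coro_func(*args), io_loop)
    return future.result()


async def which_thread(tag):
    await asyncio.sleep(0.01)  # simulated IO
    return tag, threading.current_thread().name


# Many caller threads issue blocking calls...
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda i: sync(which_thread, i), range(20)))

# ...but every coroutine actually ran on the single "io" thread.
print({name for _, name in results})  # {'io'}
```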

Most users need not be aware that their code is running async.

Note that the sync functions are wrapped using sync_wrapper, which copies the docstrings from AbstractFileSystem, unless they are explicitly given in the implementation.

Example:

import fsspec

fs = fsspec.filesystem("http")
out = fs.cat([url1, url2, url3])  # fetches data concurrently

Using from Async

File system instances can be created with asynchronous=True. This implies that the instantiation is happening within a coroutine, so the various async methods can be called directly with await, as is normal in async code.

Note that, because __init__ is a blocking function, any creation of asynchronous resources will be deferred. You will normally need to explicitly await a coroutine to create them. Since garbage collection also happens in blocking code, you may wish to explicitly await resource destructors too. Example:

import asyncio
import fsspec

async def work_coroutine():
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()  # creates client
    out = await fs._cat([url1, url2, url3])  # fetches data concurrently
    await session.close()  # explicit destructor

asyncio.run(work_coroutine())

Bring your own loop

For the non-asynchronous case, fsspec will normally create an asyncio event loop on a specific thread. However, the calling application may prefer IO processes to run on a loop that is already around and running (in another thread). The loop needs to be asyncio compliant, but does not necessarily need to be an asyncio.events.AbstractEventLoop. Example:

loop = ...  # however a loop was made, running on another thread
fs = fsspec.filesystem("http", loop=loop)
out = fs.cat([url1, url2, url3])  # fetches data concurrently
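One way to produce the loop in the example above is to start a fresh asyncio loop on a daemon thread owned by the application. This stdlib sketch leaves out the fsspec call so it runs without fsspec installed, and its names (`app_loop`, `app_thread`) are hypothetical. It confirms that work submitted from the main thread runs on that loop, then shuts the loop down cleanly.

```python
import asyncio
import threading

# Hypothetical application-owned loop, running forever on a background thread.
app_loop = asyncio.new_event_loop()
app_thread = threading.Thread(target=app_loop.run_forever, daemon=True)
app_thread.start()


# app_loop is what would be passed as loop=app_loop to fsspec.filesystem.
# Here we only confirm that work submitted from this thread runs on it.
async def running_loop():
    return asyncio.get_running_loop()


fut = asyncio.run_coroutine_threadsafe(running_loop(), app_loop)
print(fut.result(timeout=5) is app_loop)  # True

# Stop and close the loop when the application shuts down.
app_loop.call_soon_threadsafe(app_loop.stop)
app_thread.join()
app_loop.close()
```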

Implementing new backends

Async file systems should derive from AsyncFileSystem, and implement the async def _* coroutines there. These functions will either have sync versions automatically generated if the name is in the async_methods list, or can be directly created using sync_wrapper.

from fsspec.asyn import AsyncFileSystem, sync_wrapper

class MyFileSystem(AsyncFileSystem):

    async def _my_method(self):
        ...

    my_method = sync_wrapper(_my_method)
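The automatic generation of sync versions can be pictured in plain Python. Everything in this sketch is hypothetical and simplified. `BaseFS` stands in for AbstractFileSystem. `make_sync` stands in for sync_wrapper and calls `asyncio.run` per call instead of using a shared IO loop. `mirror_sync_methods` walks an `async_methods` list and creates `name` from `_name` when the subclass did not define it. When the coroutine has no docstring of its own, it falls back to the base-class docstring.

```python
import asyncio
import functools


class BaseFS:
    """Stand-in for AbstractFileSystem: documented, blocking method signatures."""

    def cat_file(self, path):
        """Get the content of a file"""
        raise NotImplementedError


def make_sync(coro_method):
    """Wrap an async method so it can be called from blocking code.

    Simplification: uses asyncio.run per call instead of a shared IO thread.
    """
    @functools.wraps(coro_method)
    def wrapper(self, *args, **kwargs):
        return asyncio.run(coro_method(self, *args, **kwargs))

    return wrapper


def mirror_sync_methods(cls):
    """For each name in cls.async_methods, derive `name` from `_name` if missing."""
    for name in cls.async_methods:
        if f"_{name}" in vars(cls) and name not in vars(cls):
            method = make_sync(getattr(cls, f"_{name}"))
            # Fall back to the base-class docstring when the coroutine has none.
            method.__doc__ = method.__doc__ or getattr(BaseFS, name).__doc__
            setattr(cls, name, method)
    return cls


@mirror_sync_methods
class MyFS(BaseFS):
    async_methods = ["cat_file"]  # hypothetical list, mirroring the idea only

    async def _cat_file(self, path):
        await asyncio.sleep(0)
        return path.encode()


fs = MyFS()
print(fs.cat_file("hello"))   # b'hello'
print(MyFS.cat_file.__doc__)  # Get the content of a file
```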

These functions must not call methods or functions which themselves are synced, but should instead await other coroutines. Calling methods which do not require sync, such as _strip_protocol, is fine.

Note that __init__ cannot be async, so it might need to allocate async resources using the sync function, but only if asynchronous=False. If it is True, you probably need to require the caller to await a coroutine that creates those resources. Similarly, any destructor (e.g., __del__) will run from normal code, and possibly after the loop has stopped/closed.

To call sync, you will need to pass the associated event loop, which will be available as the attribute .loop.
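The toy backend below shows both paths. `ToyFS` is hypothetical and not an fsspec class. With asynchronous=False it owns a background loop in `.loop` and creates its resource immediately by running a coroutine on that loop; here `run_coroutine_threadsafe` stands in for a call like sync(self.loop, ...). With asynchronous=True it defers creation to an awaited `set_session()` and expects an explicit `close()`.

```python
import asyncio
import threading


def _start_loop():
    """Hypothetical helper: a new event loop running on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


class ToyFS:
    """Hypothetical backend showing deferred async resource creation."""

    def __init__(self, asynchronous=False, loop=None):
        self.asynchronous = asynchronous
        self.loop = loop
        self._session = None
        if not asynchronous:
            # Blocking mode: run the coroutine on self.loop and wait for it.
            self.loop = self.loop or _start_loop()
            asyncio.run_coroutine_threadsafe(self.set_session(), self.loop).result()
        # Async mode: __init__ cannot await, so the caller must await set_session().

    async def set_session(self):
        if self._session is None:
            await asyncio.sleep(0)  # stands in for e.g. opening an HTTP client
            self._session = {"open": True}
        return self._session

    async def close(self):
        if self._session is not None:
            self._session["open"] = False


# Blocking use: the resource exists as soon as the instance does.
fs = ToyFS()
print(fs._session)  # {'open': True}


# Async use: creation and teardown are explicit awaits.
async def main():
    afs = ToyFS(asynchronous=True)
    session = await afs.set_session()
    await afs.close()
    return session


print(asyncio.run(main()))  # {'open': False}
```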

fsspec.asyn.AsyncFileSystem(*args, **kwargs)

Async file operations, default implementations

fsspec.asyn.sync(loop, func, *args[, timeout])

Make loop run coroutine until it returns.

fsspec.asyn.sync_wrapper(func[, obj])

Given a function, make it so it can be called in async or blocking contexts

fsspec.asyn.get_loop()

Create or return the default fsspec IO loop

class fsspec.asyn.AsyncFileSystem(*args, **kwargs)

Async file operations, default implementations

Passes bulk operations to asyncio.gather for concurrent operation.

Implementations that have concurrent batch operations and/or async methods should inherit from this class instead of AbstractFileSystem. Docstrings are copied from the un-underscored method in AbstractFileSystem, if not given.

Attributes

loop

transaction

A context within which files are committed together upon exit

Methods

cat(path[, recursive, on_error])

Fetch (potentially multiple) paths' contents

cat_file(path[, start, end])

Get the content of a file

checksum(path)

Unique value for current version of file

clear_instance_cache()

Clear the cache of filesystem instances.

copy(path1, path2[, recursive, on_error])

Copy between two locations in the filesystem

cp(path1, path2, **kwargs)

Alias of AbstractFileSystem.copy.

created(path)

Return the created timestamp of a file as a datetime.datetime

current()

Return the most recently created FileSystem

delete(path[, recursive, maxdepth])

Alias of AbstractFileSystem.rm.

disk_usage(path[, total, maxdepth])

Alias of AbstractFileSystem.du.

download(rpath, lpath[, recursive])

Alias of AbstractFileSystem.get.

du(path[, total, maxdepth])

Space used by files within a path

end_transaction()

Finish write transaction, non-context version

exists(path, **kwargs)

Is there a file at the given path

expand_path(path[, recursive, maxdepth])

Turn one or more globs or directories into a list of all matching paths to files or directories.

find(path[, maxdepth, withdirs])

List all files below path.

from_json(blob)

Recreate a filesystem instance from JSON representation

get(rpath, lpath[, recursive, callback])

Copy file(s) to local.

get_file(rpath, lpath[, callback])

Copy single remote file to local

get_mapper(root[, check, create])

Create key/value store based on this file-system

glob(path, **kwargs)

Find files by glob-matching.

head(path[, size])

Get the first size bytes from file

info(path, **kwargs)

Give details of entry at path

invalidate_cache([path])

Discard any cached directory information

isdir(path)

Is this entry directory-like?

isfile(path)

Is this entry file-like?

lexists(path, **kwargs)

If there is a file at the given path (including broken links)

listdir(path[, detail])

Alias of AbstractFileSystem.ls.

ls(path[, detail])

List objects at path.

makedir(path[, create_parents])

Alias of AbstractFileSystem.mkdir.

makedirs(path[, exist_ok])

Recursively make directories

mkdir(path[, create_parents])

Create directory entry at path

mkdirs(path[, exist_ok])

Alias of AbstractFileSystem.makedirs.

modified(path)

Return the modified timestamp of a file as a datetime.datetime

move(path1, path2, **kwargs)

Alias of AbstractFileSystem.mv.

mv(path1, path2[, recursive, maxdepth])

Move file(s) from one location to another

open(path[, mode, block_size, cache_options])

Return a file-like object from the filesystem

pipe(path[, value])

Put value into path

pipe_file(path, value, **kwargs)

Set the bytes of given file

put(lpath, rpath[, recursive, callback])

Copy file(s) from local.

put_file(lpath, rpath[, callback])

Copy single file to remote

read_block(fn, offset, length[, delimiter])

Read a block of bytes from a file

rename(path1, path2, **kwargs)

Alias of AbstractFileSystem.mv.

rm(path[, recursive, maxdepth])

Delete files.

rm_file(path)

Delete a file

rmdir(path)

Remove a directory, if empty

sign(path[, expiration])

Create a signed URL representing the given path

size(path)

Size in bytes of file

start_transaction()

Begin write transaction for deferring files, non-context version

stat(path, **kwargs)

Alias of AbstractFileSystem.info.

tail(path[, size])

Get the last size bytes from file

to_json()

JSON representation of this filesystem instance

touch(path[, truncate])

Create empty file, or update timestamp

ukey(path)

Hash of file properties, to tell if it has changed

upload(lpath, rpath[, recursive])

Alias of AbstractFileSystem.put.

walk(path[, maxdepth])

Return all files below path

cat_ranges

cp_file

fsspec.asyn.sync(loop, func, *args, timeout=None, **kwargs)

Make loop run coroutine until it returns. Runs in another thread.
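The timeout argument can be pictured as a bounded wait on a cross-thread future. This sketch is hypothetical and is not fsspec's code. The toy `sync` waits at most `timeout` seconds, cancels the abandoned coroutine on the IO thread, and re-raises. fsspec itself may raise its own exception type instead.

```python
import asyncio
import concurrent.futures
import threading

# Hypothetical IO loop on a daemon thread (stand-in for fsspec's own loop).
io_loop = asyncio.new_event_loop()
threading.Thread(target=io_loop.run_forever, daemon=True).start()


def sync(loop, func, *args, timeout=None, **kwargs):
    """Toy sync: run func(*args, **kwargs) on `loop`, waiting at most `timeout` seconds."""
    future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), loop)
    try:
        return future.result(timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()  # ask the IO thread to cancel the abandoned coroutine
        raise


async def slow(delay, value):
    await asyncio.sleep(delay)
    return value


print(sync(io_loop, slow, 0.01, value="done"))  # done
try:
    sync(io_loop, slow, 10, value="never", timeout=0.1)
except concurrent.futures.TimeoutError:
    print("timed out")
```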

fsspec.asyn.sync_wrapper(func, obj=None)

Given a function, make it so it can be called in async or blocking contexts

Leave obj=None if defining within a class. Pass the instance if attaching as an attribute of the instance.
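The two ways of using obj can be illustrated with a toy wrapper. It follows the docstring's description but is a hypothetical sketch, not fsspec's code. With obj=None the wrapper sits in a class body, so the instance arrives as the first argument. With obj=instance the wrapped coroutine is already a bound method, so the instance must be supplied explicitly. In both cases the wrapper needs the instance only to find its `.loop`.

```python
import asyncio
import functools
import threading


def _start_loop():
    """Hypothetical helper: a new event loop running on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def sync_wrapper(func, obj=None):
    """Toy sync_wrapper: run coroutine function `func` on the owner's `.loop`."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # In a class body the instance is args[0]; otherwise it was passed as obj.
        owner = obj if obj is not None else args[0]
        fut = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), owner.loop)
        return fut.result()

    return wrapper


class Toy:
    def __init__(self):
        self.loop = _start_loop()
        # Attaching to one instance: pass the bound coroutine and obj=self.
        self.shout = sync_wrapper(self._shout, obj=self)

    async def _echo(self, x):
        return x

    async def _shout(self, x):
        return x.upper()

    # Defining within the class: leave obj=None.
    echo = sync_wrapper(_echo)


t = Toy()
print(t.echo("hi"), t.shout("hi"))  # hi HI
```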

fsspec.asyn.get_loop()

Create or return the default fsspec IO loop

The loop will be running on a separate thread.
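The "create or return" behaviour is a lazily initialised, thread-safe singleton. This sketch uses hypothetical names (`_lock`, `_loop`) and is not fsspec's code. The loop is created at most once, under a lock, and started on a daemon thread. Every later caller, from any thread, receives the same object.

```python
import asyncio
import threading

# Hypothetical module-level cache and lock (illustrative names).
_lock = threading.Lock()
_loop = [None]


def get_loop():
    """Create (once) or return a shared event loop running on a daemon thread."""
    if _loop[0] is None:
        with _lock:
            # Re-check under the lock so two racing threads cannot both create one.
            if _loop[0] is None:
                loop = asyncio.new_event_loop()
                threading.Thread(target=loop.run_forever, name="io-loop", daemon=True).start()
                _loop[0] = loop
    return _loop[0]


# Callers on many threads all receive the same loop object.
seen = []
workers = [threading.Thread(target=lambda: seen.append(get_loop())) for _ in range(10)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len({id(lp) for lp in seen}))  # 1
```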

fsspec.asyn.fsspec_loop()

Temporarily switch the current event loop to fsspec’s own loop, and then revert it back after the context is terminated.