up follow livre

This commit is contained in:
Tykayn 2025-08-30 18:14:14 +02:00 committed by tykayn
parent 70a5c3465c
commit cffb31c1ef
12198 changed files with 2562132 additions and 35 deletions

View file

@ -0,0 +1,68 @@
"""Minimal, stdlib-only replacement for [`pyfilesystem2`][1] API for use by `fontTools.ufoLib`.
This package is a partial reimplementation of the `fs` package by Will McGugan, used under the
MIT license. See LICENSE.external for details.
Note this only exports a **subset** of the `pyfilesystem2` API, in particular the modules,
classes and functions that are currently used directly by `fontTools.ufoLib`.
It opportunistically tries to import the relevant modules from the upstream `fs` package
when this is available. Otherwise it falls back to the replacement modules within this package.
As of version 4.59.0, the `fonttools[ufo]` extra no longer requires the `fs` package, thus
this `fontTools.misc.filesystem` package is used by default.
Client code can either replace `import fs` with `from fontTools.misc import filesystem as fs`
if that happens to work (no guarantee), or they can continue to use `fs` but they will have
to specify it as an explicit dependency of their project.
[1]: https://github.com/PyFilesystem/pyfilesystem2
"""
from __future__ import annotations
try:
__import__("fs")
except ImportError:
from . import _base as base
from . import _copy as copy
from . import _errors as errors
from . import _info as info
from . import _osfs as osfs
from . import _path as path
from . import _subfs as subfs
from . import _tempfs as tempfs
from . import _tools as tools
from . import _walk as walk
from . import _zipfs as zipfs
_haveFS = False
else:
import fs.base as base
import fs.copy as copy
import fs.errors as errors
import fs.info as info
import fs.osfs as osfs
import fs.path as path
import fs.subfs as subfs
import fs.tempfs as tempfs
import fs.tools as tools
import fs.walk as walk
import fs.zipfs as zipfs
_haveFS = True
__all__ = [
"base",
"copy",
"errors",
"info",
"osfs",
"path",
"subfs",
"tempfs",
"tools",
"walk",
"zipfs",
]

View file

@ -0,0 +1,134 @@
from __future__ import annotations
import typing
from abc import ABC, abstractmethod
from ._copy import copy_dir, copy_file
from ._errors import (
DestinationExists,
DirectoryExpected,
FileExpected,
FilesystemClosed,
NoSysPath,
ResourceNotFound,
)
from ._path import dirname
from ._walk import BoundWalker
if typing.TYPE_CHECKING:
from typing import IO, Any, Collection, Iterator, Self, Type
from ._info import Info
from ._subfs import SubFS
class FS(ABC):
"""Abstract base class for custom filesystems."""
_closed: bool = False
@abstractmethod
def open(self, path: str, mode: str = "rb", **kwargs) -> IO[Any]: ...
@abstractmethod
def exists(self, path: str) -> bool: ...
@abstractmethod
def isdir(self, path: str) -> bool: ...
@abstractmethod
def isfile(self, path: str) -> bool: ...
@abstractmethod
def listdir(self, path: str) -> list[str]: ...
@abstractmethod
def makedir(self, path: str, recreate: bool = False) -> SubFS: ...
@abstractmethod
def makedirs(self, path: str, recreate: bool = False) -> SubFS: ...
@abstractmethod
def getinfo(self, path: str, namespaces: Collection[str] | None = None) -> Info: ...
@abstractmethod
def remove(self, path: str) -> None: ...
@abstractmethod
def removedir(self, path: str) -> None: ...
@abstractmethod
def removetree(self, path: str) -> None: ...
@abstractmethod
def movedir(self, src: str, dst: str, create: bool = False) -> None: ...
def getsyspath(self, path: str) -> str:
raise NoSysPath(f"the filesystem {self!r} has no system path")
def close(self):
self._closed = True
def isclosed(self) -> bool:
return self._closed
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type, exc, tb):
self.close()
return False # never swallow exceptions
def check(self):
if self._closed:
raise FilesystemClosed(f"the filesystem {self!r} is closed")
def opendir(self, path: str, *, factory: Type[SubFS] | None = None) -> SubFS:
"""Return a subfilesystem rooted at `path`."""
if factory is None:
from ._subfs import SubFS
factory = SubFS
return factory(self, path)
def scandir(
self, path: str, namespaces: Collection[str] | None = None
) -> Iterator[Info]:
return (self.getinfo(f"{path}/{p}", namespaces) for p in self.listdir(path))
@property
def walk(self) -> BoundWalker:
return BoundWalker(self)
def readbytes(self, path: str) -> bytes:
with self.open(path, "rb") as f:
return f.read()
def writebytes(self, path: str, data: bytes):
with self.open(path, "wb") as f:
f.write(data)
def create(self, path: str, wipe: bool = False):
if not wipe and self.exists(path):
return False
with self.open(path, "wb"):
pass # 'touch' empty file
return True
def copy(self, src_path: str, dst_path: str, overwrite=False):
if not self.exists(src_path):
raise ResourceNotFound(f"{src_path!r} does not exist")
elif not self.isfile(src_path):
raise FileExpected(f"path {src_path!r} should be a file")
if not overwrite and self.exists(dst_path):
raise DestinationExists(f"destination {dst_path!r} already exists")
if not self.isdir(dirname(dst_path)):
raise DirectoryExpected(f"path {dirname(dst_path)!r} should be a directory")
copy_file(self, src_path, self, dst_path)
def copydir(self, src_path: str, dst_path: str, create=False):
if not create and not self.exists(dst_path):
raise ResourceNotFound(f"{dst_path!r} does not exist")
if not self.isdir(src_path):
raise DirectoryExpected(f"path {src_path!r} should be a directory")
copy_dir(self, src_path, self, dst_path)

View file

@ -0,0 +1,45 @@
from __future__ import annotations
import typing
from ._errors import IllegalDestination
from ._path import combine, frombase, isbase
from ._tools import copy_file_data
if typing.TYPE_CHECKING:
from ._base import FS
def copy_file(src_fs: FS, src_path: str, dst_fs: FS, dst_path: str):
if src_fs is dst_fs and src_path == dst_path:
raise IllegalDestination(f"cannot copy {src_path!r} to itself")
with src_fs.open(src_path, "rb") as src_file:
with dst_fs.open(dst_path, "wb") as dst_file:
copy_file_data(src_file, dst_file)
def copy_structure(
src_fs: FS,
dst_fs: FS,
src_root: str = "/",
dst_root: str = "/",
):
if src_fs is dst_fs and isbase(src_root, dst_root):
raise IllegalDestination(f"cannot copy {src_fs!r} to itself")
dst_fs.makedirs(dst_root, recreate=True)
for dir_path in src_fs.walk.dirs(src_root):
dst_fs.makedir(combine(dst_root, frombase(src_root, dir_path)), recreate=True)
def copy_dir(src_fs: FS, src_path: str, dst_fs: FS, dst_path: str):
copy_structure(src_fs, dst_fs, src_path, dst_path)
for file_path in src_fs.walk.files(src_path):
copy_path = combine(dst_path, frombase(src_path, file_path))
copy_file(src_fs, file_path, dst_fs, copy_path)
def copy_fs(src_fs: FS, dst_fs: FS):
copy_dir(src_fs, "/", dst_fs, "/")

View file

@ -0,0 +1,54 @@
class FSError(Exception):
pass
class CreateFailed(FSError):
pass
class FilesystemClosed(FSError):
pass
class MissingInfoNamespace(FSError):
pass
class NoSysPath(FSError):
pass
class OperationFailed(FSError):
pass
class IllegalDestination(OperationFailed):
pass
class ResourceError(FSError):
pass
class ResourceNotFound(ResourceError):
pass
class DirectoryExpected(ResourceError):
pass
class DirectoryNotEmpty(ResourceError):
pass
class FileExpected(ResourceError):
pass
class DestinationExists(ResourceError):
pass
class ResourceReadOnly(ResourceError):
pass

View file

@ -0,0 +1,75 @@
from __future__ import annotations
import typing
from datetime import datetime, timezone
from ._errors import MissingInfoNamespace
if typing.TYPE_CHECKING:
from collections.abc import Mapping
from typing import Any
def epoch_to_datetime(t: int | None) -> datetime | None:
"""Convert epoch time to a UTC datetime."""
if t is None:
return None
return datetime.fromtimestamp(t, tz=timezone.utc)
class Info:
__slots__ = ["raw", "namespaces"]
def __init__(self, raw_info: Mapping[str, Any]):
self.raw = raw_info
self.namespaces = frozenset(raw_info.keys())
def get(self, namespace: str, key: str, default: Any | None = None) -> Any | None:
try:
return self.raw[namespace].get(key, default)
except KeyError:
raise MissingInfoNamespace(f"Namespace {namespace!r} does not exist")
@property
def name(self) -> str:
return self.get("basic", "name")
@property
def is_dir(self) -> bool:
return self.get("basic", "is_dir")
@property
def is_file(self) -> bool:
return not self.is_dir
@property
def accessed(self) -> datetime | None:
return epoch_to_datetime(self.get("details", "accessed"))
@property
def modified(self) -> datetime | None:
return epoch_to_datetime(self.get("details", "modified"))
@property
def size(self) -> int | None:
return self.get("details", "size")
@property
def type(self) -> int | None:
return self.get("details", "type")
@property
def created(self) -> datetime | None:
return epoch_to_datetime(self.get("details", "created"))
@property
def metadata_changed(self) -> datetime | None:
return epoch_to_datetime(self.get("details", "metadata_changed"))
def __str__(self) -> str:
if self.is_dir:
return "<dir '{}'>".format(self.name)
else:
return "<file '{}'>".format(self.name)
__repr__ = __str__

View file

@ -0,0 +1,164 @@
from __future__ import annotations
import errno
import platform
import shutil
import stat
import typing
from os import PathLike
from pathlib import Path
from ._base import FS
from ._errors import (
CreateFailed,
DirectoryExpected,
DirectoryNotEmpty,
FileExpected,
IllegalDestination,
ResourceError,
ResourceNotFound,
)
from ._info import Info
from ._path import isbase
if typing.TYPE_CHECKING:
from collections.abc import Collection
from typing import IO, Any
from ._subfs import SubFS
_WINDOWS_PLATFORM = platform.system() == "Windows"
class OSFS(FS):
"""Filesystem for a directory on the local disk.
A thin layer on top of `pathlib.Path`.
"""
def __init__(self, root: str | PathLike, create: bool = False):
super().__init__()
self._root = Path(root).resolve()
if create:
self._root.mkdir(parents=True, exist_ok=True)
else:
if not self._root.is_dir():
raise CreateFailed(
f"unable to create OSFS: {root!r} does not exist or is not a directory"
)
def _abs(self, rel_path: str) -> Path:
self.check()
return (self._root / rel_path.strip("/")).resolve()
def open(self, path: str, mode: str = "rb", **kwargs) -> IO[Any]:
try:
return self._abs(path).open(mode, **kwargs)
except FileNotFoundError:
raise ResourceNotFound(f"No such file or directory: {path!r}")
def exists(self, path: str) -> bool:
return self._abs(path).exists()
def isdir(self, path: str) -> bool:
return self._abs(path).is_dir()
def isfile(self, path: str) -> bool:
return self._abs(path).is_file()
def listdir(self, path: str) -> list[str]:
return [p.name for p in self._abs(path).iterdir()]
def _mkdir(self, path: str, parents: bool = False, exist_ok: bool = False) -> SubFS:
self._abs(path).mkdir(parents=parents, exist_ok=exist_ok)
return self.opendir(path)
def makedir(self, path: str, recreate: bool = False) -> SubFS:
return self._mkdir(path, parents=False, exist_ok=recreate)
def makedirs(self, path: str, recreate: bool = False) -> SubFS:
return self._mkdir(path, parents=True, exist_ok=recreate)
def getinfo(self, path: str, namespaces: Collection[str] | None = None) -> Info:
path = self._abs(path)
if not path.exists():
raise ResourceNotFound(f"No such file or directory: {str(path)!r}")
info = {
"basic": {
"name": path.name,
"is_dir": path.is_dir(),
}
}
namespaces = namespaces or ()
if "details" in namespaces:
stat_result = path.stat()
details = info["details"] = {
"accessed": stat_result.st_atime,
"modified": stat_result.st_mtime,
"size": stat_result.st_size,
"type": stat.S_IFMT(stat_result.st_mode),
"created": getattr(stat_result, "st_birthtime", None),
}
ctime_key = "created" if _WINDOWS_PLATFORM else "metadata_changed"
details[ctime_key] = stat_result.st_ctime
return Info(info)
def remove(self, path: str):
path = self._abs(path)
try:
path.unlink()
except FileNotFoundError:
raise ResourceNotFound(f"No such file or directory: {str(path)!r}")
except OSError as e:
if path.is_dir():
raise FileExpected(f"path {str(path)!r} should be a file")
else:
raise ResourceError(f"unable to remove {str(path)!r}: {e}")
def removedir(self, path: str):
try:
self._abs(path).rmdir()
except NotADirectoryError:
raise DirectoryExpected(f"path {path!r} should be a directory")
except OSError as e:
if e.errno == errno.ENOTEMPTY:
raise DirectoryNotEmpty(f"Directory not empty: {path!r}")
else:
raise ResourceError(f"unable to remove {path!r}: {e}")
def removetree(self, path: str):
shutil.rmtree(self._abs(path))
def movedir(self, src_dir: str, dst_dir: str, create: bool = False):
if isbase(src_dir, dst_dir):
raise IllegalDestination(f"cannot move {src_dir!r} to {dst_dir!r}")
src_path = self._abs(src_dir)
if not src_path.exists():
raise ResourceNotFound(f"Source {src_dir!r} does not exist")
elif not src_path.is_dir():
raise DirectoryExpected(f"Source {src_dir!r} should be a directory")
dst_path = self._abs(dst_dir)
if not create and not dst_path.exists():
raise ResourceNotFound(f"Destination {dst_dir!r} does not exist")
if dst_path.is_file():
raise DirectoryExpected(f"Destination {dst_dir!r} should be a directory")
if create:
dst_path.parent.mkdir(parents=True, exist_ok=True)
if dst_path.exists():
if list(dst_path.iterdir()):
raise DirectoryNotEmpty(f"Destination {dst_dir!r} is not empty")
elif _WINDOWS_PLATFORM:
# on Unix os.rename silently replaces an empty dst_dir whereas on
# Windows it always raises FileExistsError, empty or not.
dst_path.rmdir()
src_path.rename(dst_path)
def getsyspath(self, path: str) -> str:
return str(self._abs(path))
def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self._root)!r})"
def __str__(self) -> str:
return f"<{self.__class__.__name__.lower()} '{self._root}'>"

View file

@ -0,0 +1,67 @@
import os
import platform
_WINDOWS_PLATFORM = platform.system() == "Windows"
def combine(path1: str, path2) -> str:
if not path1:
return path2
return "{}/{}".format(path1.rstrip("/"), path2.lstrip("/"))
def split(path: str) -> tuple[str, str]:
if "/" not in path:
return ("", path)
split = path.rsplit("/", 1)
return (split[0] or "/", split[1])
def dirname(path: str) -> str:
return split(path)[0]
def basename(path: str) -> str:
return split(path)[1]
def forcedir(path: str) -> str:
# Ensure the path ends with a trailing forward slash.
if not path.endswith("/"):
return path + "/"
return path
def abspath(path: str) -> str:
# FS objects have no concept of a *current directory*. This simply
# ensures the path starts with a forward slash.
if not path.startswith("/"):
return "/" + path
return path
def isbase(path1: str, path2: str) -> bool:
# Check if `path1` is a base or prefix of `path2`.
_path1 = forcedir(abspath(path1))
_path2 = forcedir(abspath(path2))
return _path2.startswith(_path1)
def frombase(path1: str, path2: str) -> str:
# Get the final path of `path2` that isn't in `path1`.
if not isbase(path1, path2):
raise ValueError(f"path1 must be a prefix of path2: {path1!r} vs {path2!r}")
return path2[len(path1) :]
def relpath(path: str) -> str:
return path.lstrip("/")
def normpath(path: str) -> str:
normalized = os.path.normpath(path)
if _WINDOWS_PLATFORM:
# os.path.normpath converts backslashes to forward slashes on Windows
# but we want forward slashes, so we convert them back
normalized = normalized.replace("\\", "/")
return normalized

View file

@ -0,0 +1,92 @@
from __future__ import annotations
import typing
from pathlib import PurePosixPath
from ._base import FS
from ._errors import DirectoryExpected, ResourceNotFound
if typing.TYPE_CHECKING:
from collections.abc import Collection
from typing import IO, Any
from ._info import Info
class SubFS(FS):
"""Maps a sub-directory of another filesystem."""
def __init__(self, parent: FS, sub_path: str):
super().__init__()
self._parent = parent
self._prefix = PurePosixPath(sub_path).as_posix().rstrip("/")
if not parent.exists(self._prefix):
raise ResourceNotFound(f"No such file or directory: {sub_path!r}")
elif not parent.isdir(self._prefix):
raise DirectoryExpected(f"{sub_path!r} is not a directory")
def delegate_fs(self):
return self._parent
def _full(self, rel: str) -> str:
self.check()
return f"{self._prefix}/{PurePosixPath(rel).as_posix()}".lstrip("/")
def open(self, path: str, mode: str = "rb", **kwargs) -> IO[Any]:
return self._parent.open(self._full(path), mode, **kwargs)
def exists(self, path: str) -> bool:
return self._parent.exists(self._full(path))
def isdir(self, path: str) -> bool:
return self._parent.isdir(self._full(path))
def isfile(self, path: str) -> bool:
return self._parent.isfile(self._full(path))
def listdir(self, path: str) -> list[str]:
return self._parent.listdir(self._full(path))
def makedir(self, path: str, recreate: bool = False):
return self._parent.makedir(self._full(path), recreate=recreate)
def makedirs(self, path: str, recreate: bool = False):
return self._parent.makedirs(self._full(path), recreate=recreate)
def getinfo(self, path: str, namespaces: Collection[str] | None = None) -> Info:
return self._parent.getinfo(self._full(path), namespaces=namespaces)
def remove(self, path: str):
return self._parent.remove(self._full(path))
def removedir(self, path: str):
return self._parent.removedir(self._full(path))
def removetree(self, path: str):
return self._parent.removetree(self._full(path))
def movedir(self, src: str, dst: str, create: bool = False):
self._parent.movedir(self._full(src), self._full(dst), create=create)
def getsyspath(self, path: str) -> str:
return self._parent.getsyspath(self._full(path))
def readbytes(self, path: str) -> bytes:
return self._parent.readbytes(self._full(path))
def writebytes(self, path: str, data: bytes):
self._parent.writebytes(self._full(path), data)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._parent!r}, {self._prefix!r})"
def __str__(self) -> str:
return f"{self._parent}/{self._prefix}"
class ClosingSubFS(SubFS):
"""Like SubFS, but auto-closes the parent filesystem when closed."""
def close(self):
super().close()
self._parent.close()

View file

@ -0,0 +1,34 @@
from __future__ import annotations
import shutil
import tempfile
from ._errors import OperationFailed
from ._osfs import OSFS
class TempFS(OSFS):
def __init__(self, auto_clean: bool = True, ignore_clean_errors: bool = True):
self.auto_clean = auto_clean
self.ignore_clean_errors = ignore_clean_errors
self._temp_dir = tempfile.mkdtemp("__temp_fs__")
self._cleaned = False
super().__init__(self._temp_dir)
def close(self):
if self.auto_clean:
self.clean()
super().close()
def clean(self):
if self._cleaned:
return
try:
shutil.rmtree(self._temp_dir)
except Exception as e:
if not self.ignore_clean_errors:
raise OperationFailed(
f"failed to remove temporary directory: {self._temp_dir!r}"
) from e
self._cleaned = True

View file

@ -0,0 +1,34 @@
from __future__ import annotations
import typing
from pathlib import PurePosixPath
from ._errors import DirectoryNotEmpty
if typing.TYPE_CHECKING:
from typing import IO
from ._base import FS
def remove_empty(fs: FS, path: str):
"""Remove all empty parents."""
path = PurePosixPath(path)
root = PurePosixPath("/")
try:
while path != root:
fs.removedir(path.as_posix())
path = path.parent
except DirectoryNotEmpty:
pass
def copy_file_data(src_file: IO, dst_file: IO, chunk_size: int | None = None):
"""Copy data from one file object to another."""
_chunk_size = 1024 * 1024 if chunk_size is None else chunk_size
read = src_file.read
write = dst_file.write
# in iter(callable, sentilel), callable is called until it returns the sentinel;
# this allows to copy `chunk_size` bytes at a time.
for chunk in iter(lambda: read(_chunk_size) or None, None):
write(chunk)

View file

@ -0,0 +1,55 @@
from __future__ import annotations
import typing
from collections import deque
from collections.abc import Collection, Iterator
from ._path import combine
if typing.TYPE_CHECKING:
from typing import Callable
from ._base import FS
from ._info import Info
class BoundWalker:
def __init__(self, fs: FS):
self._fs = fs
def _iter_walk(
self, path: str, namespaces: Collection[str] | None = None
) -> Iterator[tuple[str, Info | None]]:
"""Walk files using a *breadth first* search."""
queue = deque([path])
push = queue.appendleft
pop = queue.pop
_scan = self._fs.scandir
_combine = combine
while queue:
dir_path = pop()
for info in _scan(dir_path, namespaces=namespaces):
if info.is_dir:
yield dir_path, info
push(_combine(dir_path, info.name))
else:
yield dir_path, info
yield path, None
def _filter(
self,
include: Callable[[str, Info], bool] = lambda path, info: True,
path: str = "/",
namespaces: Collection[str] | None = None,
) -> Iterator[str]:
_combine = combine
for path, info in self._iter_walk(path, namespaces):
if info is not None and include(path, info):
yield _combine(path, info.name)
def files(self, path: str = "/") -> Iterator[str]:
yield from self._filter(lambda _, info: info.is_file, path)
def dirs(self, path: str = "/") -> Iterator[str]:
yield from self._filter(lambda _, info: info.is_dir, path)

View file

@ -0,0 +1,204 @@
from __future__ import annotations
import io
import os
import shutil
import stat
import typing
import zipfile
from datetime import datetime
from ._base import FS
from ._errors import FileExpected, ResourceNotFound, ResourceReadOnly
from ._info import Info
from ._path import dirname, forcedir, normpath, relpath
from ._tempfs import TempFS
if typing.TYPE_CHECKING:
from collections.abc import Collection
from typing import IO, Any
from ._subfs import SubFS
class ZipFS(FS):
"""Read and write zip files."""
def __new__(
cls, file: str | os.PathLike, write: bool = False, encoding: str = "utf-8"
):
if write:
return WriteZipFS(file, encoding)
else:
return ReadZipFS(file, encoding)
if typing.TYPE_CHECKING:
def __init__(
self, file: str | os.PathLike, write: bool = False, encoding: str = "utf-8"
):
pass
class ReadZipFS(FS):
"""A readable zip file."""
def __init__(self, file: str | os.PathLike, encoding: str = "utf-8"):
super().__init__()
self._file = os.fspath(file)
self.encoding = encoding # unused
self._zip = zipfile.ZipFile(file, "r")
self._directory_fs = None
def __repr__(self) -> str:
return f"ReadZipFS({self._file!r})"
def __str__(self) -> str:
return f"<zipfs '{self._file}'>"
def _path_to_zip_name(self, path: str) -> str:
"""Convert a path to a zip file name."""
path = relpath(normpath(path))
if self._directory.isdir(path):
path = forcedir(path)
return path
@property
def _directory(self) -> TempFS:
if self._directory_fs is None:
self._directory_fs = _fs = TempFS()
for zip_name in self._zip.namelist():
resource_name = zip_name
if resource_name.endswith("/"):
_fs.makedirs(resource_name, recreate=True)
else:
_fs.makedirs(dirname(resource_name), recreate=True)
_fs.create(resource_name)
return self._directory_fs
def close(self):
super(ReadZipFS, self).close()
self._zip.close()
if self._directory_fs is not None:
self._directory_fs.close()
def getinfo(self, path: str, namespaces: Collection[str] | None = None) -> Info:
namespaces = namespaces or ()
raw_info = {}
if path == "/":
raw_info["basic"] = {"name": "", "is_dir": True}
if "details" in namespaces:
raw_info["details"] = {"type": stat.S_IFDIR}
else:
basic_info = self._directory.getinfo(path)
raw_info["basic"] = {"name": basic_info.name, "is_dir": basic_info.is_dir}
if "details" in namespaces:
zip_name = self._path_to_zip_name(path)
try:
zip_info = self._zip.getinfo(zip_name)
except KeyError:
pass
else:
if "details" in namespaces:
raw_info["details"] = {
"size": zip_info.file_size,
"type": int(
stat.S_IFDIR if basic_info.is_dir else stat.S_IFREG
),
"modified": datetime(*zip_info.date_time).timestamp(),
}
return Info(raw_info)
def exists(self, path: str) -> bool:
self.check()
return self._directory.exists(path)
def isdir(self, path: str) -> bool:
self.check()
return self._directory.isdir(path)
def isfile(self, path: str) -> bool:
self.check()
return self._directory.isfile(path)
def listdir(self, path: str) -> str:
self.check()
return self._directory.listdir(path)
def makedir(self, path: str, recreate: bool = False) -> SubFS:
self.check()
raise ResourceReadOnly(path)
def makedirs(self, path: str, recreate: bool = False) -> SubFS:
self.check()
raise ResourceReadOnly(path)
def remove(self, path: str):
self.check()
raise ResourceReadOnly(path)
def removedir(self, path: str):
self.check()
raise ResourceReadOnly(path)
def removetree(self, path: str):
self.check()
raise ResourceReadOnly(path)
def movedir(self, src: str, dst: str, create: bool = False):
self.check()
raise ResourceReadOnly(src)
def readbytes(self, path: str) -> bytes:
self.check()
if not self._directory.isfile(path):
raise ResourceNotFound(path)
zip_name = self._path_to_zip_name(path)
zip_bytes = self._zip.read(zip_name)
return zip_bytes
def open(self, path: str, mode: str = "rb", **kwargs) -> IO[Any]:
self.check()
if self._directory.isdir(path):
raise FileExpected(f"{path!r} is a directory")
zip_mode = mode[0]
if zip_mode == "r" and not self._directory.exists(path):
raise ResourceNotFound(f"No such file or directory: {path!r}")
if any(m in mode for m in "wax+"):
raise ResourceReadOnly(path)
zip_name = self._path_to_zip_name(path)
stream = self._zip.open(zip_name, zip_mode)
if "b" in mode:
if kwargs:
raise ValueError("encoding args invalid for binary operation")
return stream
# Text mode
return io.TextIOWrapper(stream, **kwargs)
class WriteZipFS(TempFS):
"""A writable zip file."""
def __init__(self, file: str | os.PathLike, encoding: str = "utf-8"):
super().__init__()
self._file = os.fspath(file)
self.encoding = encoding # unused
def __repr__(self) -> str:
return f"WriteZipFS({self._file!r})"
def __str__(self) -> str:
return f"<zipfs-write '{self._file}'>"
def close(self):
base_name = os.path.splitext(self._file)[0]
shutil.make_archive(base_name, format="zip", root_dir=self._temp_dir)
if self._file != base_name + ".zip":
shutil.move(base_name + ".zip", self._file)
super().close()