microcore.file_storage
File storage functions
1""" 2File storage functions 3""" 4 5import fnmatch 6import json 7import os 8import shutil 9from pathlib import Path 10import chardet 11 12from ._env import config 13from .utils import file_link, list_files 14 15_missing = object() 16 17 18class Storage: 19 @property 20 def path(self) -> Path: 21 return Path(config().STORAGE_PATH) 22 23 @property 24 def default_ext(self) -> str | None: 25 ext = config().STORAGE_DEFAULT_FILE_EXT 26 if ext and not ext.startswith("."): 27 ext = "." + ext 28 return ext 29 30 def file_link(self, file_name: str | Path) -> str: 31 """Returns file name in format displayed in PyCharm console as a link.""" 32 return file_link(self.path / file_name) 33 34 @property 35 def default_encoding(self) -> str: 36 return config().DEFAULT_ENCODING 37 38 def exists(self, name: str | Path) -> bool: 39 return (self.path / name).exists() 40 41 def abs_path(self, name: str | Path) -> Path: 42 if os.path.isabs(name): 43 return Path(name) 44 return self.path / name 45 46 def read(self, name: str | Path, encoding: str = None): 47 name = str(name) 48 encoding = encoding or self.default_encoding 49 if not os.path.isabs(name) and not name.startswith("./"): 50 if "." in name: 51 parts = name.split(".") 52 name = ".".join(parts[:-1]) 53 ext = "." + parts[-1] 54 else: 55 ext = self.default_ext 56 if not self.exists(f"{name}{ext}"): 57 ext = "" 58 name = f"{self.path}/{name}{ext}" 59 if encoding is None: 60 with open(name, "rb") as f: 61 rawdata = f.read() 62 result = chardet.detect(rawdata) 63 encoding = result["encoding"] 64 return rawdata.decode(encoding) 65 66 with open(name, "r", encoding=encoding) as f: 67 return f.read() 68 69 def write_json( 70 self, 71 name: str | Path, 72 data, 73 rewrite_existing: bool = True, 74 backup_existing: bool = True, 75 ensure_ascii: bool = False, 76 ): 77 serialized_data = json.dumps(data, indent=4, ensure_ascii=ensure_ascii) 78 return self.write(name, serialized_data, rewrite_existing, backup_existing) 79 80 def read_json(self, name: str | Path, default=None): 81 try: 82 return json.loads(self.read(name)) 83 except FileNotFoundError as e: 84 if default is not None: 85 return default 86 raise e 87 88 def delete(self, target: str | Path | list[str | Path]): 89 """ 90 Removes the file or directory specified by `path` within the `storage_path` if exists. 91 """ 92 if isinstance(target, list): 93 for t in target: 94 self.delete(t) 95 return 96 path = (self.path / target).resolve() 97 if not path.exists(): 98 return 99 if path.is_dir(): 100 shutil.rmtree(path) 101 else: 102 os.remove(path) 103 104 def write( 105 self, 106 name: str | Path, 107 content: str = _missing, 108 rewrite_existing: bool = None, 109 backup_existing: bool = None, 110 encoding: str = None, 111 append: bool = False, 112 ) -> str | os.PathLike: 113 """ 114 :return: str File name for further usage 115 """ 116 if rewrite_existing is None: 117 rewrite_existing = True 118 if backup_existing is None: 119 backup_existing = not append 120 encoding = encoding or self.default_encoding 121 if content == _missing: 122 content = name 123 name = f"out{self.default_ext}" 124 125 base_name = Path(name).with_suffix("") 126 ext = Path(name).suffix or self.default_ext 127 128 file_name = f"{base_name}{ext}" 129 if (self.path / file_name).is_file() and ( 130 backup_existing or not rewrite_existing 131 ): 132 counter = 1 133 while True: 134 file_name1 = f"{base_name}_{counter}{ext}" # noqa 135 if not (self.path / file_name1).is_file(): 136 break 137 counter += 1 138 if not rewrite_existing: 139 file_name = file_name1 140 elif backup_existing: 141 os.rename(self.path / file_name, self.path / file_name1) 142 (self.path / file_name).parent.mkdir(parents=True, exist_ok=True) 143 if append: 144 with (self.path / file_name).open(mode="a", encoding=encoding) as file: 145 file.write(content) 146 else: 147 (self.path / file_name).write_text(content, encoding=encoding) 148 return file_name 149 150 def clean(self, path: str | Path): 151 """ 152 Removes the directory specified by `path` within the `storage_path`. 153 :raises ValueError: If the path is outside the storage area. 154 @deprecated use storage.delete() instead 155 """ 156 full_path = (self.path / path).resolve() 157 158 # Verify that the path is inside the storage_path 159 if self.path.resolve() not in full_path.parents: 160 raise ValueError("Cannot delete directories outside the storage path.") 161 162 if full_path.exists() and full_path.is_dir(): 163 shutil.rmtree(full_path) 164 165 def list_files( 166 self, 167 target_dir: str | Path = "", 168 exclude: list[str | Path] = None, 169 relative_to: str | Path = None, 170 absolute: bool = False, 171 posix: bool = False, 172 ) -> list[Path | str]: 173 """ 174 Lists files in a specified directory, excluding those that match given patterns. 175 176 Args: 177 target_dir (str | Path): The directory to search in. 178 exclude (list[str | Path], optional): Patterns of files to exclude. 179 relative_to (str | Path, optional): Base directory for relative paths. 180 If None, paths are relative to `target_dir`. Defaults to None. 181 absolute (bool, optional): If True, returns absolute paths. Defaults to False. 182 posix (bool, optional): If True, returns posix paths. Defaults to False. 183 """ 184 target_dir = self.path / target_dir 185 return list_files( 186 target_dir=target_dir, 187 exclude=exclude, 188 relative_to=relative_to, 189 absolute=absolute, 190 posix=posix, 191 ) 192 193 def copy(self, src: str | Path, dest: str | Path, exclude=None): 194 """ 195 Copy a file or folder from src to dest, overwriting content, 196 but skipping paths in exceptions. 197 Supports Unix shell-style wildcards in exceptions. Accepts Path objects. 198 199 Args: 200 src (Path): Source file or directory Path object. 201 dest (Path): Destination file or directory Path object. 202 exclude (list of str, optional): 203 List of Unix shell-style wildcard patterns relative to src. 204 These paths will be excluded from the copy. Defaults to None. 205 """ 206 src = self.path / Path(src) 207 dest = self.path / Path(dest) 208 exclude = exclude or [] 209 if src.is_dir(): 210 dest.mkdir(parents=True, exist_ok=True) 211 files = list_files(src, exclude) 212 for f in files: 213 if (src / f).is_dir(): 214 (dest / f).mkdir(parents=True, exist_ok=True) 215 elif (src / f).is_file(): 216 (dest / f).parent.mkdir(parents=True, exist_ok=True) 217 shutil.copy2(src / f, dest / f) 218 else: 219 raise ValueError(f"{src / f} is not a file or directory") 220 elif src.is_file(): 221 if not any(fnmatch.fnmatch(src.name, pattern) for pattern in exclude): 222 if dest.is_dir(): 223 dest = dest / src.name 224 dest.parent.mkdir(parents=True, exist_ok=True) 225 shutil.copy2(src, dest) 226 else: 227 raise ValueError(f"{src} is not a file or directory") 228 229 230storage = Storage() 231""" 232File system operations within the storage folder. 233 234See `Storage` for details. 235 236Related configuration options: 237 238 - `microcore.config.Config.STORAGE_PATH` 239 - `microcore.config.Config.DEFAULT_ENCODING` 240"""
class
Storage:
19class Storage: 20 @property 21 def path(self) -> Path: 22 return Path(config().STORAGE_PATH) 23 24 @property 25 def default_ext(self) -> str | None: 26 ext = config().STORAGE_DEFAULT_FILE_EXT 27 if ext and not ext.startswith("."): 28 ext = "." + ext 29 return ext 30 31 def file_link(self, file_name: str | Path) -> str: 32 """Returns file name in format displayed in PyCharm console as a link.""" 33 return file_link(self.path / file_name) 34 35 @property 36 def default_encoding(self) -> str: 37 return config().DEFAULT_ENCODING 38 39 def exists(self, name: str | Path) -> bool: 40 return (self.path / name).exists() 41 42 def abs_path(self, name: str | Path) -> Path: 43 if os.path.isabs(name): 44 return Path(name) 45 return self.path / name 46 47 def read(self, name: str | Path, encoding: str = None): 48 name = str(name) 49 encoding = encoding or self.default_encoding 50 if not os.path.isabs(name) and not name.startswith("./"): 51 if "." in name: 52 parts = name.split(".") 53 name = ".".join(parts[:-1]) 54 ext = "." + parts[-1] 55 else: 56 ext = self.default_ext 57 if not self.exists(f"{name}{ext}"): 58 ext = "" 59 name = f"{self.path}/{name}{ext}" 60 if encoding is None: 61 with open(name, "rb") as f: 62 rawdata = f.read() 63 result = chardet.detect(rawdata) 64 encoding = result["encoding"] 65 return rawdata.decode(encoding) 66 67 with open(name, "r", encoding=encoding) as f: 68 return f.read() 69 70 def write_json( 71 self, 72 name: str | Path, 73 data, 74 rewrite_existing: bool = True, 75 backup_existing: bool = True, 76 ensure_ascii: bool = False, 77 ): 78 serialized_data = json.dumps(data, indent=4, ensure_ascii=ensure_ascii) 79 return self.write(name, serialized_data, rewrite_existing, backup_existing) 80 81 def read_json(self, name: str | Path, default=None): 82 try: 83 return json.loads(self.read(name)) 84 except FileNotFoundError as e: 85 if default is not None: 86 return default 87 raise e 88 89 def delete(self, target: str | Path | list[str | Path]): 90 """ 91 Removes the file or directory specified by `path` within the `storage_path` if exists. 92 """ 93 if isinstance(target, list): 94 for t in target: 95 self.delete(t) 96 return 97 path = (self.path / target).resolve() 98 if not path.exists(): 99 return 100 if path.is_dir(): 101 shutil.rmtree(path) 102 else: 103 os.remove(path) 104 105 def write( 106 self, 107 name: str | Path, 108 content: str = _missing, 109 rewrite_existing: bool = None, 110 backup_existing: bool = None, 111 encoding: str = None, 112 append: bool = False, 113 ) -> str | os.PathLike: 114 """ 115 :return: str File name for further usage 116 """ 117 if rewrite_existing is None: 118 rewrite_existing = True 119 if backup_existing is None: 120 backup_existing = not append 121 encoding = encoding or self.default_encoding 122 if content == _missing: 123 content = name 124 name = f"out{self.default_ext}" 125 126 base_name = Path(name).with_suffix("") 127 ext = Path(name).suffix or self.default_ext 128 129 file_name = f"{base_name}{ext}" 130 if (self.path / file_name).is_file() and ( 131 backup_existing or not rewrite_existing 132 ): 133 counter = 1 134 while True: 135 file_name1 = f"{base_name}_{counter}{ext}" # noqa 136 if not (self.path / file_name1).is_file(): 137 break 138 counter += 1 139 if not rewrite_existing: 140 file_name = file_name1 141 elif backup_existing: 142 os.rename(self.path / file_name, self.path / file_name1) 143 (self.path / file_name).parent.mkdir(parents=True, exist_ok=True) 144 if append: 145 with (self.path / file_name).open(mode="a", encoding=encoding) as file: 146 file.write(content) 147 else: 148 (self.path / file_name).write_text(content, encoding=encoding) 149 return file_name 150 151 def clean(self, path: str | Path): 152 """ 153 Removes the directory specified by `path` within the `storage_path`. 154 :raises ValueError: If the path is outside the storage area. 155 @deprecated use storage.delete() instead 156 """ 157 full_path = (self.path / path).resolve() 158 159 # Verify that the path is inside the storage_path 160 if self.path.resolve() not in full_path.parents: 161 raise ValueError("Cannot delete directories outside the storage path.") 162 163 if full_path.exists() and full_path.is_dir(): 164 shutil.rmtree(full_path) 165 166 def list_files( 167 self, 168 target_dir: str | Path = "", 169 exclude: list[str | Path] = None, 170 relative_to: str | Path = None, 171 absolute: bool = False, 172 posix: bool = False, 173 ) -> list[Path | str]: 174 """ 175 Lists files in a specified directory, excluding those that match given patterns. 176 177 Args: 178 target_dir (str | Path): The directory to search in. 179 exclude (list[str | Path], optional): Patterns of files to exclude. 180 relative_to (str | Path, optional): Base directory for relative paths. 181 If None, paths are relative to `target_dir`. Defaults to None. 182 absolute (bool, optional): If True, returns absolute paths. Defaults to False. 183 posix (bool, optional): If True, returns posix paths. Defaults to False. 184 """ 185 target_dir = self.path / target_dir 186 return list_files( 187 target_dir=target_dir, 188 exclude=exclude, 189 relative_to=relative_to, 190 absolute=absolute, 191 posix=posix, 192 ) 193 194 def copy(self, src: str | Path, dest: str | Path, exclude=None): 195 """ 196 Copy a file or folder from src to dest, overwriting content, 197 but skipping paths in exceptions. 198 Supports Unix shell-style wildcards in exceptions. Accepts Path objects. 199 200 Args: 201 src (Path): Source file or directory Path object. 202 dest (Path): Destination file or directory Path object. 203 exclude (list of str, optional): 204 List of Unix shell-style wildcard patterns relative to src. 205 These paths will be excluded from the copy. Defaults to None. 206 """ 207 src = self.path / Path(src) 208 dest = self.path / Path(dest) 209 exclude = exclude or [] 210 if src.is_dir(): 211 dest.mkdir(parents=True, exist_ok=True) 212 files = list_files(src, exclude) 213 for f in files: 214 if (src / f).is_dir(): 215 (dest / f).mkdir(parents=True, exist_ok=True) 216 elif (src / f).is_file(): 217 (dest / f).parent.mkdir(parents=True, exist_ok=True) 218 shutil.copy2(src / f, dest / f) 219 else: 220 raise ValueError(f"{src / f} is not a file or directory") 221 elif src.is_file(): 222 if not any(fnmatch.fnmatch(src.name, pattern) for pattern in exclude): 223 if dest.is_dir(): 224 dest = dest / src.name 225 dest.parent.mkdir(parents=True, exist_ok=True) 226 shutil.copy2(src, dest) 227 else: 228 raise ValueError(f"{src} is not a file or directory")
def
file_link(self, file_name: str | pathlib.Path) -> str:
31 def file_link(self, file_name: str | Path) -> str: 32 """Returns file name in format displayed in PyCharm console as a link.""" 33 return file_link(self.path / file_name)
Returns file name in format displayed in PyCharm console as a link.
def
read(self, name: str | pathlib.Path, encoding: str = None):
47 def read(self, name: str | Path, encoding: str = None): 48 name = str(name) 49 encoding = encoding or self.default_encoding 50 if not os.path.isabs(name) and not name.startswith("./"): 51 if "." in name: 52 parts = name.split(".") 53 name = ".".join(parts[:-1]) 54 ext = "." + parts[-1] 55 else: 56 ext = self.default_ext 57 if not self.exists(f"{name}{ext}"): 58 ext = "" 59 name = f"{self.path}/{name}{ext}" 60 if encoding is None: 61 with open(name, "rb") as f: 62 rawdata = f.read() 63 result = chardet.detect(rawdata) 64 encoding = result["encoding"] 65 return rawdata.decode(encoding) 66 67 with open(name, "r", encoding=encoding) as f: 68 return f.read()
def
write_json( self, name: str | pathlib.Path, data, rewrite_existing: bool = True, backup_existing: bool = True, ensure_ascii: bool = False):
70 def write_json( 71 self, 72 name: str | Path, 73 data, 74 rewrite_existing: bool = True, 75 backup_existing: bool = True, 76 ensure_ascii: bool = False, 77 ): 78 serialized_data = json.dumps(data, indent=4, ensure_ascii=ensure_ascii) 79 return self.write(name, serialized_data, rewrite_existing, backup_existing)
def
delete(self, target: str | pathlib.Path | list[str | pathlib.Path]):
89 def delete(self, target: str | Path | list[str | Path]): 90 """ 91 Removes the file or directory specified by `path` within the `storage_path` if exists. 92 """ 93 if isinstance(target, list): 94 for t in target: 95 self.delete(t) 96 return 97 path = (self.path / target).resolve() 98 if not path.exists(): 99 return 100 if path.is_dir(): 101 shutil.rmtree(path) 102 else: 103 os.remove(path)
Removes the file or directory specified by path within the storage_path if exists.
def
write( self, name: str | pathlib.Path, content: str = <object object>, rewrite_existing: bool = None, backup_existing: bool = None, encoding: str = None, append: bool = False) -> str | os.PathLike:
105 def write( 106 self, 107 name: str | Path, 108 content: str = _missing, 109 rewrite_existing: bool = None, 110 backup_existing: bool = None, 111 encoding: str = None, 112 append: bool = False, 113 ) -> str | os.PathLike: 114 """ 115 :return: str File name for further usage 116 """ 117 if rewrite_existing is None: 118 rewrite_existing = True 119 if backup_existing is None: 120 backup_existing = not append 121 encoding = encoding or self.default_encoding 122 if content == _missing: 123 content = name 124 name = f"out{self.default_ext}" 125 126 base_name = Path(name).with_suffix("") 127 ext = Path(name).suffix or self.default_ext 128 129 file_name = f"{base_name}{ext}" 130 if (self.path / file_name).is_file() and ( 131 backup_existing or not rewrite_existing 132 ): 133 counter = 1 134 while True: 135 file_name1 = f"{base_name}_{counter}{ext}" # noqa 136 if not (self.path / file_name1).is_file(): 137 break 138 counter += 1 139 if not rewrite_existing: 140 file_name = file_name1 141 elif backup_existing: 142 os.rename(self.path / file_name, self.path / file_name1) 143 (self.path / file_name).parent.mkdir(parents=True, exist_ok=True) 144 if append: 145 with (self.path / file_name).open(mode="a", encoding=encoding) as file: 146 file.write(content) 147 else: 148 (self.path / file_name).write_text(content, encoding=encoding) 149 return file_name
Returns
str File name for further usage
def
clean(self, path: str | pathlib.Path):
151 def clean(self, path: str | Path): 152 """ 153 Removes the directory specified by `path` within the `storage_path`. 154 :raises ValueError: If the path is outside the storage area. 155 @deprecated use storage.delete() instead 156 """ 157 full_path = (self.path / path).resolve() 158 159 # Verify that the path is inside the storage_path 160 if self.path.resolve() not in full_path.parents: 161 raise ValueError("Cannot delete directories outside the storage path.") 162 163 if full_path.exists() and full_path.is_dir(): 164 shutil.rmtree(full_path)
Removes the directory specified by path within the storage_path.
Raises
- ValueError: If the path is outside the storage area. @deprecated use storage.delete() instead
def
list_files( self, target_dir: str | pathlib.Path = '', exclude: list[str | pathlib.Path] = None, relative_to: str | pathlib.Path = None, absolute: bool = False, posix: bool = False) -> list[pathlib.Path | str]:
166 def list_files( 167 self, 168 target_dir: str | Path = "", 169 exclude: list[str | Path] = None, 170 relative_to: str | Path = None, 171 absolute: bool = False, 172 posix: bool = False, 173 ) -> list[Path | str]: 174 """ 175 Lists files in a specified directory, excluding those that match given patterns. 176 177 Args: 178 target_dir (str | Path): The directory to search in. 179 exclude (list[str | Path], optional): Patterns of files to exclude. 180 relative_to (str | Path, optional): Base directory for relative paths. 181 If None, paths are relative to `target_dir`. Defaults to None. 182 absolute (bool, optional): If True, returns absolute paths. Defaults to False. 183 posix (bool, optional): If True, returns posix paths. Defaults to False. 184 """ 185 target_dir = self.path / target_dir 186 return list_files( 187 target_dir=target_dir, 188 exclude=exclude, 189 relative_to=relative_to, 190 absolute=absolute, 191 posix=posix, 192 )
Lists files in a specified directory, excluding those that match given patterns.
Arguments:
- target_dir (str | Path): The directory to search in.
- exclude (list[str | Path], optional): Patterns of files to exclude.
- relative_to (str | Path, optional): Base directory for relative paths.
If None, paths are relative to
target_dir. Defaults to None. - absolute (bool, optional): If True, returns absolute paths. Defaults to False.
- posix (bool, optional): If True, returns posix paths. Defaults to False.
def
copy( self, src: str | pathlib.Path, dest: str | pathlib.Path, exclude=None):
194 def copy(self, src: str | Path, dest: str | Path, exclude=None): 195 """ 196 Copy a file or folder from src to dest, overwriting content, 197 but skipping paths in exceptions. 198 Supports Unix shell-style wildcards in exceptions. Accepts Path objects. 199 200 Args: 201 src (Path): Source file or directory Path object. 202 dest (Path): Destination file or directory Path object. 203 exclude (list of str, optional): 204 List of Unix shell-style wildcard patterns relative to src. 205 These paths will be excluded from the copy. Defaults to None. 206 """ 207 src = self.path / Path(src) 208 dest = self.path / Path(dest) 209 exclude = exclude or [] 210 if src.is_dir(): 211 dest.mkdir(parents=True, exist_ok=True) 212 files = list_files(src, exclude) 213 for f in files: 214 if (src / f).is_dir(): 215 (dest / f).mkdir(parents=True, exist_ok=True) 216 elif (src / f).is_file(): 217 (dest / f).parent.mkdir(parents=True, exist_ok=True) 218 shutil.copy2(src / f, dest / f) 219 else: 220 raise ValueError(f"{src / f} is not a file or directory") 221 elif src.is_file(): 222 if not any(fnmatch.fnmatch(src.name, pattern) for pattern in exclude): 223 if dest.is_dir(): 224 dest = dest / src.name 225 dest.parent.mkdir(parents=True, exist_ok=True) 226 shutil.copy2(src, dest) 227 else: 228 raise ValueError(f"{src} is not a file or directory")
Copy a file or folder from src to dest, overwriting content, but skipping paths in exceptions. Supports Unix shell-style wildcards in exceptions. Accepts Path objects.
Arguments:
- src (Path): Source file or directory Path object.
- dest (Path): Destination file or directory Path object.
- exclude (list of str, optional): List of Unix shell-style wildcard patterns relative to src. These paths will be excluded from the copy. Defaults to None.
storage =
<Storage object>
File system operations within the storage folder.
See Storage for details.