Coverage for tsfpga/system_utils.py: 95%

82 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-01 20:51 +0000

1# -------------------------------------------------------------------------------------------------- 

2# Copyright (c) Lukas Vik. All rights reserved. 

3# 

4# This file is part of the tsfpga project, a project platform for modern FPGA development. 

5# https://tsfpga.com 

6# https://github.com/tsfpga/tsfpga 

7# -------------------------------------------------------------------------------------------------- 

8 

9from __future__ import annotations 

10 

11import importlib.util 

12import os 

13import subprocess 

14from os.path import commonpath, relpath 

15from pathlib import Path 

16from platform import system 

17from shutil import rmtree 

18from sys import modules 

19from typing import TYPE_CHECKING 

20 

21from tsfpga import DEFAULT_FILE_ENCODING 

22 

23if TYPE_CHECKING: 

24 from types import ModuleType 

25 

26 

def create_file(file: Path, contents: str | None = None) -> Path:
    """
    Write ``contents`` to ``file``, creating any missing parent directories first.
    When ``contents`` is not given, an empty file is written.
    An already existing file at the location is overwritten.

    Arguments:
        file: Path of the file to create.
        contents: Optional text to place in the file.

    Return:
        The path to the file that was created (i.e. the original ``file`` argument).
    """
    # Existing parent directories are kept as they are; nothing inside them is removed.
    create_directory(directory=file.parent, empty=False)

    text = contents if contents is not None else ""
    file.write_text(text, encoding=DEFAULT_FILE_ENCODING)

    return file

43 

44 

def read_file(file: Path) -> str:
    """
    Return the complete text contents of ``file``, decoded with the project default encoding.

    Arguments:
        file: The file that shall be read.

    Return:
        The file contents as a string.
    """
    return file.read_text(encoding=DEFAULT_FILE_ENCODING)

51 

52 

def read_last_lines_of_file(file: Path, num_lines: int) -> str:
    """
    Read a number of lines from the end of a file, without buffering the whole file.
    Similar to unix ``tail`` command.

    The file is read backwards in binary blocks, since text-mode file objects do not support
    seeking relative to the end of the file. Only the tail that is needed is decoded.

    Arguments:
        file: The file that shall be read.
        num_lines: The number of lines to read.
            A value of zero or less gives an empty result.

    Return:
        The last lines of the file.
        If the file has fewer than ``num_lines`` lines, the whole file content is returned.
    """
    import io

    if num_lines <= 0:
        return ""

    block_size = 4096
    tail = b""

    with file.open("rb") as file_handle:
        # Seeking to the end returns the absolute position, i.e. the file size in bytes.
        position = file_handle.seek(0, os.SEEK_END)

        # Since we do not know the line lengths, there is some guessing involved. Keep reading
        # one more block, going backwards, until we have all the lines that are requested.
        while position > 0:
            read_size = min(block_size, position)
            position -= read_size

            file_handle.seek(position)
            tail = file_handle.read(read_size) + tail

            if position == 0:
                # Reached the beginning of the file. Use whatever we have.
                break

            # The data most likely starts in the middle of a line. Discard everything up to and
            # including the first newline so that only whole lines remain.
            # NOTE(review): Splitting on the newline byte assumes an ASCII-compatible encoding
            # (e.g. UTF-8), where this byte can not be part of another character. Confirm that
            # this holds for 'DEFAULT_FILE_ENCODING'.
            first_newline = tail.find(b"\n")
            if first_newline != -1 and tail.count(b"\n", first_newline + 1) >= num_lines:
                tail = tail[first_newline + 1 :]
                break

    # Decode through a text wrapper so that decoding and newline translation behave the same as
    # when the file is opened in text mode.
    text_stream = io.TextIOWrapper(io.BytesIO(tail), encoding=DEFAULT_FILE_ENCODING)
    result_lines = text_stream.readlines()

    return "".join(result_lines[-num_lines:])

87 

88 

def prepend_file(file_path: Path, text: str) -> Path:
    """
    Insert the ``text`` at the beginning of the file, before any existing content.
    The file must already exist, since it is opened for reading and updating.

    Arguments:
        file_path: The file that shall be modified.
        text: The text to insert before the existing content.

    Return:
        The original file path.
    """
    # Use the same explicit encoding as the other file operations in this module, rather than
    # the platform-dependent default, so that content is read back the way it was written.
    with file_path.open("r+", encoding=DEFAULT_FILE_ENCODING) as file_handle:
        existing_content = file_handle.read()

        # Rewind and overwrite from the start. The new content is never shorter than the old,
        # so no truncation is needed.
        file_handle.seek(0)
        file_handle.write(text + existing_content)

    return file_path

102 

103 

def delete(path: Path, wait_until_deleted: bool = False) -> Path:
    """
    Delete a file or directory from the filesystem.
    Does nothing if the path does not exist.

    A symbolic link is removed as a link, i.e. the target that it points to is left untouched.
    This holds also when the link is dangling or points to a directory.

    Arguments:
        path: The file/directory to be deleted.
        wait_until_deleted: When set to ``True``, the function will poll the filesystem
            after initiating the deletion, and not return until the path is in fact deleted.
            Is needed on some filesystems/mounts in a situation where we delete a path and
            then directly want to write to it afterwards.

    Return:
        The path that was deleted (i.e. the original ``path`` argument).
    """
    from time import sleep

    if path.is_symlink():
        # Must be checked first: 'exists' and 'is_dir' follow links, so a dangling link would
        # look like a missing path, and a link to a directory would be passed to 'rmtree',
        # which refuses to operate on symbolic links.
        path.unlink()
    elif path.exists():
        if path.is_dir():
            rmtree(path)
        else:
            path.unlink()

    if wait_until_deleted:
        while path.is_symlink() or path.exists():
            # Pause briefly between polls instead of spinning at full CPU load.
            sleep(0.01)

    return path

129 

130 

def create_directory(directory: Path, empty: bool = True) -> Path:
    """
    Create a directory, including any missing parent directories.

    Arguments:
        directory: Path to the directory.
        empty: If true, anything that already exists at the path is deleted before the
            directory is created, so the result is always an empty directory.
            If false, an already existing directory is left untouched, and a new one is
            created only if the path does not exist.

    Return:
        The path that was created (i.e. the original ``directory`` argument).

    Raises:
        FileExistsError: If ``empty`` is false and the path exists but is not a directory.
    """
    if not empty and directory.exists():
        if not directory.is_dir():
            raise FileExistsError(f"Requested directory path already exists as a file: {directory}")

        # Nothing to do, keep the existing directory and its contents.
        return directory

    if empty:
        # Clear out whatever is at the path, so that the creation below starts from scratch.
        delete(directory)

    directory.mkdir(parents=True)
    return directory

157 

158 

def file_is_in_directory(file_path: Path, directories: list[Path]) -> bool:
    """
    Check if the file is located in, or below, any of the given directories.

    Arguments:
        file_path: The file to be checked.
        directories: The directories to be controlled.

    Return:
        True if the common path of the file and at least one of the directories equals
        that directory. False otherwise, including when ``directories`` is empty.
    """
    file_string = str(file_path)

    return any(
        commonpath([file_string, str(directory)]) == str(directory) for directory in directories
    )

175 

176 

def path_relative_to(path: Path, other: Path) -> Path:
    """
    Compute the relative path that leads from ``other`` to ``path``.
    Works also when ``path`` is not located inside the ``other`` folder.

    The calculation is done with ``os.path.relpath``, since ``Path.relative_to()`` does not
    support the use case where e.g. readme.md should get relative path "../readme.md".

    Arguments:
        path: The path that shall be expressed in relative form.
        other: The folder that the result shall be relative to.

    Return:
        The relative path.
    """
    relative = relpath(str(path), str(other))
    return Path(relative)

186 

187 

def run_command(
    cmd: list[str],
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    capture_output: bool = False,
) -> subprocess.CompletedProcess[str]:
    """
    Run a command in a subprocess and wait for it to finish.
    Will raise ``CalledProcessError`` if the command fails.

    Arguments:
        cmd: The command to run, as a list with the executable followed by its arguments.
        cwd: The working directory where the command shall be executed.
        env: Environment variables to set.
        capture_output: Enable capturing of STDOUT and STDERR.

    Return:
        Returns the subprocess completed process object, which contains useful information.
        If ``capture_output`` is set, the ``stdout`` and ``stderr`` members of this object can
        be inspected.

    Raises:
        TypeError: If ``cmd`` is not a list.
    """
    if isinstance(cmd, list):
        # 'check' makes a non-zero exit code raise, and 'encoding' gives text-mode streams.
        return subprocess.run(
            args=cmd,
            cwd=cwd,
            env=env,
            check=True,
            encoding=DEFAULT_FILE_ENCODING,
            capture_output=capture_output,
        )

    raise TypeError("Must be called with a list, not a string")

219 

220 

def load_python_module(file: Path) -> ModuleType:
    """
    Load the Python module (i.e. source code file) located at ``file``.

    The module is given the name of the file without suffix, and is registered under that name
    in ``sys.modules`` before its code is executed.

    Arguments:
        file: Path to the Python source file.

    Return:
        The loaded module object.
        On it, you can call functions or instantiate classes that are in the module.

    Raises:
        RuntimeError: If no import spec or loader could be found for the file.
    """
    module_name = file.stem
    module_spec = importlib.util.spec_from_file_location(name=module_name, location=file)

    loader = None if module_spec is None else module_spec.loader
    if loader is None:
        raise RuntimeError(f"Could not load the Python module: {file}")

    loaded_module = importlib.util.module_from_spec(spec=module_spec)

    # Register the module before executing it, as is done in the regular import machinery.
    modules[module_name] = loaded_module
    loader.exec_module(module=loaded_module)

    return loaded_module

239 

240 

def system_is_windows() -> bool:
    """
    Check whether the script is being executed on a computer running the Windows
    operating system.

    Return:
        True if ``platform.system()`` reports "Windows", otherwise False.
    """
    operating_system = system()
    return operating_system == "Windows"