Coverage for tsfpga/system_utils.py: 97%

68 statements  

« prev     ^ index     » next       coverage.py v6.4, created at 2022-05-28 04:01 +0000

1# -------------------------------------------------------------------------------------------------- 

2# Copyright (c) Lukas Vik. All rights reserved. 

3# 

4# This file is part of the tsfpga project. 

5# https://tsfpga.com 

6# https://gitlab.com/tsfpga/tsfpga 

7# -------------------------------------------------------------------------------------------------- 

8 

9import importlib.util 

10import os 

11from os.path import commonpath, relpath 

12from platform import system 

13from shutil import rmtree 

14import subprocess 

15 

16from pathlib import Path 

17 

18from tsfpga import DEFAULT_FILE_ENCODING 

19 

20 

21def create_file(file, contents=None): 

22 create_directory(file.parent, empty=False) 

23 

24 contents = "" if contents is None else contents 

25 with open(file, "w", encoding=DEFAULT_FILE_ENCODING) as file_handle: 

26 file_handle.write(contents) 

27 

28 return file 

29 

30 

31def read_file(file): 

32 with open(file, encoding=DEFAULT_FILE_ENCODING) as file_handle: 

33 return file_handle.read() 

34 

35 

36def read_last_lines_of_file(file, num_lines): 

37 """ 

38 Read a number of lines from the end of a file, without buffering the whole file. 

39 Similar to unix ``tail`` command. 

40 

41 Arguments: 

42 file (pathlib.Path): The file that shall be read. 

43 num_lines (int): The number of lines to read. 

44 

45 Return: 

46 str: The last lines of the file. 

47 """ 

48 result_lines = [] 

49 blocks_to_read = 0 

50 

51 with open(file, encoding=DEFAULT_FILE_ENCODING) as file_handle: 

52 while len(result_lines) < num_lines: 

53 # Since we do not know the line lengths, there is some guessing involved. Keep reading 

54 # larger and larger blocks until we have all the lines that are requested. 

55 blocks_to_read += 1 

56 

57 try: 

58 # Read a block from the end 

59 file_handle.seek(-blocks_to_read * 4096, os.SEEK_END) 

60 except IOError: 

61 # Tried to read more data than what is available. Read whatever we have and return 

62 # to user. 

63 file_handle.seek(0) 

64 result_lines = file_handle.readlines() 

65 break 

66 

67 result_lines = file_handle.readlines() 

68 

69 result = "".join(result_lines[-num_lines:]) 

70 return result 

71 

72 

73def delete(path, wait_until_deleted=False): 

74 """ 

75 Delete a file or directory from the filesystem. 

76 

77 Arguments: 

78 path (pathlib.Path): The file/directory to be deleted. 

79 wait_until_deleted (bool): When set to ``True``, the function will poll the filesystem 

80 after initiating the deletion, and not return until the path is in fact deleted. 

81 Is needed on some filesystems/mounts in a situation where we delete a path and 

82 then directly want to write to it afterwards. 

83 

84 Returns: 

85 pathlib.Path: The path that was deleted (i.e. the original ``path`` argument). 

86 """ 

87 if path.exists(): 

88 if path.is_dir(): 

89 rmtree(path) 

90 else: 

91 path.unlink() 

92 

93 if wait_until_deleted: 

94 while path.exists(): 

95 pass 

96 

97 return path 

98 

99 

100def create_directory(directory, empty=True): 

101 if empty: 

102 delete(directory) 

103 elif directory.exists(): 

104 return directory 

105 

106 directory.mkdir(parents=True) 

107 return directory 

108 

109 

110def file_is_in_directory(file_path, directories): 

111 """ 

112 Check if the file is in any of the directories. 

113 

114 Arguments: 

115 file_path (pathlib.Path): The file to be checked. 

116 directories (list(pathlib.Path)): The directories to be controlled. 

117 

118 Returns: 

119 bool: True if there is a common path. 

120 """ 

121 for directory in directories: 

122 if commonpath([str(file_path), str(directory)]) == str(directory): 

123 return True 

124 return False 

125 

126 

127def path_relative_to(path, other): 

128 """ 

129 Note Path.relative_to() does not support the use case where e.g. readme.md should get 

130 relative path "../readme.md". Hence we have to use os.path. 

131 """ 

132 assert path.exists(), path 

133 return Path(relpath(str(path), str(other))) 

134 

135 

136def run_command(cmd, cwd=None, env=None): 

137 if not isinstance(cmd, list): 

138 raise ValueError("Must be called with a list, not a string") 

139 

140 subprocess.check_call(cmd, cwd=cwd, env=env) 

141 

142 

143def load_python_module(file): 

144 python_module_name = file.stem 

145 

146 spec = importlib.util.spec_from_file_location(python_module_name, file) 

147 module = importlib.util.module_from_spec(spec) 

148 spec.loader.exec_module(module) 

149 

150 return module 

151 

152 

153def system_is_windows(): 

154 return system() == "Windows"