Coverage for src/ansible_sign/checksum/base.py: 99%

81 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-03-29 20:49 +0000

1import hashlib 

2import os 

3 

4 

5class InvalidChecksumLine(Exception): 

6 pass 

7 

8 

9class NoDifferException(Exception): 

10 pass 

11 

12 

13class ChecksumMismatch(Exception): 

14 pass 

15 

16 

17class ChecksumFile: 

18 """ 

19 Slurp a checksum file and be able to check and compare its contents to a 

20 given root directory. Also: be able to write out a checksum file. 

21 

22 We only allow sha256 for now, though supporting 512, etc. would be easy. 

23 """ 

24 

25 def __init__(self, root, differ=None): 

26 self.root = root 

27 if differ is not None: 

28 self.differ = differ(root=self.root) 

29 else: 

30 from .differ.distlib_manifest import ( 

31 DistlibManifestChecksumFileExistenceDiffer, 

32 ) 

33 

34 self.differ = DistlibManifestChecksumFileExistenceDiffer(root=self.root) 

35 

36 @property 

37 def differ_warnings(self): 

38 """ 

39 A differ can store a set of warnings (as strings) in the_differ.warnings 

40 which we can propagate up here. This allows calling code to display any 

41 warnings found during diffing time. 

42 """ 

43 

44 return self.differ.warnings 

45 

46 @property 

47 def warnings(self): 

48 """ 

49 Right now this is just the same as differ_warnings. In the future it 

50 might include warnings that we differ in methods in this class as well. 

51 """ 

52 

53 return self.differ_warnings 

54 

55 def _parse_gnu_style(self, line): 

56 """ 

57 Attempt to parse a GNU style line checksum line, returning False if 

58 we are unable to. 

59 

60 A GNU style line looks like this: 

61 f712979c4c5dfe739253908d122f5c87faa8b5de6f15ba7a1548ae028ff22d13 hello_world.yml 

62 

63 Or maybe like this: 

64 f82da8b4f98a3d3125fbc98408911f65dbc8dc38c0f38e258ebe290a8ad3d3c0 *binary 

65 """ 

66 

67 parts = line.split(" ", 1) 

68 if len(parts) != 2 or len(parts[0]) != 64: 

69 return False 

70 

71 if len(parts[1]) < 2 or parts[1][0] not in (" ", "*"): 

72 return False 

73 

74 shasum = parts[0] 

75 path = parts[1][1:] 

76 return (path, shasum) 

77 

78 def parse(self, checksum_file_contents): 

79 """ 

80 Given a complete checksum manifest as a string, parse it and return a 

81 dict with the result, keyed on each filename or path. 

82 """ 

83 checksums = {} 

84 for idx, line in enumerate(checksum_file_contents.splitlines()): 

85 if not line.strip(): 

86 continue 

87 # parsed = self._parse_bsd_style(line) 

88 # if parsed is False: 

89 parsed = self._parse_gnu_style(line) 

90 if parsed is False: 

91 raise InvalidChecksumLine(f"Unparsable checksum, line {idx + 1}: {line}") 

92 path = parsed[0] 

93 shasum = parsed[1] 

94 if path in checksums: 

95 raise InvalidChecksumLine(f"Duplicate path in checksum, line {idx + 1}: {line}") 

96 checksums[path] = shasum 

97 return checksums 

98 

99 def diff(self, paths): 

100 """ 

101 Given a collection of paths, use the differ to figure out which files 

102 (in reality) have been added/removed from the project root (or latest 

103 SCM tree). 

104 """ 

105 

106 paths = set(paths) 

107 return self.differ.compare_filelist(paths) 

108 

109 def generate_gnu_style(self): 

110 """ 

111 Using the root directory and 'differ' class given to the constructor, 

112 generate a GNU-style checksum manifest file. This is always generated 

113 from scratch by finding the list of relevant files in the root directory 

114 (by asking the differ), and calculating the checksum for each of them. 

115 

116 The resulting list is always sorted by filename. 

117 """ 

118 lines = [] 

119 calculated = self.calculate_checksums_from_root(verifying=False) 

120 for path, checksum in sorted(calculated.items()): 

121 # *two* spaces here - it's important for compat with coreutils. 

122 lines.append(f"{checksum} {path}") 

123 return "\n".join(lines) + "\n" 

124 

125 def calculate_checksum(self, path): 

126 shasum = hashlib.sha256() 

127 with open(path, "rb") as f: 

128 while True: 

129 chunk = f.read(65536) 

130 if not chunk: 

131 break 

132 shasum.update(chunk) 

133 return shasum.hexdigest() 

134 

135 def calculate_checksums_from_root(self, verifying): 

136 """ 

137 Using the root of the project and the differ class passed to the 

138 constructor, iterate over all files in the project and calculate their 

139 checksums. Return a dictionary of the result, keyed on the filename. 

140 

141 Just calling this is not enough in many cases- you want to ensure that 

142 the files in the checksum list are the same ones present in reality. 

143 diff() above does just that. Use that in combination with this method, 

144 or use verify() which does it for you. 

145 """ 

146 out = {} 

147 for path in self.differ.list_files(verifying=verifying): 

148 shasum = self.calculate_checksum(os.path.join(self.root, path)) 

149 out[path] = shasum 

150 return out 

151 

152 def verify(self, parsed_manifest_dct, diff=True): 

153 """ 

154 Takes a parsed manifest file (e.g. using parse(), with paths as keys and 

155 checksums as values). 

156 

157 Then calculates the current list of files in the project root. If paths 

158 have been added or removed, ChecksumMismatch is raised. 

159 

160 Otherwise, each the checksum of file in the project root (and subdirs) 

161 is calculated and that result is checked to be equal to the parsed 

162 checksums passed in. 

163 """ 

164 

165 if diff: 165 ↛ 171line 165 didn't jump to line 171, because the condition on line 165 was never false

166 # If there are any differences in existing paths, fail the check... 

167 differences = self.diff(parsed_manifest_dct.keys()) 

168 if differences["added"] or differences["removed"]: 

169 raise ChecksumMismatch(differences) 

170 

171 recalculated = self.calculate_checksums_from_root(verifying=True) 

172 mismatches = set() 

173 for parsed_path, parsed_checksum in parsed_manifest_dct.items(): 

174 if recalculated[parsed_path] != parsed_checksum: 

175 mismatches.add(parsed_path) 

176 if mismatches: 

177 raise ChecksumMismatch(f"Checksum mismatch: {', '.join(mismatches)}") 

178 

179 return True