Coverage for src/ansible_sign/checksum/base.py: 99%

81 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-02 14:12 +0000

1import hashlib 

2import os 

3 

4 

class InvalidChecksumLine(Exception):
    """
    Raised by ChecksumFile.parse() when a manifest line cannot be parsed or
    names a path that already appeared earlier in the manifest.
    """


class NoDifferException(Exception):
    """
    Not raised anywhere in this module.

    NOTE(review): presumably signals that no usable differ is available --
    confirm against the code that raises it.
    """


class ChecksumMismatch(Exception):
    """
    Raised by ChecksumFile.verify() when the manifest and the project root
    disagree: files were added or removed, a listed file is missing, or a
    file's checksum differs from the recorded one.
    """


class ChecksumFile:
    """
    Slurp a checksum file and be able to check and compare its contents to a
    given root directory. Also: be able to write out a checksum file.

    We only allow sha256 for now, though supporting 512, etc. would be easy.
    """

    def __init__(self, root, differ=None):
        """
        root is the project root directory; file paths reported by the differ
        are joined onto it when checksums are calculated.

        differ is a differ *class* (or any callable), which gets called with
        root=<root> to build the differ instance. When it is None, we fall
        back to DistlibManifestChecksumFileExistenceDiffer.
        """
        self.root = root
        if differ is not None:
            self.differ = differ(root=self.root)
        else:
            # Imported here, only when the default differ is actually needed.
            from .differ.distlib_manifest import (
                DistlibManifestChecksumFileExistenceDiffer,
            )

            self.differ = DistlibManifestChecksumFileExistenceDiffer(root=self.root)

    @property
    def differ_warnings(self):
        """
        A differ can store a set of warnings (as strings) in the_differ.warnings
        which we can propagate up here. This allows calling code to display any
        warnings found during diffing time.
        """

        return self.differ.warnings

    @property
    def warnings(self):
        """
        Right now this is just the same as differ_warnings. In the future it
        might include warnings raised by methods in this class as well.
        """

        return self.differ_warnings

    def _parse_gnu_style(self, line):
        """
        Attempt to parse a GNU style checksum line, returning False if we are
        unable to. On success, a (path, shasum) tuple is returned.

        A GNU style line looks like this:
        f712979c4c5dfe739253908d122f5c87faa8b5de6f15ba7a1548ae028ff22d13  hello_world.yml

        Or maybe like this:
        f82da8b4f98a3d3125fbc98408911f65dbc8dc38c0f38e258ebe290a8ad3d3c0 *binary
        """

        # Split on the *first* space only: the checksum never contains one,
        # but the path might.
        parts = line.split(" ", 1)
        if len(parts) != 2 or len(parts[0]) != 64:
            return False

        # What remains is a one-character mode marker (" " or "*") followed
        # by a non-empty path.
        if len(parts[1]) < 2 or parts[1][0] not in (" ", "*"):
            return False

        shasum = parts[0]
        path = parts[1][1:]
        return (path, shasum)

    def parse(self, checksum_file_contents):
        """
        Given a complete checksum manifest as a string, parse it and return a
        dict with the result, keyed on each filename or path.

        Blank lines are skipped. InvalidChecksumLine is raised (with the
        1-based line number) for a line that cannot be parsed, or for a path
        that is listed more than once.
        """
        checksums = {}
        for idx, line in enumerate(checksum_file_contents.splitlines()):
            if not line.strip():
                continue
            # Only GNU style lines are understood.
            parsed = self._parse_gnu_style(line)
            if parsed is False:
                raise InvalidChecksumLine(
                    f"Unparsable checksum, line {idx + 1}: {line}"
                )
            path, shasum = parsed
            if path in checksums:
                raise InvalidChecksumLine(
                    f"Duplicate path in checksum, line {idx + 1}: {line}"
                )
            checksums[path] = shasum
        return checksums

    def diff(self, paths):
        """
        Given a collection of paths, use the differ to figure out which files
        (in reality) have been added/removed from the project root (or latest
        SCM tree).

        The paths are de-duplicated into a set and the differ's
        compare_filelist() result is returned as-is.
        """

        paths = set(paths)
        return self.differ.compare_filelist(paths)

    def generate_gnu_style(self):
        """
        Using the root directory and 'differ' class given to the constructor,
        generate a GNU-style checksum manifest file. This is always generated
        from scratch by finding the list of relevant files in the root directory
        (by asking the differ), and calculating the checksum for each of them.

        The resulting list is always sorted by filename, and the returned
        string always ends with a newline.
        """
        calculated = self.calculate_checksums_from_root(verifying=False)
        # *two* spaces here - it's important for compat with coreutils.
        lines = [f"{checksum}  {path}" for path, checksum in sorted(calculated.items())]
        return "\n".join(lines) + "\n"

    def calculate_checksum(self, path):
        """
        Return the sha256 hex digest of the file at path. The file is read in
        64 KiB chunks so that it is never loaded into memory in one piece.
        """
        shasum = hashlib.sha256()
        with open(path, "rb") as f:
            # read() returns b"" at end of file, which ends the iteration.
            for chunk in iter(lambda: f.read(65536), b""):
                shasum.update(chunk)
        return shasum.hexdigest()

    def calculate_checksums_from_root(self, verifying):
        """
        Using the root of the project and the differ class passed to the
        constructor, iterate over all files in the project and calculate their
        checksums. Return a dictionary of the result, keyed on the filename.

        verifying is handed through to the differ's list_files() untouched.

        Just calling this is not enough in many cases- you want to ensure that
        the files in the checksum list are the same ones present in reality.
        diff() above does just that. Use that in combination with this method,
        or use verify() which does it for you.
        """
        out = {}
        for path in self.differ.list_files(verifying=verifying):
            out[path] = self.calculate_checksum(os.path.join(self.root, path))
        return out

    def verify(self, parsed_manifest_dct, diff=True):
        """
        Takes a parsed manifest file (e.g. using parse(), with paths as keys and
        checksums as values).

        Then, unless diff is False, calculates the current list of files in the
        project root. If paths have been added or removed, ChecksumMismatch is
        raised (carrying the differ's result).

        Otherwise, the checksum of each file in the project root (and subdirs)
        is calculated and that result is checked to be equal to the parsed
        checksums passed in. A path that is in the manifest but was not found
        under the root counts as a mismatch too. If anything mismatches,
        ChecksumMismatch is raised, listing the offending paths in sorted
        order.

        Returns True when everything checks out.
        """

        if diff:
            # If there are any differences in existing paths, fail the check...
            differences = self.diff(parsed_manifest_dct.keys())
            if differences["added"] or differences["removed"]:
                raise ChecksumMismatch(differences)

        recalculated = self.calculate_checksums_from_root(verifying=True)
        # .get() rather than indexing: a manifest entry with no file behind it
        # must surface as ChecksumMismatch, not as a stray KeyError. Sorting
        # keeps the error message stable from run to run.
        mismatches = sorted(
            parsed_path
            for parsed_path, parsed_checksum in parsed_manifest_dct.items()
            if recalculated.get(parsed_path) != parsed_checksum
        )
        if mismatches:
            raise ChecksumMismatch(f"Checksum mismatch: {', '.join(mismatches)}")

        return True