Coverage for src/ansible_sign/cli.py: 88%

189 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-02 14:12 +0000

1import argparse 

2from distlib.manifest import DistlibException 

3import getpass 

4import logging 

5import os 

6import sys 

7 

8from ansible_sign import __version__ 

9from ansible_sign.checksum import ( 

10 ChecksumFile, 

11 ChecksumMismatch, 

12 InvalidChecksumLine, 

13) 

14from ansible_sign.checksum.differ import DistlibManifestChecksumFileExistenceDiffer 

15from ansible_sign.signing import GPGSigner, GPGVerifier 

16 

17__author__ = "Rick Elrod" 

18__copyright__ = "(c) 2022 Red Hat, Inc." 

19__license__ = "MIT" 

20 

21# This is relative to the project root passed in by the user at runtime. 

22ANSIBLE_SIGN_DIR = ".ansible-sign" 

23 

24 

25class AnsibleSignCLI: 

26 def __init__(self, args): 

27 self.logger = logging.getLogger(__name__) 

28 self.logger.debug("Parsing args: %s", str(args)) 

29 self.args = self.parse_args(args) 

30 logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" 

31 logging.basicConfig( 

32 level=self.args.loglevel, 

33 stream=sys.stdout, 

34 format=logformat, 

35 datefmt="%Y-%m-%d %H:%M:%S", 

36 ) 

37 

38 def run_command(self): 

39 """ 

40 parse_args() will set self.args.func() to the function we wish to 

41 execute, based on the subcommand the user ran. These 'action functions' 

42 will return the integer exit code with which we exit at the very end. 

43 

44 Roughly: 

45 0 = success 

46 1 = error (e.g. file missing, permissions issue, couldn't parse checksum file, etc.) 

47 2 = checksum verification failed 

48 3 = signature verification failed 

49 4 = signing failed 

50 """ 

51 return self.args.func() 

52 

53 def parse_args(self, args): 

54 """ 

55 Parse command line parameters 

56 

57 Args: 

58 args (List[str]): command line parameters as list of strings 

59 (for example ``["--help"]``). 

60 

61 Returns: 

62 :obj:`argparse.Namespace`: command line parameters namespace 

63 """ 

64 

65 parser = argparse.ArgumentParser( 

66 description="Signing and validation for Ansible content" 

67 ) 

68 parser.add_argument( 

69 "--version", 

70 action="version", 

71 version="ansible-sign {ver}".format(ver=__version__), 

72 ) 

73 parser.add_argument( 

74 "--debug", 

75 help="Print a bunch of debug info", 

76 action="store_const", 

77 dest="loglevel", 

78 const=logging.DEBUG, 

79 ) 

80 parser.add_argument( 

81 "--nocolor", 

82 help="Disable color output", 

83 required=False, 

84 dest="nocolor", 

85 default=True if len(os.environ.get("NO_COLOR", "")) else False, 

86 action="store_true", 

87 ) 

88 

89 # Future-proofing for future content types. 

90 content_type_parser = parser.add_subparsers( 

91 required=True, dest="content_type", metavar="CONTENT_TYPE" 

92 ) 

93 

94 project = content_type_parser.add_parser( 

95 "project", 

96 help="Act on an Ansible project directory", 

97 ) 

98 project_commands = project.add_subparsers(required=True, dest="command") 

99 

100 # command: gpg-verify 

101 cmd_gpg_verify = project_commands.add_parser( 

102 "gpg-verify", 

103 help=( 

104 "Perform signature validation AND checksum verification on the checksum manifest" 

105 ), 

106 ) 

107 cmd_gpg_verify.set_defaults(func=self.gpg_verify) 

108 cmd_gpg_verify.add_argument( 

109 "--keyring", 

110 help=( 

111 "The GPG keyring file to use to find the matching public key. (default: the user's default keyring)" 

112 ), 

113 required=False, 

114 metavar="KEYRING", 

115 dest="keyring", 

116 default=None, 

117 ) 

118 cmd_gpg_verify.add_argument( 

119 "--gnupg-home", 

120 help=( 

121 "A valid GnuPG home directory. (default: the GnuPG default, usually ~/.gnupg)" 

122 ), 

123 required=False, 

124 metavar="GNUPG_HOME", 

125 dest="gnupg_home", 

126 default=None, 

127 ) 

128 cmd_gpg_verify.add_argument( 

129 "project_root", 

130 help="The directory containing the files being validated and verified", 

131 metavar="PROJECT_ROOT", 

132 ) 

133 

134 # command: gpg-sign 

135 cmd_gpg_sign = project_commands.add_parser( 

136 "gpg-sign", 

137 help="Generate a checksum manifest and GPG sign it", 

138 ) 

139 cmd_gpg_sign.set_defaults(func=self.gpg_sign) 

140 cmd_gpg_sign.add_argument( 

141 "--fingerprint", 

142 help=( 

143 "The GPG private key fingerprint to sign with. (default: First usable key in the user's keyring)" 

144 ), 

145 required=False, 

146 metavar="PRIVATE_KEY", 

147 dest="fingerprint", 

148 default=None, 

149 ) 

150 cmd_gpg_sign.add_argument( 

151 "-p", 

152 "--prompt-passphrase", 

153 help="Prompt for a GPG key passphrase", 

154 required=False, 

155 dest="prompt_passphrase", 

156 default=False, 

157 action="store_true", 

158 ) 

159 cmd_gpg_sign.add_argument( 

160 "--gnupg-home", 

161 help=( 

162 "A valid GnuPG home directory. (default: the GnuPG default, usually ~/.gnupg)" 

163 ), 

164 required=False, 

165 metavar="GNUPG_HOME", 

166 dest="gnupg_home", 

167 default=None, 

168 ) 

169 cmd_gpg_sign.add_argument( 

170 "project_root", 

171 help="The directory containing the files being validated and verified", 

172 metavar="PROJECT_ROOT", 

173 ) 

174 return parser.parse_args(args) 

175 

176 def _generate_checksum_manifest(self): 

177 differ = DistlibManifestChecksumFileExistenceDiffer 

178 checksum = ChecksumFile(self.args.project_root, differ=differ) 

179 try: 

180 manifest = checksum.generate_gnu_style() 

181 except FileNotFoundError as e: 

182 if os.path.islink(e.filename): 

183 self._error( 

184 f"Broken symlink found at {e.filename} -- this is not supported. Aborting." 

185 ) 

186 return False 

187 

188 if e.filename.endswith("/MANIFEST.in"): 188 ↛ 197line 188 didn't jump to line 197 because the condition on line 188 was always true

189 self._error( 

190 "Could not find a MANIFEST.in file in the specified project." 

191 ) 

192 self._note( 

193 "If you are attempting to sign a project, please create this file." 

194 ) 

195 self._note("See the ansible-sign documentation for more information.") 

196 return False 

197 raise e 

198 except DistlibException as e: 

199 self._error(f"An error was encountered while parsing MANIFEST.in: {e}") 

200 if self.args.loglevel != logging.DEBUG: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 self._note( 

202 "You can use the --debug global flag to view the full traceback." 

203 ) 

204 self.logger.debug(e, exc_info=e) 

205 return False 

206 for warning in checksum.warnings: 206 ↛ 207line 206 didn't jump to line 207 because the loop on line 206 never started

207 self._warn(warning) 

208 self.logger.debug( 

209 "Full calculated checksum manifest (%s):\n%s", 

210 self.args.project_root, 

211 manifest, 

212 ) 

213 return manifest 

214 

215 def _error(self, msg): 

216 if self.args.nocolor: 

217 print(f"[ERROR] {msg}") 

218 else: 

219 print(f"[\033[91mERROR\033[0m] {msg}") 

220 

221 def _ok(self, msg): 

222 if self.args.nocolor: 

223 print(f"[OK ] {msg}") 

224 else: 

225 print(f"[\033[92mOK \033[0m] {msg}") 

226 

227 def _note(self, msg): 

228 if self.args.nocolor: 

229 print(f"[NOTE ] {msg}") 

230 else: 

231 print(f"[\033[94mNOTE \033[0m] {msg}") 

232 

233 def _warn(self, msg): 

234 if self.args.nocolor: 

235 print(f"[WARN ] {msg}") 

236 else: 

237 print(f"[\033[93mWARN \033[0m] {msg}") 

238 

239 def validate_checksum(self): 

240 """ 

241 Validate a checksum manifest file. Print a pretty message and return an 

242 appropriate exit code. 

243 

244 NOTE that this function does not actually check the path for existence, it 

245 leaves that to the caller (which in nearly all cases would need to do so 

246 anyway). This function will throw FileNotFoundError if the manifest does not 

247 exist. 

248 """ 

249 differ = DistlibManifestChecksumFileExistenceDiffer 

250 checksum = ChecksumFile(self.args.project_root, differ=differ) 

251 checksum_path = os.path.join( 

252 self.args.project_root, ".ansible-sign", "sha256sum.txt" 

253 ) 

254 checksum_file_contents = open(checksum_path, "r").read() 

255 

256 try: 

257 manifest = checksum.parse(checksum_file_contents) 

258 except InvalidChecksumLine as e: 

259 self._error(f"Invalid line encountered in checksum manifest: {e}") 

260 return 1 

261 

262 try: 

263 checksum.verify(manifest, diff=True) 

264 except ChecksumMismatch as e: 

265 self._error("Checksum validation failed.") 

266 self._error(str(e)) 

267 return 2 

268 except FileNotFoundError as e: 

269 if os.path.islink(e.filename): 269 ↛ 270line 269 didn't jump to line 270 because the condition on line 269 was never true

270 self._error( 

271 f"Broken symlink found at {e.filename} -- this is not supported. Aborting." 

272 ) 

273 return 1 

274 

275 if e.filename.endswith("MANIFEST.in"): 275 ↛ 293line 275 didn't jump to line 293 because the condition on line 275 was always true

276 self._error( 

277 "Could not find a MANIFEST.in file in the specified project." 

278 ) 

279 self._note( 

280 "If you are attempting to verify a signed project, please ensure that the project directory includes this file after signing." 

281 ) 

282 self._note("See the ansible-sign documentation for more information.") 

283 return 1 

284 except DistlibException as e: 

285 self._error(f"An error was encountered while parsing MANIFEST.in: {e}") 

286 if self.args.loglevel != logging.DEBUG: 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true

287 self._note( 

288 "You can use the --debug global flag to view the full traceback." 

289 ) 

290 self.logger.debug(e, exc_info=e) 

291 return 1 

292 

293 for warning in checksum.warnings: 293 ↛ 294line 293 didn't jump to line 294 because the loop on line 293 never started

294 self._warn(warning) 

295 

296 self._ok("Checksum validation succeeded.") 

297 return 0 

298 

299 def gpg_verify(self): 

300 signature_file = os.path.join( 

301 self.args.project_root, ".ansible-sign", "sha256sum.txt.sig" 

302 ) 

303 manifest_file = os.path.join( 

304 self.args.project_root, ".ansible-sign", "sha256sum.txt" 

305 ) 

306 

307 if not os.path.exists(signature_file): 

308 self._error(f"Signature file does not exist: {signature_file}") 

309 return 1 

310 

311 if not os.path.exists(manifest_file): 

312 self._error(f"Checksum manifest file does not exist: {manifest_file}") 

313 return 1 

314 

315 if self.args.keyring is not None and not os.path.exists(self.args.keyring): 

316 self._error(f"Specified keyring file not found: {self.args.keyring}") 

317 return 1 

318 

319 if self.args.gnupg_home is not None and not os.path.isdir(self.args.gnupg_home): 

320 self._error( 

321 f"Specified GnuPG home is not a directory: {self.args.gnupg_home}" 

322 ) 

323 return 1 

324 

325 verifier = GPGVerifier( 

326 manifest_path=manifest_file, 

327 detached_signature_path=signature_file, 

328 gpg_home=self.args.gnupg_home, 

329 keyring=self.args.keyring, 

330 ) 

331 

332 result = verifier.verify() 

333 

334 if result.success is not True: 

335 self._error(result.summary) 

336 self._note("Re-run with the global --debug flag for more information.") 

337 self.logger.debug(result.extra_information) 

338 return 3 

339 

340 self._ok(result.summary) 

341 

342 # GPG verification is done and we are still here, so return based on 

343 # checksum validation now. 

344 return self.validate_checksum() 

345 

346 def _write_file_or_print(self, dest, contents): 

347 if dest == "-": 347 ↛ 348line 347 didn't jump to line 348 because the condition on line 347 was never true

348 print(contents, end="") 

349 return 

350 

351 outdir = os.path.dirname(dest) 

352 

353 if len(outdir) > 0 and not os.path.isdir(outdir): 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true

354 self.logger.info("Creating output directory: %s", outdir) 

355 os.makedirs(outdir) 

356 

357 with open(dest, "w") as f: 

358 f.write(contents) 

359 self.logger.info("Wrote to file: %s", dest) 

360 

361 def gpg_sign(self): 

362 # Step 1: Manifest 

363 manifest_path = os.path.join( 

364 self.args.project_root, ".ansible-sign", "sha256sum.txt" 

365 ) 

366 checksum_file_contents = self._generate_checksum_manifest() 

367 if checksum_file_contents is False: 

368 return 1 

369 self._write_file_or_print(manifest_path, checksum_file_contents) 

370 

371 # Step 2: Signing 

372 # Do they need a passphrase? 

373 passphrase = None 

374 if self.args.prompt_passphrase: 

375 self.logger.debug("Prompting for GPG key passphrase") 

376 passphrase = getpass.getpass("GPG Key Passphrase: ") 

377 elif "ANSIBLE_SIGN_GPG_PASSPHRASE" in os.environ: 

378 self.logger.debug( 

379 "Taking GPG key passphrase from ANSIBLE_SIGN_GPG_PASSPHRASE env var" 

380 ) 

381 passphrase = os.environ["ANSIBLE_SIGN_GPG_PASSPHRASE"] 

382 else: 

383 os.environ["GPG_TTY"] = os.ttyname(sys.stdin.fileno()) 

384 

385 signature_path = os.path.join( 

386 self.args.project_root, ".ansible-sign", "sha256sum.txt.sig" 

387 ) 

388 signer = GPGSigner( 

389 manifest_path=manifest_path, 

390 output_path=signature_path, 

391 privkey=self.args.fingerprint, 

392 passphrase=passphrase, 

393 gpg_home=self.args.gnupg_home, 

394 ) 

395 result = signer.sign() 

396 if result.success: 396 ↛ 400line 396 didn't jump to line 400 because the condition on line 396 was always true

397 self._ok("GPG signing successful!") 

398 retcode = 0 

399 else: 

400 self._error("GPG signing FAILED!") 

401 self._note("Re-run with the global --debug flag for more information.") 

402 retcode = 4 

403 

404 self._note(f"Checksum manifest: {manifest_path}") 

405 self._note(f"GPG summary: {result.summary}") 

406 self.logger.debug(f"GPG Details: {result.extra_information}") 

407 return retcode 

408 

409 

410def main(args): 

411 cli = AnsibleSignCLI(args) 

412 cli.logger.debug("Running requested command/passing to function") 

413 exitcode = cli.run_command() 

414 cli.logger.info("Script ends here, rc=%d", exitcode) 

415 return exitcode 

416 

417 

418def run(): 

419 """Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv` 

420 

421 This function can be used as entry point to create console scripts with setuptools. 

422 """ 

423 return main(sys.argv[1:]) 

424 

425 

426if __name__ == "__main__": 

427 run()