#16 Add keepassxc-cli flag

Open Sean Breckenridge seanbreckenridge

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -38,8 +38,10 @@
Loading
38 38
                if key in self.__class__.parsed_attrs:
39 39
                    setattr(self, key, value)
40 40
41 -
        if not hasattr(self, "password") or (hasattr(self, "password") and getattr(self, "password") is None):
42 -
            raise ValueError("Ignoring entry with no password: {}".format(self))
41 +
        if not hasattr(self, "password") or (hasattr(
42 +
                self, "password") and getattr(self, "password") is None):
43 +
            raise ValueError(
44 +
                "Ignoring entry with no password: {}".format(self))
43 45
44 46
        self._sha1: Optional[str] = None
45 47
@@ -52,19 +54,15 @@
Loading
52 54
        if self._sha1 is None:
53 55
            if self.password is not None:  # type: ignore
54 56
                self._sha1 = (
55 -
                    hashlib.sha1(self.password.encode("utf-8")).hexdigest().upper()  # type: ignore
56 -
                )
57 +
                    hashlib.sha1(self.password.encode(  # type: ignore
58 +
                        "utf-8")).hexdigest().upper())
57 59
        return self._sha1
58 60
59 61
    def __eq__(self, other):
60 62
        if not isinstance(other, self.__class__):
61 63
            return False
62 64
        try:
63 -
            return (
64 -
                self.title == other.title
65 -
                and self.username == other.username
66 -
                and self.password == other.password
67 -
            )
65 +
            return self.title == other.title and self.username == other.username and self.password == other.password
68 66
        except AttributeError:
69 67
            return self.password == other.password
70 68
@@ -85,7 +83,10 @@
Loading
85 83
    attrs = ["database_file", "key_file"]
86 84
87 85
    def __init__(
88 -
        self, database_file: pathlib.Path, key_file: Optional[pathlib.Path] = None,
86 +
        self,
87 +
        database_file: pathlib.Path,
88 +
        key_file: Optional[pathlib.Path] = None,
89 +
        keepassxc_cli_location: Optional[pathlib.Path] = None,
89 90
    ):
90 91
        self.database_file = database_file
91 92
        self.key_file = key_file
@@ -94,6 +95,8 @@
Loading
94 95
        self._password: Optional[str] = None
95 96
        self._credentials: Optional[List[Credential]] = None
96 97
98 +
        self.keepass_wrapper = KeepassWrapper(keepassxc_cli_location)
99 +
97 100
    @property
98 101
    def password(self) -> str:
99 102
        """
@@ -110,9 +113,8 @@
Loading
110 113
            self._password = os.environ["KEEPASSXC_PWNED_PASSWD"]
111 114
            return self._password
112 115
        else:
113 -
            self._password = getpass(
114 -
                "Insert password for {}: ".format(self.database_file)
115 -
            )
116 +
            self._password = getpass("Insert password for {}: ".format(
117 +
                self.database_file))
116 118
            return self._password
117 119
118 120
    @property
@@ -131,7 +133,7 @@
Loading
131 133
        """
132 134
        if self._xml_tree is not None:
133 135
            return  # already called, use cached value
134 -
        keepass_export_process_output: str = KeepassWrapper.export_database(
136 +
        keepass_export_process_output: str = self.keepass_wrapper.export_database(
135 137
            database_file=self.database_file,
136 138
            database_password=self.password,  # calls getpass if not set
137 139
            database_keyfile=self.key_file,
@@ -157,5 +159,6 @@
Loading
157 159
                    self._credentials.append(cred)
158 160
                except ValueError as no_pw:
159 161
                    logger.debug(str(no_pw))
160 -
        logger.debug("KeepassXC parsed entry count: {}".format(len(self._credentials)))
162 +
        logger.debug("KeepassXC parsed entry count: {}".format(
163 +
            len(self._credentials)))
161 164
        return self._credentials

@@ -11,77 +11,95 @@
Loading
11 11
12 12
from .log import logger
13 13
14 +
# redirect STDIN/STDERR from subprocess
15 +
subprocess_piped = partial(subprocess.run,
16 +
                           encoding="utf-8",
17 +
                           stdout=subprocess.PIPE,
18 +
                           stderr=subprocess.PIPE)
19 +
14 20
15 21
class KeepassWrapper:
16 22
    """
17 23
    Functions that create a subprocess and call the keepassxc-cli shell command
18 24
    """
19 25
20 -
    subprocess_piped = partial(
21 -
        subprocess.run, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.PIPE
22 -
    )
26 +
    KEEPASSXC_CLI_NAME = "keepassxc-cli"
23 27
24 -
    @staticmethod
25 -
    def verify_binary_exists():
26 -
        """Make sure the keepassxc-cli binary exists"""
27 -
        if shutil.which("keepassxc-cli") is None:
28 -
            logger.critical(
29 -
                "Could not find the keepassxc-cli binary. Verify its installed and on your $PATH."
30 -
            )
31 -
            sys.exit(1)
28 +
    def __init__(self, keepassxc_cli_location: Optional[pathlib.Path] = None):
29 +
        """
30 +
        Create an instance of KeepassWrapper, with an optional
31 +
        path to the keepassxc-cli binary.
32 +
33 +
34 +
        Allow the path of the keepassxc-cli to be changed because
35 +
        some package managers (e.g. mint with snap) call
36 +
        this the binary keepassxc.cli
37 +
        """
38 +
39 +
        if keepassxc_cli_location is None:
40 +
            # search for keepassxc-cli which default name on $PATH
41 +
            keepassxc_location_output = shutil.which(
42 +
                self.__class__.KEEPASSXC_CLI_NAME)
43 +
            if keepassxc_location_output is None:
44 +
                logger.critical(
45 +
                    "Could not find a binary called {} on your $PATH.".format(
46 +
                        self.__class__.KEEPASSXC_CLI_NAME))
47 +
                sys.exit(1)
48 +
            self.keepassxc_cli_location: pathlib.Path = pathlib.Path(
49 +
                keepassxc_location_output)
50 +
        else:
51 +
            self.keepassxc_cli_location = keepassxc_cli_location
52 +
        logger.debug("keepassxc-cli location: {}".format(
53 +
            self.keepassxc_cli_location))
32 54
33 -
    @staticmethod
34 -
    def version() -> StrictVersion:
55 +
    def version(self) -> StrictVersion:
35 56
        """Returns the KeepassXC Version"""
36 -
        version_proc: subprocess.CompletedProcess = KeepassWrapper.subprocess_piped(
37 -
            shlex.split("keepassxc-cli --version")
38 -
        )
57 +
        version_proc: subprocess.CompletedProcess = subprocess_piped(
58 +
            [self.keepassxc_cli_location, "--version"])
39 59
        version_str: str = version_proc.stdout.strip()
40 -
        logger.debug("keepassxc-cli version: {}".format(version_str))
60 +
        logger.debug("keepassxc cli version: {}".format(version_str))
41 61
        return StrictVersion(version_str)
42 62
43 -
    @classmethod
44 -
    def backwards_compatible_export(cls) -> str:
63 +
    def backwards_compatible_export(self) -> str:
45 64
        """
46 65
        In KeepassXC version 2.5.0, the extract command was re-named to export
47 66
        Attempt to parse the version number and generate the correct subcommand
48 67
        """
49 68
        try:
50 -
            version: StrictVersion = cls.version()
69 +
            version: StrictVersion = self.version()
51 70
            if version < StrictVersion("2.5.0"):
52 71
                return "extract"
53 72
            else:
54 73
                return "export"
55 74
        except ValueError:
56 75
            return "export"
57 76
58 -
    @classmethod
59 77
    def export_database(
60 -
        cls,
78 +
        self,
61 79
        database_file: pathlib.Path,
62 80
        database_password: str,
63 81
        database_keyfile: Optional[pathlib.Path] = None,
82 +
        keepassxc_cli_location: Optional[pathlib.Path] = None,
64 83
    ) -> str:
65 84
        """Calls the keepassxc-cli export command, returns the output from the command"""
66 85
67 -
        command_parts: List[str] = ["keepassxc-cli", cls.backwards_compatible_export()]
86 +
        command_parts: List[str] = [
87 +
            str(self.keepassxc_cli_location),
88 +
            self.backwards_compatible_export()
89 +
        ]
68 90
        if database_keyfile is not None:
69 91
            command_parts.extend(["-k", str(database_keyfile)])
70 92
        command_parts.append(str(database_file))
71 93
        command_str: str = " ".join(command_parts)
72 94
        logger.debug("Export database command: {}".format(command_str))
73 -
        keepassxc_output: subprocess.CompletedProcess = KeepassWrapper.subprocess_piped(
74 -
            shlex.split(command_str), input=database_password,
95 +
        keepassxc_output: subprocess.CompletedProcess = subprocess_piped(
96 +
            shlex.split(command_str),
97 +
            input=database_password,
75 98
        )
76 99
        if keepassxc_output.returncode != 0:
100 +
            logger.critical("Error communicating with {}".format(
101 +
                self.keepassxc_cli_location))
77 102
            logger.critical(keepassxc_output.stderr)
78 103
            sys.exit(1)
79 104
        # python doesn't like the version tag, remove the first line
80 105
        return keepassxc_output.stdout.split(os.linesep, 2)[-1]
81 -
82 -
83 -
# When this is imported, make sure that the keepassxc-cli binary exists
84 -
binary_exists = False
85 -
if not binary_exists:
86 -
    KeepassWrapper.verify_binary_exists()
87 -
    binary_exists = True

@@ -9,12 +9,10 @@
Loading
9 9
10 10
        return "{}({})".format(
11 11
            self.__class__.__name__,
12 -
            ", ".join(
13 -
                [
14 -
                    "{}={}".format(a, repr(getattr(self, a, None)))
15 -
                    for a in self.__class__.attrs  # type: ignore
16 -
                ]
17 -
            ),
12 +
            ", ".join([
13 +
                "{}={}".format(a, repr(getattr(self, a, None)))
14 +
                for a in self.__class__.attrs  # type: ignore
15 +
            ]),
18 16
        )
19 17
20 18
    def __str__(self) -> str:

@@ -15,32 +15,33 @@
Loading
15 15
    "--plaintext",
16 16
    default=False,
17 17
    is_flag=True,
18 -
    help="Print breached passwords in plaintext; defaults to sha1 hashes.",
19 -
)
20 -
@click.option(
21 -
    "-k",
22 -
    "--key-file",
23 -
    required=False,
24 -
    type=click.Path(exists=True),
25 -
    help="Key file for the database",
26 -
)
27 -
@click.option(
28 -
    "-v", "--verbose", default=False, is_flag=True, help="Print debug messages"
29 -
)
30 -
@click.option(
31 -
    "-q",
32 -
    "--quiet",
33 -
    default=False,
34 -
    is_flag=True,
35 -
    help="Don't print status messages, just the summary",
36 -
)
18 +
    help="Print breached passwords in plaintext; defaults to sha1 hashes.")
19 +
@click.option("-k",
20 +
              "--key-file",
21 +
              required=False,
22 +
              type=click.Path(exists=True),
23 +
              help="Key file for the database")
24 +
@click.option("-v",
25 +
              "--verbose",
26 +
              default=False,
27 +
              is_flag=True,
28 +
              help="Print debug messages")
29 +
@click.option("-q",
30 +
              "--quiet",
31 +
              default=False,
32 +
              is_flag=True,
33 +
              help="Don't print status messages, just the summary")
34 +
@click.option("--keepassxc-cli",
35 +
              type=click.Path(exists=True),
36 +
              help="Specify a different location for the keepassxc-cli binary")
37 37
@click.argument("database", required=True, type=click.Path(exists=True))
38 -
def main(plaintext, key_file, verbose, quiet, database):
38 +
def main(plaintext, key_file, verbose, quiet, database, keepassxc_cli):
39 39
    """Check a keepassxc database against previously cracked haveibeenpwned passwords"""
40 -
    main_wrapper(plaintext, key_file, verbose, quiet, database)
40 +
    main_wrapper(plaintext, key_file, verbose, quiet, database, keepassxc_cli)
41 41
42 42
43 -
def main_wrapper(plaintext, key_file, verbose, quiet, database):
43 +
def main_wrapper(plaintext, key_file, verbose, quiet, database,
44 +
                 keepassxc_cli_location):
44 45
    """Called from main click command"""
45 46
46 47
    # setup logs before other imports to ensure correct log level
@@ -56,9 +57,12 @@
Loading
56 57
    from .parser import Database, Credential
57 58
    from .cache import PasswordCache
58 59
60 +
    if keepassxc_cli_location is not None:
61 +
        keepassxc_cli_location = pathlib.Path(keepassxc_cli_location)
59 62
    if key_file is not None:
60 63
        key_file: pathlib.Path = pathlib.Path(key_file)
61 -
    db: Database = Database(pathlib.Path(database), key_file)
64 +
    db: Database = Database(pathlib.Path(database), key_file,
65 +
                            keepassxc_cli_location)
62 66
    # maps sha1 hashes to credentials and their occurrence counts
63 67
    breached_passwords: List[Credential] = []
64 68
    pw_cache: Mapping[str, int] = PasswordCache()
@@ -70,26 +74,19 @@
Loading
70 74
            if occurrence_count > 0:
71 75
                logger.info(
72 76
                    "Found password for '{}' {} times in the dataset!".format(
73 -
                        credential.display(), occurrence_count
74 -
                    )
75 -
                )
77 +
                        credential.display(), occurrence_count))
76 78
                breached_passwords.append(credential)
77 79
        except PwnedPasswordException as http_err:
78 80
            logger.critical(str(http_err))
79 81
            logger.critical("Ignoring previous entry due to HTTP error")
80 82
    breached_passwords_count = len(breached_passwords)
81 83
    if breached_passwords_count > 0:
82 -
        print(
83 -
            "Found {} previously breached password{}:".format(
84 -
                breached_passwords_count, "s" if breached_passwords_count > 1 else ""
85 -
            )
86 -
        )
84 +
        print("Found {} previously breached password{}:".format(
85 +
            breached_passwords_count,
86 +
            "s" if breached_passwords_count > 1 else ""))
87 87
        for credential in breached_passwords:
88 88
            display_pw: str = credential.password if plaintext else credential.sha1
89 -
            print(
90 -
                "{}:{}:{}".format(
91 -
                    credential.display(), display_pw, pw_cache[credential.sha1]
92 -
                )
93 -
            )
89 +
            print("{}:{}:{}".format(credential.display(), display_pw,
90 +
                                    pw_cache[credential.sha1]))
94 91
    else:
95 92
        print("None of your passwords have been found breached.")

@@ -8,7 +8,6 @@
Loading
8 8
from .log import logger
9 9
from .exceptions import PwnedPasswordException
10 10
11 -
12 11
default_headers = {"User-Agent": "keepassxc-pwned"}
13 12
14 13
@@ -22,10 +21,14 @@
Loading
22 21
    res = requests.get(url, headers=default_headers)
23 22
    if res.status_code >= 400:
24 23
        # on occasion I've had random 409 cloudflare errors
25 -
        if res.status_code in [400, 403, 404]: # https://haveibeenpwned.com/API/v2#ResponseCodes
26 -
            raise PwnedPasswordException(
27 -
                {"url": url, "status_code": res.status_code, "http_text": res.text}
28 -
            )
24 +
        if res.status_code in [
25 +
                400, 403, 404
26 +
        ]:  # https://haveibeenpwned.com/API/v2#ResponseCodes
27 +
            raise PwnedPasswordException({
28 +
                "url": url,
29 +
                "status_code": res.status_code,
30 +
                "http_text": res.text
31 +
            })
29 32
        else:
30 33
            logger.warning("Request failed, retrying...")
31 34
            time.sleep(10)
@@ -34,7 +37,8 @@
Loading
34 37
35 38
36 39
# Adapted from: https://github.com/mikepound/pwned-search/blob/8efd8ffedd398756e26d52ef51206ba6d8e28f57/pwned.py#L12
37 -
def lookup_pwned(passwd: Optional[str] = None, pw_hash: Optional[str] = None) -> int:
40 +
def lookup_pwned(passwd: Optional[str] = None,
41 +
                 pw_hash: Optional[str] = None) -> int:
38 42
    """
39 43
    Returns number of times password was seen in pwned database
40 44
    Raises PwnedPasswordException on unrecoverable errors

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Everything is accounted for!

No changes detected that need to be reviewed.
What changes does Codecov check for?
Lines, not adjusted in diff, that have changed coverage data.
Files that introduced coverage data that had none before.
Files that have missing coverage data that once were tracked.
Files Coverage
keepassxc_pwned 0.02% 97.78%
Project Totals (10 files) 97.78%
Loading