1
|
|
# Python
|
2
|
6
|
import base64
|
3
|
6
|
import re
|
4
|
|
|
5
|
|
# Django
|
6
|
6
|
from django.utils.translation import ugettext_lazy as _
|
7
|
6
|
from django.core.exceptions import ValidationError
|
8
|
|
|
9
|
|
# REST framework
|
10
|
6
|
from rest_framework.serializers import ValidationError as RestValidationError
|
11
|
6
|
from rest_framework.exceptions import ParseError
|
12
|
|
|
13
|
|
# CyBorgBackup
|
14
|
6
|
from cyborgbackup.main.utils.common import parse_yaml_or_json
|
15
|
|
|
16
|
|
|
17
|
6
|
def validate_pem(data, min_keys=0, max_keys=None, min_certs=0, max_certs=None):
    """
    Validate the given PEM data is valid and contains the required numbers of
    keys and certificates.

    Parameters:
      - data: Text containing zero or more concatenated PEM objects.
      - min_keys / max_keys: Inclusive bounds on the number of private keys;
        max_keys=None means no upper bound.
      - min_certs / max_certs: Inclusive bounds on the number of certificates;
        max_certs=None means no upper bound.

    Return a list of PEM objects, where each object is a dict with the following
    keys:
      - 'all': The matched text for the PEM object, starting at its BEGIN line
        (see the note where it is assigned).
      - 'type': The type of PEM object ('PRIVATE KEY' or 'CERTIFICATE').
      - 'data': The string inside the BEGIN/END lines.
      - 'b64': Key/certificate as a base64-encoded string.
      - 'bin': Key/certificate as bytes.
      - 'key_type': Only when type == 'PRIVATE KEY', one of 'rsa', 'dsa',
        'ecdsa', 'ed25519' or 'rsa1'.
      - 'key_enc': Only when type == 'PRIVATE KEY', boolean indicating if key is
        encrypted.

    Raises ValidationError when the data cannot be parsed as PEM, holds an
    unsupported object or private key type, is not valid non-empty base64, or
    when the number of keys/certificates is outside the requested bounds.
    """

    # Map the X in BEGIN X PRIVATE KEY to the key type (ssh-keygen -t).
    # Tower jobs using OPENSSH format private keys may still fail if the
    # system SSH implementation lacks support for this format.
    private_key_types = {
        'RSA': 'rsa',
        'DSA': 'dsa',
        'EC': 'ecdsa',
        'OPENSSH': 'ed25519',
        '': 'rsa1',
    }

    # Build regular expressions for matching each object in the PEM file.
    pem_obj_re = re.compile(
        r'^(?P<dashes>-{4,}) *BEGIN (?P<type>[A-Z ]+?) *(?P=dashes)' +
        r'\s*(?P<data>.+?)\s*' +
        r'(?P=dashes) *END (?P=type) *(?P=dashes)' +
        r'(?P<next>.*?)$', re.DOTALL
    )
    # Matches "Name: value" header lines (e.g. Proc-Type / DEK-Info) that may
    # precede the base64 payload inside a PEM object.
    pem_obj_header_re = re.compile(r'^(.+?):\s*?(.+?)(\\??)$')

    pem_objects = []
    key_count, cert_count = 0, 0

    # Strip leading whitespaces at the start of the PEM data
    data = data.lstrip()

    while data:
        match = pem_obj_re.match(data)
        if not match:
            raise ValidationError(_('Invalid certificate or key: %s...') % data[:100])

        # The rest of the PEM data to process
        data = match.group('next').lstrip()

        # Check PEM object type, check key type if private key.
        pem_obj_info = {}
        # NOTE(review): group(0) is the whole match, and the pattern ends with
        # the 'next' group, so 'all' also carries any text that follows this
        # object's END line. Kept as-is because callers may rely on it.
        pem_obj_info['all'] = match.group(0)
        pem_obj_info['type'] = pem_obj_type = match.group('type')
        if pem_obj_type.endswith('PRIVATE KEY'):
            key_count += 1
            pem_obj_info['type'] = 'PRIVATE KEY'
            key_type = pem_obj_type.replace('PRIVATE KEY', '').strip()
            try:
                pem_obj_info['key_type'] = private_key_types[key_type]
            except KeyError:
                raise ValidationError(_('Invalid private key: unsupported type "%s"') % key_type)
        elif pem_obj_type == 'CERTIFICATE':
            cert_count += 1
        else:
            raise ValidationError(_('Unsupported PEM object type: "%s"') % pem_obj_type)

        # Ensure that this PEM object is valid base64 data.
        pem_obj_info['data'] = match.group('data')
        base64_lines = []
        line_continues = False
        for line in pem_obj_info['data'].splitlines():
            line = line.strip()
            if not line:
                continue
            if line_continues:
                # Still inside a header value continued with a trailing
                # backslash; skip it and check whether it continues further.
                line_continues = line.endswith('\\')
                continue
            if pem_obj_header_re.match(line):
                # Header lines are not part of the base64 payload.
                line_continues = line.endswith('\\')
                continue
            base64_lines.append(line)
        base64_data = ''.join(base64_lines)

        # On Python 3 malformed base64 raises binascii.Error, a ValueError
        # subclass; TypeError is kept for interpreters that raise it instead.
        try:
            decoded_data = base64.b64decode(base64_data)
        except (TypeError, ValueError):
            raise ValidationError(_('Invalid base64-encoded data'))
        if not decoded_data:
            # An object with no payload at all is rejected the same way.
            raise ValidationError(_('Invalid base64-encoded data'))
        pem_obj_info['b64'] = base64_data
        pem_obj_info['bin'] = decoded_data

        # If private key, check whether it is encrypted.
        if pem_obj_info.get('key_type', '') == 'ed25519':
            # See https://github.com/openssh/openssh-portable/blob/master/sshkey.c#L3218
            # Decoded key data starts with magic string (null-terminated), four byte
            # length field, followed by the ciphername -- if ciphername is anything
            # other than 'none' the key is encrypted.
            # The prefix must be a bytes literal: the decoded payload is bytes,
            # and bytes.startswith() rejects a str argument on Python 3.
            magic = b'openssh-key-v1\x00\x00\x00\x00\x04none'
            pem_obj_info['key_enc'] = not pem_obj_info['bin'].startswith(magic)
        elif pem_obj_info.get('key_type', ''):
            # Other key formats flag encryption in their headers, which are
            # part of the text between the BEGIN/END lines.
            pem_obj_info['key_enc'] = 'ENCRYPTED' in pem_obj_info['data']

        pem_objects.append(pem_obj_info)

    # Validate that the number of keys and certs provided are within the limits.
    kdict = dict(min=min_keys, max=max_keys, cnt=key_count)
    if key_count < min_keys:
        if min_keys == 1:
            if max_keys == min_keys:
                raise ValidationError(_('Exactly one private key is required.'))
            else:
                raise ValidationError(_('At least one private key is required.'))
        else:
            raise ValidationError(_('At least %(min)d private keys are required, only %(cnt)d provided.') % kdict)
    elif max_keys is not None and key_count > max_keys:
        if max_keys == 1:
            raise ValidationError(_('Only one private key is allowed, %(cnt)d provided.') % kdict)
        else:
            raise ValidationError(_('No more than %(max)d private keys are allowed, %(cnt)d provided.') % kdict)
    cdict = dict(min=min_certs, max=max_certs, cnt=cert_count)
    if cert_count < min_certs:
        if min_certs == 1:
            if max_certs == min_certs:
                raise ValidationError(_('Exactly one certificate is required.'))
            else:
                raise ValidationError(_('At least one certificate is required.'))
        else:
            raise ValidationError(_('At least %(min)d certificates are required, only %(cnt)d provided.') % cdict)
    elif max_certs is not None and cert_count > max_certs:
        if max_certs == 1:
            raise ValidationError(_('Only one certificate is allowed, %(cnt)d provided.') % cdict)
        else:
            raise ValidationError(_('No more than %(max)d certificates are allowed, %(cnt)d provided.') % cdict)

    return pem_objects
|
155
|
|
|
156
|
|
|
157
|
6
|
def validate_private_key(data):
    """
    Check that data holds exactly one private key and no certificates, and
    return the parsed PEM objects produced by validate_pem.
    """
    # Exactly one key (min == max == 1) and certificates capped at zero.
    limits = dict(min_keys=1, max_keys=1, max_certs=0)
    return validate_pem(data, **limits)
|
162
|
|
|
163
|
|
|
164
|
6
|
def validate_certificate(data):
    """
    Check that data holds at least one certificate and no private keys, and
    return the parsed PEM objects produced by validate_pem.

    If the text 'BEGIN' (respectively 'END') does not appear anywhere in data,
    a BEGIN (respectively END) CERTIFICATE line is added before validating.
    """
    begin_template = "-----BEGIN CERTIFICATE-----\n{}"
    end_template = "{}\n-----END CERTIFICATE-----\n"

    wrapped = data
    # Plain substring checks: any occurrence of the word suppresses the
    # corresponding wrapper line.
    if 'BEGIN' not in wrapped:
        wrapped = begin_template.format(wrapped)
    if 'END' not in wrapped:
        wrapped = end_template.format(wrapped)

    return validate_pem(wrapped, max_keys=0, min_certs=1)
|
174
|
|
|
175
|
|
|
176
|
6
|
def validate_ssh_private_key(data):
    """
    Check that data holds one or more private keys, with certificates allowed
    but not required; intended to accept any valid ssh_private_key value on a
    credential. Returns the parsed PEM objects produced by validate_pem.
    """
    # Only a lower bound on keys is enforced; every other limit is left at
    # validate_pem's defaults.
    pem_objects = validate_pem(data, min_keys=1)
    return pem_objects
|
183
|
|
|
184
|
|
|
185
|
6
|
def vars_validate_or_raise(vars_str):
    """
    Check that a variables field (extra_vars, or variables on resources such
    as job templates, inventories or hosts) is acceptable: either a blank
    string or a valid JSON or YAML dict.

    Returns vars_str unchanged when parsing succeeds; a ParseError from the
    parser is re-raised as a REST framework ValidationError carrying the same
    message.
    """
    try:
        # Only the success/failure of parsing matters; the parsed value is
        # discarded.
        parse_yaml_or_json(vars_str, silent_failure=False)
    except ParseError as parse_error:
        raise RestValidationError(str(parse_error))
    else:
        return vars_str
|