-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcertificate.py
More file actions
236 lines (194 loc) · 9.41 KB
/
certificate.py
File metadata and controls
236 lines (194 loc) · 9.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import logging
from OpenSSL import crypto
from pyasn1.codec.der import decoder as der_decoder
from pyasn1.type import univ, constraint
from ndg.httpsclient.ssl_peer_verification import SUBJ_ALT_NAME_SUPPORT
from ndg.httpsclient.subj_alt_name import SubjectAltName as BaseSubjectAltName
from datetime import datetime
from pytz import utc
from django.shortcuts import get_object_or_404
from django.db import models
from django.core.exceptions import SuspiciousOperation
from django.contrib.auth import get_user_model
from rest_framework.exceptions import ValidationError
from api.utils import validate_label
from api.exceptions import AlreadyExists, ServiceUnavailable
from scheduler import KubeException
from .base import AuditedModel
from .domain import Domain
User = get_user_model()
logger = logging.getLogger(__name__)
# Note: This is a slightly bug-fixed version of same from ndg-httpsclient.
class SubjectAltName(BaseSubjectAltName):
'''ASN.1 implementation for subjectAltNames support'''
# There is no limit to how many SAN certificates a certificate may have,
# however this needs to have some limit so we'll set an arbitrarily high
# limit.
sizeSpec = univ.SequenceOf.sizeSpec + \
constraint.ValueSizeConstraint(1, 1024)
# Note: This is a slightly bug-fixed version of same from ndg-httpsclient.
def get_subj_alt_name(peer_cert):
# Search through extensions
dns_name = []
if not SUBJ_ALT_NAME_SUPPORT:
return dns_name
general_names = SubjectAltName()
for i in range(peer_cert.get_extension_count()):
ext = peer_cert.get_extension(i)
ext_name = ext.get_short_name()
if ext_name != b'subjectAltName':
continue
# PyOpenSSL returns extension data in ASN.1 encoded form
ext_dat = ext.get_data()
decoded_dat = der_decoder.decode(ext_dat,
asn1Spec=general_names)
for name in decoded_dat:
if not isinstance(name, SubjectAltName):
continue
for entry in range(len(name)):
component = name.getComponentByPosition(entry)
if component.getName() != 'dNSName':
continue
dns_name.append(str(component.getComponent()))
return dns_name
def validate_certificate(value):
try:
return crypto.load_certificate(crypto.FILETYPE_PEM, value)
except crypto.Error as e:
raise ValidationError('Could not load certificate: {}'.format(e))
def validate_private_key(value):
try:
return crypto.load_privatekey(crypto.FILETYPE_PEM, value)
except crypto.Error as e:
raise ValidationError('Could not load private key: {}'.format(e))
def validate_cert_pair(certificate, private_key):
# Load and validate the certificate and private key
try:
cert = validate_certificate(certificate)
pkey = validate_private_key(private_key)
except ValidationError as e:
# The certificate and key should already have been validated
raise SuspiciousOperation(e)
if pkey.type() == crypto.TYPE_RSA:
# Compare modulus n, to the factors p and q
priv_numbers = pkey.to_cryptography_key().private_numbers()
pub_modulus = cert.get_pubkey().to_cryptography_key().public_numbers().n
if pub_modulus != (priv_numbers.p * priv_numbers.q):
raise ValidationError('Certificate and private key do not match!')
# Return tuple if everything went ok
return (cert, pkey)
class Certificate(AuditedModel):
"""
Public and private key pair used to secure application traffic at the router.
"""
owner = models.ForeignKey(User, on_delete=models.PROTECT)
name = models.CharField(max_length=253, unique=True, validators=[validate_label])
# there is no upper limit on the size of an x.509 certificate
certificate = models.TextField(validators=[validate_certificate])
key = models.TextField(validators=[validate_private_key])
# X.509 certificates allow any string of information as the common name.
common_name = models.TextField(editable=False, unique=False, null=True)
# A list of DNS records if certificate has SubjectAltName
san = models.JSONField(default=None, null=True)
# SHA256 fingerprint
fingerprint = models.CharField(max_length=96, editable=False)
# Expires and Start time of cert
expires = models.DateTimeField(editable=False)
starts = models.DateTimeField(editable=False)
issuer = models.TextField(editable=False)
subject = models.TextField(editable=False)
class Meta:
ordering = ['name', 'common_name', 'expires']
@property
def domains(self):
domains = []
for data in Domain.objects.filter(certificate=self).distinct().order_by('domain'):
domains.append(data.domain)
return domains
def __str__(self):
return self.name
def save(self, *args, **kwargs):
# Validate the provided certificate and key pair and test for a mismatch
certificate, _ = validate_cert_pair(self.certificate, self.key)
if not self.common_name:
self.common_name = certificate.get_subject().CN
# Grab expire date of the certificate
if not self.expires:
# https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509.get_notAfter
# Convert bytes to string
timestamp = certificate.get_notAfter().decode(encoding='UTF-8')
# convert openssl's expiry date format to Django's DateTimeField format
self.expires = datetime.strptime(timestamp, '%Y%m%d%H%M%SZ').replace(tzinfo=utc)
# Grab the start date of the certificate
if not self.starts:
# https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509.get_notBefore
# Convert bytes to string
timestamp = certificate.get_notBefore().decode(encoding='UTF-8')
# convert openssl's starts date format to Django's DateTimeField format
self.starts = datetime.strptime(timestamp, '%Y%m%d%H%M%SZ').replace(tzinfo=utc)
# process issuers - separate each key/value with a slash
issuer = certificate.get_issuer().get_components()
self.issuer = '/' + '/'.join('%s=%s' % (key.decode(encoding='UTF-8'), value.decode(encoding='UTF-8')) for key, value in issuer) # noqa
# process subject - separate each key/value with a slash
subject = certificate.get_subject().get_components()
self.subject = '/' + '/'.join('%s=%s' % (key.decode(encoding='UTF-8'), value.decode(encoding='UTF-8')) for key, value in subject) # noqa
# public fingerprint of certificate
self.fingerprint = certificate.digest('sha256').decode()
# SubjectAltName from the certificate - return a list
self.san = get_subj_alt_name(certificate)
return super(Certificate, self).save(*args, **kwargs)
def delete(self, *args, **kwargs):
# Remove from k8s and domain object if there are any
if self.domains:
for domain in self.domains:
kwargs['domain'] = domain
self.detach(*args, **kwargs)
del kwargs['domain']
# Delete from DB
return super(Certificate, self).delete(*args, **kwargs)
def attach(self, *args, **kwargs):
# add the certificate to the domain
domain = get_object_or_404(Domain, domain=kwargs['domain'])
if domain.certificate is not None:
raise AlreadyExists("Domain already has a certificate attached to it")
# create in kubernetes
self.attach_in_kubernetes(domain)
domain.certificate = self
domain.save()
def attach_in_kubernetes(self, domain):
"""Creates the certificate as a kubernetes secret"""
# only create if it exists - We raise an exception when a secret doesn't exist
try:
name = '%s-certificate' % self.name
namespace = domain.app.id
data = {
'tls.crt': self.certificate,
'tls.key': self.key
}
secret = self._scheduler.secret.get(namespace, name).json()['data']
except KubeException:
self._scheduler.secret.create(namespace, name, data)
else:
# update cert secret to the TLS Ingress format if required
if secret != data:
try:
self._scheduler.secret.update(namespace, name, data)
except KubeException as e:
msg = 'There was a problem updating the certificate secret ' \
'{} for {}'.format(name, namespace)
raise ServiceUnavailable(msg) from e
def detach(self, *args, **kwargs):
# remove the certificate from the domain
domain = get_object_or_404(Domain, domain=kwargs['domain'])
domain.certificate = None
domain.save()
name = '%s-certificate' % self.name
namespace = domain.app.id
# only delete if it exists and if no other domains depend on secret
if len(self.domains) == 0:
try:
# We raise an exception when a secret doesn't exist
self._scheduler.secret.get(namespace, name)
self._scheduler.secret.delete(namespace, name)
except KubeException as e:
raise ServiceUnavailable("Could not delete certificate secret {} for application {}".format(name, namespace)) from e # noqa