-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathcertificate.py
More file actions
227 lines (182 loc) · 8.57 KB
/
certificate.py
File metadata and controls
227 lines (182 loc) · 8.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from OpenSSL import crypto
from pyasn1.codec.der import decoder as der_decoder
from pyasn1.type import univ, constraint
from ndg.httpsclient.ssl_peer_verification import SUBJ_ALT_NAME_SUPPORT
from ndg.httpsclient.subj_alt_name import SubjectAltName as BaseSubjectAltName
from datetime import datetime
from django.shortcuts import get_object_or_404
from django.db import models
from django.conf import settings
from django.core.exceptions import ValidationError, SuspiciousOperation
from django.contrib.postgres.fields import ArrayField
from api.models import AuditedModel, validate_label, AlreadyExists
from api.models.domain import Domain
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Note: This is a slightly bug-fixed version of the equivalent class from ndg-httpsclient.
class SubjectAltName(BaseSubjectAltName):
    """ASN.1 SubjectAltName type with an explicit size constraint of 1 to 1024 entries."""

    # A certificate may carry any number of SubjectAltName entries, but a size
    # constraint still has to be given here, so an arbitrarily high upper
    # bound is used.
    sizeSpec = (
        univ.SequenceOf.sizeSpec
        + constraint.ValueSizeConstraint(1, 1024)
    )
# Note: This is a slightly bug-fixed version of the equivalent code from ndg-httpsclient.
def get_subj_alt_name(peer_cert):
    """
    Collect the dNSName entries of a certificate's subjectAltName extension(s).

    :param peer_cert: certificate object exposing ``get_extension_count()``
        and ``get_extension(i)`` (the caller in this file passes the result
        of ``crypto.load_certificate``)
    :returns: list of DNS names as ``str``; empty when the certificate has no
        matching entries or when ``SUBJ_ALT_NAME_SUPPORT`` is falsy
    """
    san_entries = []
    if not SUBJ_ALT_NAME_SUPPORT:
        return san_entries

    san_spec = SubjectAltName()
    extension_count = peer_cert.get_extension_count()
    for index in range(extension_count):
        extension = peer_cert.get_extension(index)
        if extension.get_short_name() != b'subjectAltName':
            continue

        # PyOpenSSL hands back the extension payload ASN.1 (DER) encoded
        raw_payload = extension.get_data()
        decoded = der_decoder.decode(raw_payload, asn1Spec=san_spec)

        for general_names in decoded:
            if not isinstance(general_names, SubjectAltName):
                continue
            components = (
                general_names.getComponentByPosition(position)
                for position in range(len(general_names))
            )
            # keep only DNS entries; other GeneralName kinds are ignored
            san_entries.extend(
                str(component.getComponent())
                for component in components
                if component.getName() == 'dNSName'
            )

    return san_entries
def validate_certificate(value):
    """
    Field validator: check that ``value`` can be loaded as a PEM certificate.

    :param value: certificate text to check
    :raises ValidationError: if pyOpenSSL cannot load ``value``
    """
    try:
        crypto.load_certificate(crypto.FILETYPE_PEM, value)
    except (crypto.Error, UnicodeEncodeError) as e:
        # UnicodeEncodeError is caught as well so that text containing
        # non-ASCII characters is reported as a validation failure instead of
        # escaping as an unhandled error.
        # NOTE(review): this relies on pyOpenSSL ASCII-encoding text buffers
        # before parsing - confirm against the pinned pyOpenSSL version.
        raise ValidationError('Could not load certificate: {}'.format(e))
class Certificate(AuditedModel):
    """
    Public and private key pair used to secure application traffic at the router.

    On save the PEM certificate is parsed to fill in the non-editable
    metadata fields. ``issuer``, ``subject``, ``fingerprint`` and ``san`` are
    recomputed on every save; ``common_name``, ``expires`` and ``starts`` are
    only filled in when they are still empty.
    """
    owner = models.ForeignKey(settings.AUTH_USER_MODEL)
    name = models.CharField(max_length=253, unique=True, validators=[validate_label])
    # there is no upper limit on the size of an x.509 certificate
    certificate = models.TextField(validators=[validate_certificate])
    key = models.TextField()
    # X.509 certificates allow any string of information as the common name.
    common_name = models.TextField(editable=False, unique=False)
    # A list of DNS records if certificate has SubjectAltName
    san = ArrayField(models.CharField(max_length=253), null=True)
    # SHA256 fingerprint
    fingerprint = models.CharField(max_length=96, editable=False)
    # Expires and Start time of cert
    expires = models.DateTimeField(editable=False)
    starts = models.DateTimeField(editable=False)
    issuer = models.TextField(editable=False)
    subject = models.TextField(editable=False)

    @property
    def domains(self):
        """Return the ``domain`` values of all Domain rows using this certificate, ordered by domain."""
        attached = Domain.objects.filter(certificate=self).distinct().order_by('domain')
        return [entry.domain for entry in attached]

    def __str__(self):
        return self.name

    def _get_certificate(self):
        """
        Load ``self.certificate`` as a pyOpenSSL X509 object.

        :raises SuspiciousOperation: if the stored PEM text cannot be loaded
        """
        try:
            return crypto.load_certificate(crypto.FILETYPE_PEM, self.certificate)
        except (crypto.Error, UnicodeEncodeError) as e:
            # UnicodeEncodeError is handled the same way as crypto.Error so
            # non-ASCII text is rejected rather than crashing the request.
            # NOTE(review): relies on pyOpenSSL ASCII-encoding text buffers -
            # confirm against the pinned pyOpenSSL version.
            raise SuspiciousOperation(e)

    @staticmethod
    def _parse_openssl_time(raw):
        """Convert an OpenSSL ``YYYYMMDDhhmmssZ`` timestamp (bytes) into a naive datetime."""
        # NOTE(review): the trailing 'Z' suggests UTC, but the result carries
        # no tzinfo - confirm how the project's time zone settings treat it.
        return datetime.strptime(raw.decode(encoding='UTF-8'), '%Y%m%d%H%M%SZ')

    @staticmethod
    def _format_x509_name(x509_name):
        """Render an X509Name as ``/key=value/key=value`` (just ``/`` when it has no components)."""
        return '/' + '/'.join(
            '%s=%s' % (key.decode(encoding='UTF-8'), value.decode(encoding='UTF-8'))
            for key, value in x509_name.get_components()
        )

    def save(self, *args, **kwargs):
        """
        Populate the certificate metadata fields, then save the model.

        :raises SuspiciousOperation: if ``self.certificate`` cannot be loaded
        """
        certificate = self._get_certificate()
        if not self.common_name:
            self.common_name = certificate.get_subject().CN

        # Grab expire date of the certificate
        # https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509.get_notAfter
        if not self.expires:
            self.expires = self._parse_openssl_time(certificate.get_notAfter())

        # Grab the start date of the certificate
        # https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509.get_notBefore
        if not self.starts:
            self.starts = self._parse_openssl_time(certificate.get_notBefore())

        # issuer and subject - each key/value pair separated with a slash
        self.issuer = self._format_x509_name(certificate.get_issuer())
        self.subject = self._format_x509_name(certificate.get_subject())

        # public fingerprint of certificate; pyOpenSSL documents digest() as
        # returning bytes, so decode before storing it in a text column
        fingerprint = certificate.digest('sha256')
        if isinstance(fingerprint, bytes):
            fingerprint = fingerprint.decode(encoding='UTF-8')
        self.fingerprint = fingerprint

        # SubjectAltName from the certificate - a list of DNS names
        self.san = get_subj_alt_name(certificate)

        return super(Certificate, self).save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """Detach the certificate from every domain that uses it, then delete the row."""
        # Remove from k8s and domain object if there are any.
        # self.domains runs a query on each access, so read it only once.
        for domain in self.domains:
            kwargs['domain'] = domain
            logger.debug('detaching certificate %s from domain %s', self.name, domain)
            self.detach(*args, **kwargs)
        # 'domain' was only meant for detach(); do not forward it to the
        # parent delete()
        kwargs.pop('domain', None)

        # Delete from DB
        return super(Certificate, self).delete(*args, **kwargs)

    def attach(self, *args, **kwargs):
        """
        Attach this certificate to the domain named by ``kwargs['domain']``.

        Sets ``domain.certificate``, creates the ``<name>-cert`` secret for the
        domain's app when ``_get_secret`` reports it missing, and adds a
        ``<domain>:<name>`` entry to the router service config.
        ``_scheduler``, ``_load_service_config`` and ``_save_service_config``
        are not defined in this class (presumably inherited from AuditedModel).

        :raises KeyError: if ``domain`` is not in ``kwargs``
        :raises AlreadyExists: if the domain already has a certificate
        """
        data = {
            'cert': self.certificate,
            'key': self.key
        }

        # add the certificate to the domain
        domain = get_object_or_404(Domain, domain=kwargs['domain'])
        if domain.certificate is not None:
            raise AlreadyExists("Domain already has a certificate attached to it")

        domain.certificate = self
        domain.save()

        name = '%s-cert' % self.name
        app = domain.app

        # only create the secret if it does not exist yet
        try:
            # We raise an exception when a secret doesn't exist
            self._scheduler._get_secret(app, name)
        except RuntimeError:
            self._scheduler._create_secret(app, name, data)

        # get config for the service
        config = self._load_service_config(app, 'router')

        # See if certificates are available
        if 'certificates' not in config:
            config['certificates'] = ''

        # convert from string to list to work with and filter out empty strings
        cert = '{}:{}'.format(domain.domain, self.name)
        certificates = [_f for _f in config['certificates'].split(',') if _f]
        if cert not in certificates:
            certificates.append(cert)
        config['certificates'] = ','.join(certificates)

        self._save_service_config(app, 'router', config)

    def detach(self, *args, **kwargs):
        """
        Detach this certificate from the domain named by ``kwargs['domain']``.

        Clears ``domain.certificate``, deletes the ``<name>-cert`` secret for
        the domain's app (a RuntimeError from the scheduler is logged, not
        raised), and removes the ``<domain>:<name>`` entry from the router
        service config.

        :raises KeyError: if ``domain`` is not in ``kwargs``
        """
        # remove the certificate from the domain
        domain = get_object_or_404(Domain, domain=kwargs['domain'])
        domain.certificate = None
        domain.save()

        name = '%s-cert' % self.name
        app = domain.app

        # only delete if it exists
        try:
            # We raise an exception when a secret doesn't exist
            self._scheduler._get_secret(app, name)
            self._scheduler._delete_secret(app, name)
        except RuntimeError as e:
            logger.critical(e)

        # get config for the service
        config = self._load_service_config(app, 'router')

        # See if certificates are available
        if 'certificates' not in config:
            config['certificates'] = ''

        # convert from string to list to work with and filter out empty strings
        cert = '{}:{}'.format(domain.domain, self.name)
        certificates = [_f for _f in config['certificates'].split(',') if _f]
        if cert in certificates:
            certificates.remove(cert)
        config['certificates'] = ','.join(certificates)

        self._save_service_config(app, 'router', config)