-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathkeystore.py
More file actions
122 lines (96 loc) · 3.84 KB
/
keystore.py
File metadata and controls
122 lines (96 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json, os, time
import logging
import etcd
import helpers.errors
logger = logging.getLogger(__name__)


class Etcd:
    """Wrapper around a python-etcd client holding leader-election state.

    Every key lives beneath ``/service/<scope>``. The ``path`` arguments
    taken by the methods are appended verbatim to that prefix, so they are
    expected to start with ``/``.
    """

    def __init__(self, config):
        """Build the etcd client from a configuration mapping.

        Args:
            config: mapping providing ``scope`` (key namespace), ``host``
                (a ``"hostname:port"`` string) and ``ttl`` (sent as the
                ``ttl`` of the leader and member keys).
        """
        self.scope = config["scope"]
        # Split on the last colon only, so a host part that itself contains
        # colons does not break the two-value unpacking.
        self.host, self.port = config["host"].rsplit(":", 1)
        self.client = etcd.Client(host=self.host, port=int(self.port))
        self.ttl = config["ttl"]

    def _key(self, path):
        """Return the absolute etcd key for a scope-relative ``path``."""
        return "/service/%s%s" % (self.scope, path)

    def get(self, path, max_attempts=1):
        """Read ``path``, retrying while the key does not exist.

        Args:
            path: scope-relative key to read.
            max_attempts: total number of reads to try before giving up;
                a 3 second pause separates consecutive attempts.

        Returns:
            The value of the result when it is truthy, otherwise the raw
            result object returned by the client (``members`` relies on
            this to reach ``.children``).

        Raises:
            etcd.EtcdKeyNotFound: the key was still missing on the final
                attempt.
        """
        key = self._key(path)
        attempts = 0
        while True:
            try:
                logger.debug("GET: %s", key)
                response = self.client.read(key)
                break
            except etcd.EtcdKeyNotFound:
                attempts += 1
                if attempts >= max_attempts:
                    # Bare raise keeps the original traceback intact.
                    raise
                logger.info("Failed to return %s, trying again. (%s of %s)",
                            path, attempts, max_attempts)
                time.sleep(3)
        return response.value or response

    def set(self, path, value, **kwargs):
        """Write ``value`` to ``path``.

        Extra keyword arguments (for example ``ttl``, ``prevExist`` or
        ``prevValue``) are forwarded unchanged to the client's ``write``.
        """
        key = self._key(path)
        logger.debug("SET: %s > %s", key, value)
        self.client.write(key, value, **kwargs)

    def delete(self, path, **kwargs):
        """Delete ``path``; keyword arguments go to the client's ``delete``."""
        key = self._key(path)
        # The previous format string had one more placeholder than
        # arguments, so this record could never be rendered.
        logger.debug("DELETE: %s", key)
        self.client.delete(key, **kwargs)

    def current_leader(self):
        """Look up the current leader and its registered address.

        Returns:
            ``{"hostname": ..., "address": ...}``, or ``None`` when either
            the leader key or the leader's member key does not exist.

        Raises:
            helpers.errors.CurrentLeaderError: any other failure while
                reading from etcd.
        """
        try:
            hostname = self.get("/leader")
            address = self.get("/members/%s" % hostname)
        except etcd.EtcdKeyNotFound:
            return None
        except Exception:
            raise helpers.errors.CurrentLeaderError("Etcd is not responding properly")
        return {"hostname": hostname, "address": address}

    def members(self):
        """List the entries stored under ``/members``.

        Returns:
            A list of ``{"hostname": ..., "address": ...}`` dicts, where the
            hostname is the last component of each child key; ``None`` when
            ``/members`` does not exist.

        Raises:
            helpers.errors.CurrentLeaderError: any other failure while
                reading from etcd (the exception type is kept as-is for
                existing callers).
        """
        try:
            members_dir = self.get("/members")
            if not members_dir:
                return []
            return [
                {"hostname": member.key.split("/")[-1], "address": member.value}
                for member in members_dir.children
            ]
        except etcd.EtcdKeyNotFound:
            return None
        except Exception:
            raise helpers.errors.CurrentLeaderError("Etcd is not responding properly")

    def touch_member(self, member, connection_string):
        """Write ``connection_string`` to ``/members/<member>`` with the TTL."""
        self.set("/members/%s" % member, connection_string, ttl=self.ttl)

    def take_leader(self, value):
        """Unconditionally write ``value`` to the leader key with the TTL."""
        self.set("/leader", value, ttl=self.ttl)

    def attempt_to_acquire_leader(self, value):
        """Try to create the leader key, failing if it already exists.

        Returns:
            ``True`` when the key was written, ``False`` when the client
            raised ``etcd.EtcdAlreadyExist``.
        """
        try:
            self.set("/leader", value, ttl=self.ttl, prevExist=False)
        except etcd.EtcdAlreadyExist:
            logger.info("Could not aquire leader: already exists")
            return False
        return True

    def update_leader(self, state_handler):
        """Refresh the leader key and record the leader's last operation.

        The leader key is rewritten with ``prevValue`` set to
        ``state_handler.name``, then ``state_handler.last_operation()`` is
        stored under ``/optime/leader``.

        Returns:
            ``True`` when both writes succeed, ``False`` when a
            ``ValueError`` is raised by either of them. (Success used to
            return ``None``, which callers could not tell apart from
            failure by truthiness.)
        """
        try:
            self.set("/leader", state_handler.name, ttl=self.ttl,
                     prevValue=state_handler.name)
            self.set("/optime/leader", state_handler.last_operation())
        except ValueError:
            logger.error("Error updating leader lock and optime on ETCD for primary.")
            return False
        return True

    def last_leader_operation(self):
        """Return ``/optime/leader`` as an ``int``, or ``None`` if missing."""
        try:
            return int(self.get("/optime/leader"))
        except etcd.EtcdKeyNotFound:
            logger.error("Error reading TTL on ETCD for primary.")
            return None

    def leader_unlocked(self):
        """Return ``True`` when no leader key exists, ``False`` otherwise."""
        try:
            self.get("/leader")
        except etcd.EtcdKeyNotFound:
            return True
        return False

    def am_i_leader(self, value):
        """Return whether the leader key currently holds ``value``.

        Raises:
            etcd.EtcdKeyNotFound: propagated from ``get`` when no leader
                key exists.
        """
        leader = self.get("/leader")
        logger.info("Lock owner: %s; I am %s", leader, value)
        return leader == value

    def race(self, path, value):
        """Try to create ``path`` with ``value``; first writer wins.

        Returns:
            ``True`` when the key was created, ``False`` when the client
            raised ``etcd.EtcdAlreadyExist``.
        """
        try:
            self.set(path, value, prevExist=False)
        except etcd.EtcdAlreadyExist:
            return False
        return True