-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgovernor.py
More file actions
executable file
·82 lines (68 loc) · 2.8 KB
/
governor.py
File metadata and controls
executable file
·82 lines (68 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python
import sys, os, yaml, time, urllib2, atexit
import logging
from helpers.keystore import Etcd
from helpers.postgresql import Postgresql
from helpers.ha import Ha
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)

# Load the YAML configuration named by the first command-line argument.
# The context manager guarantees the file handle is closed even if the
# read raises.
# NOTE(review): yaml.load without an explicit Loader can construct arbitrary
# Python objects. If this config file can ever come from an untrusted
# source, switch to yaml.safe_load.
with open(sys.argv[1], "r") as f:
    config = yaml.load(f.read())

# HACK (bacongobbler): kubernetes provides us with etcd's host/port info
# Falls back to 127.0.0.1:4001 when the service variables are not set.
config['etcd']['host'] = "{}:{}".format(
    os.getenv("ETCD_SERVICE_HOST", "127.0.0.1"),
    os.getenv("ETCD_SERVICE_PORT", 4001))

# Build the key-store client, the local PostgreSQL wrapper and the HA
# controller that ties the two together.
etcd = Etcd(config['etcd'])
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)

logging.info("my name is {}".format(postgresql.name))
# stop governor on script exit
@atexit.register
def stop_governor():
    """Stop the local PostgreSQL wrapper; runs as an interpreter exit hook."""
    postgresql.stop()
# wait for etcd to be available
# Keep registering this member until the call stops raising URLError,
# pausing five seconds between attempts.
etcd_ready = False
while True:
    try:
        etcd.touch_member(postgresql.name, postgresql.connection_string)
    except urllib2.URLError as e:
        logging.info("waiting on etcd: {}".format(e))
        time.sleep(5)
    else:
        etcd_ready = True
        break
# Decide how to bring the local PostgreSQL up, based on whether the data
# directory already has content.
if not postgresql.data_directory_empty():
    # data directory is already populated
    postgresql.follow_no_leader()
    postgresql.start()
elif etcd.race("/initialize", postgresql.name):
    # empty data directory and the "/initialize" race succeeded:
    # initialize, take the leader key, then start
    postgresql.initialize()
    etcd.take_leader(postgresql.name)
    postgresql.start()
else:
    # empty data directory and the race was not won: keep trying to sync
    # from the current leader until one attempt succeeds
    synced_from_leader = False
    while not synced_from_leader:
        leader = etcd.current_leader()
        if leader:
            if postgresql.sync_from_leader(leader):
                postgresql.write_recovery_conf(leader)
                postgresql.fix_data_dir_permissions()
                postgresql.start()
                synced_from_leader = True
            else:
                # sync attempt reported failure; retry after a pause
                time.sleep(5)
        else:
            logging.info("leader is starting up. Checking again in 5 seconds")
            time.sleep(5)
# Main loop: run one HA cycle, make sure replication slots exist when this
# node is the leader, refresh this member's entry in etcd, then sleep for
# the configured interval.
while True:
    logging.info(ha.run_cycle())

    # create replication slots
    if postgresql.is_leader():
        for member in etcd.members():
            hostname = member['hostname']
            if hostname != postgresql.name:
                # Create a physical replication slot named after the member
                # unless one with that name already exists.
                # NOTE(review): the slot name is interpolated straight into
                # the SQL text; this relies on member hostnames registered in
                # etcd being trusted -- confirm, or quote/validate them.
                postgresql.query("DO LANGUAGE plpgsql $$DECLARE somevar VARCHAR; BEGIN SELECT slot_name INTO somevar FROM pg_replication_slots WHERE slot_name = '%(slot)s' LIMIT 1; IF NOT FOUND THEN PERFORM pg_create_physical_replication_slot('%(slot)s'); END IF; END$$;" % {"slot": hostname})

    # HACK (bacongobbler): kubernetes provides us with the service's host ip addr
    # os.getenv returns None for an unset variable and str.replace(x, None)
    # raises TypeError, so default each lookup to the current value; the
    # replace is then a no-op when the service variable is absent.
    service_host = os.getenv("DEIS_POSTGRES_SERVICE_HOST", postgresql.host)
    service_port = os.getenv("DEIS_POSTGRES_SERVICE_PORT", postgresql.port)
    connection_string = postgresql.connection_string.replace(
        postgresql.host, service_host).replace(postgresql.port, service_port)
    etcd.touch_member(postgresql.name, connection_string)

    time.sleep(config["loop_wait"])