Skip to content

Commit 3e72765

Browse files
committed
chore(controller): add celery retry mechanism
1 parent aa74418 commit 3e72765

1 file changed

Lines changed: 41 additions & 24 deletions

File tree

rootfs/api/tasks.py

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,89 +13,106 @@
1313
logger = logging.getLogger(__name__)
1414

1515

16-
@shared_task
17-
def retrieve_resource(resource):
16+
@shared_task(bind=True)
17+
def retrieve_resource(self, resource):
1818
task_id = uuid.uuid4().hex
1919
signals.request_started.send(sender=task_id)
2020
try:
2121
if not resource.retrieve():
2222
t = time.time() - resource.created.timestamp()
2323
if t < 3600:
24-
retrieve_resource.apply_async(
25-
args=(resource, ),
26-
eta=now() + timedelta(seconds=30))
24+
raise self.retry(exc=None, countdown=30)
2725
elif t < 3600 * 12:
28-
retrieve_resource.apply_async(
29-
args=(resource, ),
30-
eta=now() + timedelta(seconds=1800))
26+
raise self.retry(exc=None, countdown=1800)
3127
else:
3228
resource.detach_resource()
3329
except Resource.DoesNotExist:
34-
logger.exception("retrieve task not found resource: {}".format(resource.id)) # noqa
30+
logger.exception(
31+
"retrieve task not found resource: {}".format(resource.id))
3532
finally:
3633
signals.request_finished.send(sender=task_id)
3734

3835

39-
@shared_task
36+
@shared_task(
37+
autoretry_for=(Exception, ),
38+
retry_backoff=8,
39+
retry_jitter=True,
40+
retry_backoff_max=3600,
41+
retry_kwargs={'max_retries': None}
42+
)
4043
def measure_config(config: List[Dict[str, str]]):
4144
task_id = uuid.uuid4().hex
4245
signals.request_started.send(sender=task_id)
4346
try:
4447
measurement = manager.Measurement()
4548
measurement.post_config(config)
46-
except Exception as e:
47-
logger.exception("write influxdb point fail: {}".format(e))
4849
finally:
4950
signals.request_finished.send(sender=task_id)
5051

5152

52-
@shared_task
53+
@shared_task(
54+
autoretry_for=(Exception, ),
55+
retry_backoff=8,
56+
retry_jitter=True,
57+
retry_backoff_max=3600,
58+
retry_kwargs={'max_retries': None}
59+
)
5360
def measure_volumes(volumes: List[Dict[str, str]]):
5461
task_id = uuid.uuid4().hex
5562
signals.request_started.send(sender=task_id)
5663
try:
5764
measurement = manager.Measurement()
5865
measurement.post_volumes(volumes)
59-
except Exception as e:
60-
logger.exception("write influxdb point fail: {}".format(e))
6166
finally:
6267
signals.request_finished.send(sender=task_id)
6368

6469

65-
@shared_task
70+
@shared_task(
71+
autoretry_for=(Exception, ),
72+
retry_backoff=8,
73+
retry_jitter=True,
74+
retry_backoff_max=3600,
75+
retry_kwargs={'max_retries': None}
76+
)
6677
def measure_networks(networks: List[Dict[str, str]]):
6778
task_id = uuid.uuid4().hex
6879
signals.request_started.send(sender=task_id)
6980
try:
7081
measurement = manager.Measurement()
7182
measurement.post_networks(networks)
72-
except Exception as e:
73-
logger.exception("write influxdb point fail: {}".format(e))
7483
finally:
7584
signals.request_finished.send(sender=task_id)
7685

7786

78-
@shared_task
87+
@shared_task(
88+
autoretry_for=(Exception, ),
89+
retry_backoff=8,
90+
retry_jitter=True,
91+
retry_backoff_max=3600,
92+
retry_kwargs={'max_retries': None}
93+
)
7994
def measure_instances(instances: List[Dict[str, str]]):
8095
task_id = uuid.uuid4().hex
8196
signals.request_started.send(sender=task_id)
8297
try:
8398
measurement = manager.Measurement()
8499
measurement.post_instances(instances)
85-
except Exception as e:
86-
logger.exception("write influxdb point fail: {}".format(e))
87100
finally:
88101
signals.request_finished.send(sender=task_id)
89102

90103

91-
@shared_task
104+
@shared_task(
105+
autoretry_for=(Exception, ),
106+
retry_backoff=8,
107+
retry_jitter=True,
108+
retry_backoff_max=3600,
109+
retry_kwargs={'max_retries': None}
110+
)
92111
def measure_resources(resources: List[Dict[str, str]]):
93112
task_id = uuid.uuid4().hex
94113
signals.request_started.send(sender=task_id)
95114
try:
96115
measurement = manager.Measurement()
97116
measurement.post_resources(resources)
98-
except Exception as e:
99-
logger.exception("write influxdb point fail: {}".format(e))
100117
finally:
101118
signals.request_finished.send(sender=task_id)

0 commit comments

Comments
 (0)