@@ -380,11 +380,12 @@ def sync(self) -> None:
body = {"message": e.body}

retries = self.task_publish_retries[key]
# In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
# In case of exceeded quota, conflict errors, or rate limiting, requeue the task as per the task_publish_max_retries
message = body.get("message", "")
if (
(str(e.status) == "403" and "exceeded quota" in message)
or (str(e.status) == "409" and "object has been modified" in message)
or str(e.status) == "429" # Add support for rate limiting errors
Contributor comment:
HTTP 429 also means that you should throttle calls, because the backend is overloaded. Can you also consider adding handling that follows the "Retry-After" response header?
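
A rough sketch of what honoring Retry-After could look like, assuming the caught ApiException exposes the response headers as e.headers; the helper name and the default delay are illustrative and not part of this PR:

def _retry_after_seconds(e, default: float = 1.0) -> float:
    """Best-effort parse of the Retry-After header from a Kubernetes ApiException."""
    headers = getattr(e, "headers", None) or {}
    value = headers.get("Retry-After")
    if value is None:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        # Retry-After may also be an HTTP date; fall back to the default delay.
        return default

The returned delay could then inform how long the scheduler waits before requeueing the task, rather than retrying immediately.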

) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
self.log.warning(
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
@@ -682,6 +683,17 @@ def adopt_launched_task(
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)

# Log detailed information for rate limiting errors (429) which can cause task history loss
if str(e.status) == "429":
Contributor comment on lines +687 to +688:
It is good to add explicit handling for 429 here; it helps avoid losing history when K8s throttles. Can we think about whether to apply the same backoff/retry cadence as the 403/409 cases, for consistency? We could also consider wrapping these transient checks into a small helper later, so the error handling doesn't get too scattered.
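
A rough sketch of that helper; the name and placement are hypothetical and not part of this change:

def _is_transient_k8s_error(status, message: str = "") -> bool:
    """Return True for Kubernetes API errors worth retrying: quota, conflict, or throttling."""
    status = str(status)
    if status == "403" and "exceeded quota" in message:
        return True
    if status == "409" and "object has been modified" in message:
        return True
    return status == "429"

The retry branch in sync() could then combine _is_transient_k8s_error(e.status, message) with the existing task_publish_max_retries check, and adopt_launched_task() could reuse the same predicate so both paths treat transient errors consistently.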

self.log.warning(
"Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. "
"This may cause task history loss if the task was previously running. "
"Consider implementing rate limiting backoff or increasing API quota.",
pod.metadata.name,
ti_key,
)
Contributor comment on lines +689 to +695:
This log message is helpful and gives good visibility when the scheduler hits throttling. Can we also surface a shorter summary in the UI or metrics so users notice API quota pressure sooner?
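
One way metrics could surface this, as a sketch only; the Stats counter pattern is standard in Airflow, but the metric name and helper are made up here:

from airflow.stats import Stats


def record_k8s_throttling(source: str) -> None:
    """Increment a counter each time a 429 response is observed."""
    Stats.incr(f"kubernetes_executor.api_throttled.{source}")

Calling record_k8s_throttling("adopt_launched_task") next to the warning above would give operators a time series to alert on without parsing logs.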


return

del tis_to_flush_by_key[ti_key]
@@ -396,9 +396,9 @@ def setup_method(self) -> None:
pytest.param(
HTTPResponse(body="Too many requests, please try again later.", status=429),
1,
False,
State.FAILED,
id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)",
True,
State.SUCCESS,
id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1) (retry succeeded)",
),
pytest.param(
HTTPResponse(body="", status=429),
@@ -407,6 +407,13 @@ def setup_method(self) -> None:
State.FAILED,
id="429 Too Many Requests (empty body)",
),
pytest.param(
HTTPResponse(body="", status=429),
1,
True,
State.SUCCESS,
id="429 Too Many Requests (empty body) (task_publish_max_retries=1) (retry succeeded)",
),
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")