Skip to content

Commit 43919c2

Browse files
authored
Avoid logging empty line KPO (#38247)
* Avoid logging empty line KPO * cleanup * Apply review suggestions * Apply review feedback * Update airflow/providers/cncf/kubernetes/operators/pod.py
1 parent ad1e473 commit 43919c2

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,8 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime
791791
)
792792
for raw_line in logs:
793793
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
794-
self.log.info("Container logs: %s", line)
794+
if line:
795+
self.log.info("Container logs: %s", line)
795796
except HTTPError as e:
796797
self.log.warning(
797798
"Reading of logs interrupted with error %r; will retry. "

airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
464464
self._callbacks.progress_callback(
465465
line=line, client=self._client, mode=ExecutionMode.SYNC
466466
)
467-
self.log.info("[%s] %s", container_name, message_to_log)
467+
if message_to_log is not None:
468+
self.log.info("[%s] %s", container_name, message_to_log)
468469
last_captured_timestamp = message_timestamp
469470
message_to_log = message
470471
message_timestamp = line_timestamp
@@ -481,7 +482,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
481482
self._callbacks.progress_callback(
482483
line=line, client=self._client, mode=ExecutionMode.SYNC
483484
)
484-
self.log.info("[%s] %s", container_name, message_to_log)
485+
if message_to_log is not None:
486+
self.log.info("[%s] %s", container_name, message_to_log)
485487
last_captured_timestamp = message_timestamp
486488
except TimeoutError as e:
487489
# in case of timeout, increment return time by 2 seconds to avoid

tests/providers/cncf/kubernetes/utils/test_pod_manager.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,21 @@ def test_read_pod_logs_retries_successfully(self):
9393
]
9494
)
9595

96+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
97+
def test_fetch_container_logs_do_not_log_none(self, mock_container_is_running, caplog):
98+
MockWrapper.reset()
99+
caplog.set_level(logging.INFO)
100+
101+
def consumer_iter():
102+
"""This will simulate a container that hasn't produced any logs in the last read_timeout window"""
103+
yield from ()
104+
105+
with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
106+
mock_consumer_iter.side_effect = consumer_iter
107+
mock_container_is_running.side_effect = [True, True, False]
108+
self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True)
109+
assert "[container-name] None" not in (record.message for record in caplog.records)
110+
96111
def test_read_pod_logs_retries_fails(self):
97112
mock.sentinel.metadata = mock.MagicMock()
98113
self.mock_kube_client.read_namespaced_pod_log.side_effect = [

0 commit comments

Comments
 (0)