
Commit 1b18a50

Authored by dstandish, uranusjr, jedcunningham, and bbovenzi
Enable individual trigger logging (#27758)
FileTaskHandler is updated so that the triggerer job can use it to write messages from distinct triggers, making them visible in web UI task logs. Messages from all relevant sources are merged and interleaved in the same task log pane.

Due to variation in FileTaskHandler implementations, we needed some extra params set so the triggerer job can use them appropriately. For some handlers we use the QueueListener pattern; for others we put many instances (one per trigger) behind a wrapper class. For this purpose we introduce some config params you can set on the handler instance or class. Context vars are used so that the handler can map messages to the correct TI. These changes to log handling in the triggerer job are entirely disableable.

Also adds log-serving capability to the triggerer service, so that live logs work with non-streaming handlers.

Adds a TaskReturnCode for signaling, so that log messages can be more accurate when deferring; on resume we now have better messaging, i.e. "resuming from deferral". TI.next_kwargs is used to infer "has deferred" for the "resuming from deferral" messaging.

Adds a new method to FileTaskHandler for reading remote logs. It is called along with the other sources in _read and interleaved there. Meta messages and actual log content are now separated so that messages are displayed at the top after interleaving. Interleaving requires parsing log timestamps, so the timestamp parser is made pluggable in case your formatter is unusual.

A TabWithTooltip class is added in the web UI JS; it can be used for adding tooltips to tabs. We didn't end up needing it, but Brent went through the trouble to write it and it may come in handy.

In the chart, the triggerer needs to be a service (instead of a deployment) in order for log serving to work out of the box. This is only enabled when airflowVersion in your chart values is >= 2.6.0.

---------

Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
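As a rough illustration of the contextvar-based routing mentioned above — a minimal sketch only; the names here (ctx_task_instance, TriggerMetadataFilter) are hypothetical, not this commit's actual identifiers — a logging filter can stamp each record with the TI bound to the current trigger's context:

    # Hypothetical sketch of routing trigger log records to the right TI.
    import contextvars
    import logging

    ctx_task_instance: contextvars.ContextVar = contextvars.ContextVar("task_instance", default=None)


    class TriggerMetadataFilter(logging.Filter):
        """Stamp each record with the TaskInstance from the current context,
        so a single handler can route interleaved trigger messages to the
        correct per-task log."""

        def filter(self, record: logging.LogRecord) -> bool:
            record.task_instance = ctx_task_instance.get()
            return True


    # Each trigger's coroutine runs in its own context, so setting the var
    # there tags every subsequent record emitted by that trigger:
    #     ctx_task_instance.set(ti)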
Parent: 8338926

54 files changed: +2,390 additions, −488 deletions. Only a subset of the changed files appears below.

airflow/api_connexion/endpoints/log_endpoint.py
Lines changed: 6 additions & 2 deletions

@@ -21,6 +21,7 @@
 from flask import Response, request
 from itsdangerous.exc import BadSignature
 from itsdangerous.url_safe import URLSafeSerializer
+from sqlalchemy.orm import joinedload
 from sqlalchemy.orm.session import Session
 
 from airflow.api_connexion import security
@@ -73,9 +74,10 @@ def get_log(
     metadata["download_logs"] = False
 
     task_log_reader = TaskLogReader()
+
     if not task_log_reader.supports_read:
         raise BadRequest("Task log handler does not support read logs.")
-    ti = (
+    query = (
         session.query(TaskInstance)
         .filter(
             TaskInstance.task_id == task_id,
@@ -84,8 +86,10 @@ def get_log(
             TaskInstance.map_index == map_index,
         )
         .join(TaskInstance.dag_run)
-        .one_or_none()
+        .options(joinedload("trigger"))
+        .options(joinedload("trigger.triggerer_job"))
     )
+    ti = query.one_or_none()
     if ti is None:
         metadata["end_of_log"] = True
         raise NotFound(title="TaskInstance not found")

airflow/cli/cli_parser.py
Lines changed: 1 addition & 0 deletions

@@ -2108,6 +2108,7 @@ class GroupCommand(NamedTuple):
             ARG_LOG_FILE,
             ARG_CAPACITY,
             ARG_VERBOSE,
+            ARG_SKIP_SERVE_LOGS,
         ),
     ),
     ActionCommand(

airflow/cli/commands/task_command.py
Lines changed: 19 additions & 11 deletions

@@ -44,6 +44,7 @@
 from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.operator import needs_expansion
+from airflow.models.taskinstance import TaskReturnCode
 from airflow.settings import IS_K8S_EXECUTOR_POD
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
@@ -58,6 +59,7 @@
     suppress_logs_and_warning,
 )
 from airflow.utils.dates import timezone
+from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.log.secrets_masker import RedactedIO
 from airflow.utils.net import get_hostname
@@ -182,7 +184,7 @@ def _get_ti(
     return ti, dr_created
 
 
-def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
+def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
     """
     Runs the task based on a mode.
 
@@ -193,11 +195,11 @@ def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
     - by executor
     """
     if args.local:
-        _run_task_by_local_task_job(args, ti)
+        return _run_task_by_local_task_job(args, ti)
     elif args.raw:
-        _run_raw_task(args, ti)
+        return _run_raw_task(args, ti)
     else:
-        _run_task_by_executor(args, dag, ti)
+        return _run_task_by_executor(args, dag, ti)
 
 
 def _run_task_by_executor(args, dag, ti):
@@ -239,7 +241,7 @@ def _run_task_by_executor(args, dag, ti):
     executor.end()
 
 
-def _run_task_by_local_task_job(args, ti):
+def _run_task_by_local_task_job(args, ti) -> TaskReturnCode | None:
     """Run LocalTaskJob, which monitors the raw task execution process."""
     run_job = LocalTaskJob(
         task_instance=ti,
@@ -254,11 +256,14 @@ def _run_task_by_local_task_job(args, ti):
         external_executor_id=_extract_external_executor_id(args),
     )
     try:
-        run_job.run()
+        ret = run_job.run()
 
     finally:
         if args.shut_down_logging:
             logging.shutdown()
+    with suppress(ValueError):
+        return TaskReturnCode(ret)
+    return None
 
 
 RAW_TASK_UNSUPPORTED_OPTION = [
@@ -269,9 +274,9 @@ def _run_task_by_local_task_job(args, ti):
 ]
 
 
-def _run_raw_task(args, ti: TaskInstance) -> None:
+def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
     """Runs the main task handling code."""
-    ti._run_raw_task(
+    return ti._run_raw_task(
         mark_success=args.mark_success,
         job_id=args.job_id,
         pool=args.pool,
@@ -407,18 +412,21 @@ def task_run(args, dag=None):
     # this should be last thing before running, to reduce likelihood of an open session
     # which can cause trouble if running process in a fork.
     settings.reconfigure_orm(disable_connection_pool=True)
-
+    task_return_code = None
     try:
         if args.interactive:
-            _run_task_by_selected_method(args, dag, ti)
+            task_return_code = _run_task_by_selected_method(args, dag, ti)
         else:
             with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
-                _run_task_by_selected_method(args, dag, ti)
+                task_return_code = _run_task_by_selected_method(args, dag, ti)
+                if task_return_code == TaskReturnCode.DEFERRED:
+                    _set_task_deferred_context_var()
     finally:
         try:
             get_listener_manager().hook.before_stopping(component=TaskCommandMarker())
         except Exception:
             pass
+    return task_return_code
 
 
 @cli_utils.action_cli(check_db=False)
airflow/cli/commands/triggerer_command.py
Lines changed: 25 additions & 4 deletions

@@ -18,14 +18,35 @@
 from __future__ import annotations
 
 import signal
+from contextlib import contextmanager
+from functools import partial
+from multiprocessing import Process
+from typing import Generator
 
 import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 
 from airflow import settings
+from airflow.configuration import conf
 from airflow.jobs.triggerer_job import TriggererJob
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
+from airflow.utils.serve_logs import serve_logs
+
+
+@contextmanager
+def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
+    """Starts serve_logs sub-process"""
+    sub_proc = None
+    if skip_serve_logs is False:
+        port = conf.getint("logging", "trigger_log_server_port", fallback=8794)
+        sub_proc = Process(target=partial(serve_logs, port=port))
+        sub_proc.start()
+    try:
+        yield
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
 
 
 @cli_utils.action_cli
@@ -44,18 +65,18 @@ def triggerer(args):
             stdout_handle.truncate(0)
             stderr_handle.truncate(0)
 
-            ctx = daemon.DaemonContext(
+            daemon_context = daemon.DaemonContext(
                 pidfile=TimeoutPIDLockFile(pid, -1),
                 files_preserve=[handle],
                 stdout=stdout_handle,
                 stderr=stderr_handle,
                 umask=int(settings.DAEMON_UMASK, 8),
             )
-            with ctx:
+            with daemon_context, _serve_logs(args.skip_serve_logs):
                 job.run()
-
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
-        job.run()
+        with _serve_logs(args.skip_serve_logs):
+            job.run()
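With this change, `airflow triggerer` also starts a log-serving subprocess by default (on trigger_log_server_port, 8794). Judging by the new ARG_SKIP_SERVE_LOGS argument registered in cli_parser.py above, this can presumably be disabled with `airflow triggerer --skip-serve-logs`.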

airflow/config_templates/config.yml
Lines changed: 18 additions & 0 deletions

@@ -788,6 +788,24 @@ logging:
       type: string
      example: ~
       default: "8793"
+    trigger_log_server_port:
+      description: |
+        Port to serve logs from for triggerer. See worker_log_server_port description
+        for more info.
+      version_added: 2.6.0
+      type: string
+      example: ~
+      default: "8794"
+    interleave_timestamp_parser:
+      description: |
+        We must parse timestamps to interleave logs between trigger and task. To do so,
+        we need to parse timestamps in log files. In case your log format is non-standard,
+        you may provide import path to callable which takes a string log line and returns
+        the timestamp (datetime.datetime compatible).
+      version_added: 2.6.0
+      type: string
+      example: path.to.my_func
+      default: ~
 metrics:
   description: |
     StatsD (https://github.com/etsy/statsd) integration settings.

airflow/config_templates/default_airflow.cfg
Lines changed: 11 additions & 0 deletions

@@ -434,6 +434,17 @@ extra_logger_names =
 # visible from the main web server to connect into the workers.
 worker_log_server_port = 8793
 
+# Port to serve logs from for triggerer. See worker_log_server_port description
+# for more info.
+trigger_log_server_port = 8794
+
+# We must parse timestamps to interleave logs between trigger and task. To do so,
+# we need to parse timestamps in log files. In case your log format is non-standard,
+# you may provide import path to callable which takes a string log line and returns
+# the timestamp (datetime.datetime compatible).
+# Example: interleave_timestamp_parser = path.to.my_func
+# interleave_timestamp_parser =
+
 [metrics]
 
 # StatsD (https://github.com/etsy/statsd) integration settings.
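As a sketch of the pluggable parser these options describe — assuming the default Airflow log line shape "2023-01-02 03:04:05,678 {module.py:42} INFO - message"; the module path and function name are placeholders you would reference as interleave_timestamp_parser = path.to.my_func:

    # Possible custom timestamp parser for interleaving trigger and task logs.
    from datetime import datetime


    def my_func(line: str) -> datetime:
        """Return the timestamp of a log line; in this format the first two
        whitespace-separated fields hold it."""
        stamp = " ".join(line.split(" ", 2)[:2])
        return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S,%f")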

airflow/example_dags/example_time_delta_sensor_async.py
Lines changed: 1 addition & 1 deletion

@@ -36,6 +36,6 @@
     catchup=False,
     tags=["example"],
 ) as dag:
-    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10))
+    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=30))
     finish = EmptyOperator(task_id="finish")
     wait >> finish

airflow/executors/base_executor.py
Lines changed: 2 additions & 1 deletion

@@ -355,14 +355,15 @@ def execute_async(
         """
         raise NotImplementedError()
 
-    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
+    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
         """
         This method can be implemented by any child class to return the task logs.
 
         :param ti: A TaskInstance object
         :param log: log str
         :return: logs or tuple of logs and meta dict
         """
+        return [], []
 
     def end(self) -> None:  # pragma: no cover
         """Wait synchronously for the previously submitted job to complete."""

airflow/executors/celery_kubernetes_executor.py
Lines changed: 3 additions & 3 deletions

@@ -141,11 +141,11 @@ def queue_task_instance(
             cfg_path=cfg_path,
         )
 
-    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
+    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
         """Fetch task log from Kubernetes executor"""
         if ti.queue == self.kubernetes_executor.kubernetes_queue:
-            return self.kubernetes_executor.get_task_log(ti=ti, log=log)
-        return None
+            return self.kubernetes_executor.get_task_log(ti=ti)
+        return [], []
 
     def has_task(self, task_instance: TaskInstance) -> bool:
         """

airflow/executors/kubernetes_executor.py
Lines changed: 9 additions & 10 deletions

@@ -781,14 +781,16 @@ def _get_pod_namespace(ti: TaskInstance):
             namespace = pod_override.metadata.namespace
         return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
 
-    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
-
+    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
+        messages = []
+        log = []
         try:
+            from airflow.kubernetes.kube_client import get_kube_client
             from airflow.kubernetes.pod_generator import PodGenerator
 
             client = get_kube_client()
 
-            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            messages.append(f"Trying to get logs (last 100 lines) from worker pod {ti.hostname}")
             selector = PodGenerator.build_selector_for_k8s_executor_pod(
                 dag_id=ti.dag_id,
                 task_id=ti.task_id,
@@ -816,13 +818,10 @@ def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict
             )
 
             for line in res:
-                log += line.decode()
-
-            return log
-
-        except Exception as f:
-            log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
-            return log, {"end_of_log": True}
+                log.append(line.decode())
+        except Exception as e:
+            messages.append(f"Reading from k8s pod logs failed: {str(e)}")
+        return messages, ["\n".join(log)]
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
         tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
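Note how this change mirrors the commit message: meta text such as "Trying to get logs…" now travels in the messages list, which the UI pins above the interleaved content, while the pod output is returned as a single joined chunk rather than a string accumulated across multiple return paths.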
