
Commit 02d9434

Don't use time.time() or timezone.utcnow() for duration calculations (#12353)
`time.time() - start` or `timezone.utcnow() - start_dttm` will work fine in 99% of cases, but both have one fatal flaw: they operate on system time, and system time can go backwards. While this might be surprising, it does happen -- usually because the clock has been adjusted -- and for long-running processes it is more common than we might expect. Most of these durations are harmless to get wrong (they only end up in logs), but it is better to be safe than sorry. The `utcnow()` style I have replaced is also much heavier weight: creating a datetime object is a comparatively expensive operation, and computing the difference between two of them even more so, _especially_ compared to subtracting two floats. To make the common case of timing a block of code easier, I have made `Stats.timer()` return an object that has a `duration` field.
1 parent 8291fab commit 02d9434
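
For illustration, a minimal sketch of the difference (`do_work()` is a hypothetical placeholder, not code from this commit):

import time


def do_work():
    # Stand-in for the code being timed.
    time.sleep(0.1)


# Wall-clock arithmetic: usually fine, but if the system clock is adjusted while
# do_work() runs, the result is wrong and can even come out negative.
start = time.time()
do_work()
wall_duration = time.time() - start

# A monotonic clock never goes backwards, so the difference is always a valid duration.
start = time.monotonic()
do_work()
duration = time.monotonic() - start
print(f"took {duration:.3f}s (wall-clock estimate: {wall_duration:.3f}s)")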

File tree: 19 files changed, +338 -232 lines changed


CONTRIBUTING.rst

Lines changed: 41 additions & 0 deletions
@@ -774,6 +774,47 @@ If this function is designed to be called by "end-users" (i.e. DAG authors) then
     ...
     # You SHOULD not commit the session here. The wrapper will take care of commit()/rollback() if exception
 
+Don't use time() for duration calculations
+-------------------------------------------
+
+If you wish to compute the time difference between two events within the same process, use
+``time.monotonic()``, not ``time.time()`` nor ``timezone.utcnow()``.
+
+If you are measuring duration for performance reasons, then ``time.perf_counter()`` should be used. (On many
+platforms, this uses the same underlying clock mechanism as monotonic, but ``perf_counter`` is guaranteed to be
+the highest accuracy clock on the system; monotonic is simply "guaranteed" to not go backwards.)
+
+If you wish to time how long a block of code takes, use ``Stats.timer()`` -- either with a metric name, which
+will be timed and submitted automatically:
+
+.. code-block:: python
+
+    from airflow.stats import Stats
+
+    ...
+
+    with Stats.timer("my_timer_metric"):
+        ...
+
+or to time but not send a metric:
+
+.. code-block:: python
+
+    from airflow.stats import Stats
+
+    ...
+
+    with Stats.timer() as timer:
+        ...
+
+    log.info("Code took %.3f seconds", timer.duration)
+
+For full docs on ``timer()`` check out `airflow/stats.py`_.
+
+If the start_date of a duration calculation needs to be stored in a database, then this has to be done using
+datetime objects. In all other cases, using datetime for duration calculation MUST be avoided as creating and
+diffing datetime operations are (comparatively) slow.
+
 Naming Conventions for provider packages
 ----------------------------------------
 
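
The guidance above recommends ``time.perf_counter()`` for performance measurements but does not show it in use; a minimal sketch, with ``parse_files()`` as a hypothetical stand-in for the code being profiled:

import time


def parse_files(paths):
    # Hypothetical stand-in for the work being profiled.
    return [p.upper() for p in paths]


# perf_counter() offers the highest available resolution and, like monotonic(),
# never goes backwards, so it is suited to fine-grained performance timing.
start = time.perf_counter()
parse_files(["a.py", "b.py", "c.py"])
elapsed = time.perf_counter() - start
print(f"parse_files took {elapsed:.6f} seconds")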

airflow/cli/commands/webserver_command.py

Lines changed: 6 additions & 6 deletions
@@ -99,7 +99,7 @@ def __init__(
 
         self._num_workers_running = 0
         self._num_ready_workers_running = 0
-        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_refresh_time = time.monotonic() if worker_refresh_interval > 0 else None
         self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
         self._restart_on_next_plugin_check = False
 
@@ -149,9 +149,9 @@ def _get_num_workers_running(self) -> int:
 
     def _wait_until_true(self, fn, timeout: int = 0) -> None:
         """Sleeps until fn is true"""
-        start_time = time.time()
+        start_time = time.monotonic()
         while not fn():
-            if 0 < timeout <= time.time() - start_time:
+            if 0 < timeout <= time.monotonic() - start_time:
                 raise AirflowWebServerTimeout(f"No response from gunicorn master within {timeout} seconds")
             sleep(0.1)
 
@@ -274,7 +274,7 @@ def _check_workers(self) -> None:
         # If workers should be restarted periodically.
         if self.worker_refresh_interval > 0 and self._last_refresh_time:
             # and we refreshed the workers a long time ago, refresh the workers
-            last_refresh_diff = time.time() - self._last_refresh_time
+            last_refresh_diff = time.monotonic() - self._last_refresh_time
             if self.worker_refresh_interval < last_refresh_diff:
                 num_new_workers = self.worker_refresh_batch_size
                 self.log.debug(
@@ -284,7 +284,7 @@ def _check_workers(self) -> None:
                     num_new_workers,
                 )
                 self._spawn_new_workers(num_new_workers)
-                self._last_refresh_time = time.time()
+                self._last_refresh_time = time.monotonic()
                 return
 
         # if we should check the directory with the plugin,
@@ -308,7 +308,7 @@ def _check_workers(self) -> None:
                 num_workers_running,
            )
            self._restart_on_next_plugin_check = False
-            self._last_refresh_time = time.time()
+            self._last_refresh_time = time.monotonic()
            self._reload_gunicorn()
 
 
airflow/jobs/scheduler_job.py

Lines changed: 24 additions & 28 deletions
@@ -28,7 +28,7 @@
 import threading
 import time
 from collections import defaultdict
-from contextlib import ExitStack, redirect_stderr, redirect_stdout, suppress
+from contextlib import redirect_stderr, redirect_stdout, suppress
 from datetime import timedelta
 from multiprocessing.connection import Connection as MultiprocessingConnection
 from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
@@ -167,17 +167,16 @@ def _run_file_processor(
 
         try:
             # redirect stdout/stderr to log
-            with ExitStack() as exit_stack:
-                exit_stack.enter_context(redirect_stdout(StreamLogWriter(log, logging.INFO)))  # type: ignore
-                exit_stack.enter_context(redirect_stderr(StreamLogWriter(log, logging.WARN)))  # type: ignore
+            with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
+                StreamLogWriter(log, logging.WARN)
+            ), Stats.timer() as timer:
                 # Re-configure the ORM engine as there are issues with multiple processes
                 settings.configure_orm()
 
                 # Change the thread name to differentiate log lines. This is
                 # really a separate process, but changing the name of the
                 # process doesn't work, so changing the thread name instead.
                 threading.current_thread().name = thread_name
-                start_time = time.time()
 
                 log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
                 dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
@@ -187,8 +186,7 @@ def _run_file_processor(
                     callback_requests=callback_requests,
                 )
                 result_channel.send(result)
-                end_time = time.time()
-                log.info("Processing %s took %.3f seconds", file_path, end_time - start_time)
+                log.info("Processing %s took %.3f seconds", file_path, timer.duration)
         except Exception:  # pylint: disable=broad-except
             # Log exceptions through the logging framework.
             log.exception("Got an exception! Propagating...")
@@ -1372,34 +1370,32 @@ def repeat(*args, **kwargs):
         )
 
         for loop_count in itertools.count(start=1):
-            loop_start_time = time.time()
+            with Stats.timer() as timer:
 
-            if self.using_sqlite:
-                self.processor_agent.run_single_parsing_loop()
-                # For the sqlite case w/ 1 thread, wait until the processor
-                # is finished to avoid concurrent access to the DB.
-                self.log.debug("Waiting for processors to finish since we're using sqlite")
-                self.processor_agent.wait_until_finished()
+                if self.using_sqlite:
+                    self.processor_agent.run_single_parsing_loop()
+                    # For the sqlite case w/ 1 thread, wait until the processor
+                    # is finished to avoid concurrent access to the DB.
+                    self.log.debug("Waiting for processors to finish since we're using sqlite")
+                    self.processor_agent.wait_until_finished()
 
-            with create_session() as session:
-                num_queued_tis = self._do_scheduling(session)
+                with create_session() as session:
+                    num_queued_tis = self._do_scheduling(session)
 
-                self.executor.heartbeat()
-                session.expunge_all()
-                num_finished_events = self._process_executor_events(session=session)
+                    self.executor.heartbeat()
+                    session.expunge_all()
+                    num_finished_events = self._process_executor_events(session=session)
 
-            self.processor_agent.heartbeat()
+                self.processor_agent.heartbeat()
 
-            # Heartbeat the scheduler periodically
-            self.heartbeat(only_if_necessary=True)
+                # Heartbeat the scheduler periodically
+                self.heartbeat(only_if_necessary=True)
 
-            # Run any pending timed events
-            next_event = timers.run(blocking=False)
-            self.log.debug("Next timed event is in %f", next_event)
+                # Run any pending timed events
+                next_event = timers.run(blocking=False)
+                self.log.debug("Next timed event is in %f", next_event)
 
-            loop_end_time = time.time()
-            loop_duration = loop_end_time - loop_start_time
-            self.log.debug("Ran scheduling loop in %.2f seconds", loop_duration)
+            self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration)
 
             if not is_unit_test and not num_queued_tis and not num_finished_events:
                 # If the scheduler is doing things, don't sleep. This means when there is work to do, the

airflow/models/dagbag.py

Lines changed: 44 additions & 47 deletions
@@ -438,38 +438,37 @@ def collect_dags(
             return
 
         self.log.info("Filling up the DagBag from %s", dag_folder)
-        start_dttm = timezone.utcnow()
-        dag_folder = dag_folder or self.dag_folder
-        # Used to store stats around DagBag processing
-        stats = []
-
-        dag_folder = correct_maybe_zipped(dag_folder)
-        for filepath in list_py_file_paths(
-            dag_folder,
-            safe_mode=safe_mode,
-            include_examples=include_examples,
-            include_smart_sensor=include_smart_sensor,
-        ):
-            try:
-                file_parse_start_dttm = timezone.utcnow()
-                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
-
-                file_parse_end_dttm = timezone.utcnow()
-                stats.append(
-                    FileLoadStat(
-                        file=filepath.replace(settings.DAGS_FOLDER, ''),
-                        duration=file_parse_end_dttm - file_parse_start_dttm,
-                        dag_num=len(found_dags),
-                        task_num=sum([len(dag.tasks) for dag in found_dags]),
-                        dags=str([dag.dag_id for dag in found_dags]),
+        with Stats.timer('collect_dags'):
+            dag_folder = dag_folder or self.dag_folder
+            # Used to store stats around DagBag processing
+            stats = []
+
+            dag_folder = correct_maybe_zipped(dag_folder)
+            for filepath in list_py_file_paths(
+                dag_folder,
+                safe_mode=safe_mode,
+                include_examples=include_examples,
+                include_smart_sensor=include_smart_sensor,
+            ):
+                try:
+                    file_parse_start_dttm = timezone.utcnow()
+                    found_dags = self.process_file(
+                        filepath, only_if_updated=only_if_updated, safe_mode=safe_mode
                     )
-                )
-            except Exception as e:  # pylint: disable=broad-except
-                self.log.exception(e)
 
-        end_dttm = timezone.utcnow()
-        durations = (end_dttm - start_dttm).total_seconds()
-        Stats.gauge('collect_dags', durations, 1)
+                    file_parse_end_dttm = timezone.utcnow()
+                    stats.append(
+                        FileLoadStat(
+                            file=filepath.replace(settings.DAGS_FOLDER, ''),
+                            duration=file_parse_end_dttm - file_parse_start_dttm,
+                            dag_num=len(found_dags),
+                            task_num=sum([len(dag.tasks) for dag in found_dags]),
+                            dags=str([dag.dag_id for dag in found_dags]),
+                        )
+                    )
+                except Exception as e:  # pylint: disable=broad-except
+                    self.log.exception(e)
+
         Stats.gauge('dagbag_size', len(self.dags), 1)
         Stats.gauge('dagbag_import_errors', len(self.import_errors), 1)
         self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
@@ -483,23 +482,21 @@ def collect_dags_from_db(self):
         """Collects DAGs from database."""
         from airflow.models.serialized_dag import SerializedDagModel
 
-        start_dttm = timezone.utcnow()
-        self.log.info("Filling up the DagBag from database")
-
-        # The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
-        # from the table by the scheduler job.
-        self.dags = SerializedDagModel.read_all_dags()
-
-        # Adds subdags.
-        # DAG post-processing steps such as self.bag_dag and croniter are not needed as
-        # they are done by scheduler before serialization.
-        subdags = {}
-        for dag in self.dags.values():
-            for subdag in dag.subdags:
-                subdags[subdag.dag_id] = subdag
-        self.dags.update(subdags)
-
-        Stats.timing('collect_db_dags', timezone.utcnow() - start_dttm)
+        with Stats.timer('collect_db_dags'):
+            self.log.info("Filling up the DagBag from database")
+
+            # The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
+            # from the table by the scheduler job.
+            self.dags = SerializedDagModel.read_all_dags()
+
+            # Adds subdags.
+            # DAG post-processing steps such as self.bag_dag and croniter are not needed as
+            # they are done by scheduler before serialization.
+            subdags = {}
+            for dag in self.dags.values():
+                for subdag in dag.subdags:
+                    subdags[subdag.dag_id] = subdag
+            self.dags.update(subdags)
 
     def dagbag_report(self):
         """Prints a report around DagBag loading stats"""

airflow/models/dagrun.py

Lines changed: 20 additions & 23 deletions
@@ -400,29 +400,26 @@ def update_state(
 
         start_dttm = timezone.utcnow()
         self.last_scheduling_decision = start_dttm
-
-        dag = self.get_dag()
-        info = self.task_instance_scheduling_decisions(session)
-
-        tis = info.tis
-        schedulable_tis = info.schedulable_tis
-        changed_tis = info.changed_tis
-        finished_tasks = info.finished_tasks
-        unfinished_tasks = info.unfinished_tasks
-
-        none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
-        none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
-
-        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
-            # small speed up
-            are_runnable_tasks = (
-                schedulable_tis
-                or self._are_premature_tis(unfinished_tasks, finished_tasks, session)
-                or changed_tis
-            )
-
-        duration = timezone.utcnow() - start_dttm
-        Stats.timing(f"dagrun.dependency-check.{self.dag_id}", duration)
+        with Stats.timer(f"dagrun.dependency-check.{self.dag_id}"):
+            dag = self.get_dag()
+            info = self.task_instance_scheduling_decisions(session)
+
+            tis = info.tis
+            schedulable_tis = info.schedulable_tis
+            changed_tis = info.changed_tis
+            finished_tasks = info.finished_tasks
+            unfinished_tasks = info.unfinished_tasks
+
+            none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
+            none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
+
+            if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+                # small speed up
+                are_runnable_tasks = (
+                    schedulable_tis
+                    or self._are_premature_tis(unfinished_tasks, finished_tasks, session)
+                    or changed_tis
+                )
 
         leaf_task_ids = {t.task_id for t in dag.leaves}
         leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
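
The diffs above read ``timer.duration`` after the ``with`` block exits. A rough, illustrative sketch of that context-manager pattern (not the actual Airflow implementation -- the real timer lives in ``airflow/stats.py`` and also submits the metric when a name is given):

import time


class DurationTimer:
    """Illustrative context manager that records how long its block took."""

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.duration = time.perf_counter() - self._start
        return False  # never swallow exceptions


with DurationTimer() as timer:
    sum(range(1_000_000))

print(f"block took {timer.duration:.6f} seconds")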
