Skip to content

Commit 25e9047

Browse files
authored
[AIRFLOW-6193] Do not use asserts in Airflow main code (#6749)
* [AIRFLOW-6193] Do not use asserts in Airflow main code
1 parent d087925 commit 25e9047

37 files changed

+348
-149
lines changed

airflow/api/common/experimental/trigger_dag.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ def _trigger_dag(
5454

5555
execution_date = execution_date if execution_date else timezone.utcnow()
5656

57-
assert timezone.is_localized(execution_date)
57+
if not timezone.is_localized(execution_date):
58+
raise ValueError("The execution_date should be localized")
5859

5960
if replace_microseconds:
6061
execution_date = execution_date.replace(microsecond=0)

airflow/contrib/example_dags/example_kubernetes_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ def use_zip_binary():
7272
:rtype: bool
7373
"""
7474
return_code = os.system("zip")
75-
assert return_code == 0
75+
if return_code != 0:
76+
raise SystemError("The zip binary is missing")
7677

7778
# You don't have to use any special KubernetesExecutor configuration if you don't want to
7879
start_task = PythonOperator(

airflow/contrib/example_dags/example_kubernetes_executor_config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ def test_volume_mount():
4545
foo.write('Hello')
4646

4747
return_code = os.system("cat /foo/volume_mount_test.txt")
48-
assert return_code == 0
48+
if return_code != 0:
49+
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
4950

5051
# You can use annotations on your kubernetes pods!
5152
start_task = PythonOperator(

airflow/example_dags/example_xcom.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,20 @@ def puller(**kwargs):
5050

5151
# get value_1
5252
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
53-
assert pulled_value_1 == value_1
53+
if pulled_value_1 != value_1:
54+
raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
5455

5556
# get value_2
5657
pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
57-
assert pulled_value_2 == value_2
58+
if pulled_value_2 != value_2:
59+
raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
5860

5961
# get both value_1 and value_2
6062
pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
61-
assert (pulled_value_1, pulled_value_2) == (value_1, value_2)
63+
if pulled_value_1 != value_1:
64+
raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
65+
if pulled_value_2 != value_2:
66+
raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
6267

6368

6469
push1 = PythonOperator(

airflow/executors/dask_executor.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from distributed import Client, Future, as_completed
2323
from distributed.security import Security
2424

25+
from airflow import AirflowException
2526
from airflow.configuration import conf
2627
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
2728
from airflow.models.taskinstance import TaskInstanceKeyType
@@ -35,7 +36,8 @@ def __init__(self, cluster_address=None):
3536
super().__init__(parallelism=0)
3637
if cluster_address is None:
3738
cluster_address = conf.get('dask', 'cluster_address')
38-
assert cluster_address, 'Please provide a Dask cluster address in airflow.cfg'
39+
if not cluster_address:
40+
raise ValueError('Please provide a Dask cluster address in airflow.cfg')
3941
self.cluster_address = cluster_address
4042
# ssl / tls parameters
4143
self.tls_ca = conf.get('dask', 'tls_ca')
@@ -63,17 +65,21 @@ def execute_async(self,
6365
command: CommandType,
6466
queue: Optional[str] = None,
6567
executor_config: Optional[Any] = None) -> None:
66-
assert self.futures, NOT_STARTED_MESSAGE
68+
if not self.futures:
69+
raise AirflowException(NOT_STARTED_MESSAGE)
6770

6871
def airflow_run():
6972
return subprocess.check_call(command, close_fds=True)
7073

71-
assert self.client, "The Dask executor has not been started yet!"
74+
if not self.client:
75+
raise AirflowException(NOT_STARTED_MESSAGE)
76+
7277
future = self.client.submit(airflow_run, pure=False)
7378
self.futures[future] = key
7479

7580
def _process_future(self, future: Future) -> None:
76-
assert self.futures, NOT_STARTED_MESSAGE
81+
if not self.futures:
82+
raise AirflowException(NOT_STARTED_MESSAGE)
7783
if future.done():
7884
key = self.futures[future]
7985
if future.exception():
@@ -87,19 +93,23 @@ def _process_future(self, future: Future) -> None:
8793
self.futures.pop(future)
8894

8995
def sync(self) -> None:
90-
assert self.futures, NOT_STARTED_MESSAGE
96+
if not self.futures:
97+
raise AirflowException(NOT_STARTED_MESSAGE)
9198
# make a copy so futures can be popped during iteration
9299
for future in self.futures.copy():
93100
self._process_future(future)
94101

95102
def end(self) -> None:
96-
assert self.client, NOT_STARTED_MESSAGE
97-
assert self.futures, NOT_STARTED_MESSAGE
103+
if not self.client:
104+
raise AirflowException(NOT_STARTED_MESSAGE)
105+
if not self.futures:
106+
raise AirflowException(NOT_STARTED_MESSAGE)
98107
self.client.cancel(list(self.futures.keys()))
99108
for future in as_completed(self.futures.copy()):
100109
self._process_future(future)
101110

102111
def terminate(self):
103-
assert self.futures, NOT_STARTED_MESSAGE
112+
if not self.futures:
113+
raise AirflowException(NOT_STARTED_MESSAGE)
104114
self.client.cancel(self.futures.keys())
105115
self.end()

airflow/executors/executor_loader.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ def _get_executor(executor_name: str) -> BaseExecutor:
7878
from airflow import plugins_manager
7979
plugins_manager.integrate_executor_plugins()
8080
executor_path = executor_name.split('.')
81-
assert len(executor_path) == 2, f"Executor {executor_name} not supported: " \
82-
f"please specify in format plugin_module.executor"
83-
84-
assert executor_path[0] in globals(), f"Executor {executor_name} not supported"
81+
if len(executor_path) != 2:
82+
raise ValueError(f"Executor {executor_name} not supported: "
83+
f"please specify in format plugin_module.executor")
84+
if executor_path[0] not in globals():
85+
raise ValueError(f"Executor {executor_name} not supported")
8586
return globals()[executor_path[0]].__dict__[executor_path[1]]()

airflow/executors/kubernetes_executor.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ def __init__(self,
269269
def run(self) -> None:
270270
"""Performs watching"""
271271
kube_client: client.CoreV1Api = get_kube_client()
272-
assert self.worker_uuid, NOT_STARTED_MESSAGE
272+
if not self.worker_uuid:
273+
raise AirflowException(NOT_STARTED_MESSAGE)
273274
while True:
274275
try:
275276
self.resource_version = self._run(kube_client, self.resource_version,
@@ -657,7 +658,8 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
657658
proper support
658659
for State.LAUNCHED
659660
"""
660-
assert self.kube_client, NOT_STARTED_MESSAGE
661+
if not self.kube_client:
662+
raise AirflowException(NOT_STARTED_MESSAGE)
661663
queued_tasks = session\
662664
.query(TaskInstance)\
663665
.filter(TaskInstance.state == State.QUEUED).all()
@@ -737,7 +739,8 @@ def start(self) -> None:
737739
"""Starts the executor"""
738740
self.log.info('Start Kubernetes executor')
739741
self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
740-
assert self.worker_uuid, "Could not get worker_uuid"
742+
if not self.worker_uuid:
743+
raise AirflowException("Could not get worker uuid")
741744
self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
742745
# always need to reset resource version since we don't know
743746
# when we last started, note for behavior below
@@ -764,7 +767,8 @@ def execute_async(self,
764767
)
765768

766769
kube_executor_config = PodGenerator.from_obj(executor_config)
767-
assert self.task_queue, NOT_STARTED_MESSAGE
770+
if not self.task_queue:
771+
raise AirflowException(NOT_STARTED_MESSAGE)
768772
self.task_queue.put((key, command, kube_executor_config))
769773

770774
def sync(self) -> None:
@@ -773,10 +777,16 @@ def sync(self) -> None:
773777
self.log.debug('self.running: %s', self.running)
774778
if self.queued_tasks:
775779
self.log.debug('self.queued: %s', self.queued_tasks)
776-
assert self.kube_scheduler, NOT_STARTED_MESSAGE
777-
assert self.kube_config, NOT_STARTED_MESSAGE
778-
assert self.result_queue, NOT_STARTED_MESSAGE
779-
assert self.task_queue, NOT_STARTED_MESSAGE
780+
if not self.worker_uuid:
781+
raise AirflowException(NOT_STARTED_MESSAGE)
782+
if not self.kube_scheduler:
783+
raise AirflowException(NOT_STARTED_MESSAGE)
784+
if not self.kube_config:
785+
raise AirflowException(NOT_STARTED_MESSAGE)
786+
if not self.result_queue:
787+
raise AirflowException(NOT_STARTED_MESSAGE)
788+
if not self.task_queue:
789+
raise AirflowException(NOT_STARTED_MESSAGE)
780790
self.kube_scheduler.sync()
781791

782792
last_resource_version = None
@@ -819,7 +829,8 @@ def sync(self) -> None:
819829
def _change_state(self, key: TaskInstanceKeyType, state: Optional[str], pod_id: str) -> None:
820830
if state != State.RUNNING:
821831
if self.kube_config.delete_worker_pods:
822-
assert self.kube_scheduler, NOT_STARTED_MESSAGE
832+
if not self.kube_scheduler:
833+
raise AirflowException(NOT_STARTED_MESSAGE)
823834
self.kube_scheduler.delete_pod(pod_id)
824835
self.log.info('Deleted pod: %s', str(key))
825836
try:
@@ -829,7 +840,8 @@ def _change_state(self, key: TaskInstanceKeyType, state: Optional[str], pod_id:
829840
self.event_buffer[key] = state
830841

831842
def _flush_task_queue(self) -> None:
832-
assert self.task_queue, NOT_STARTED_MESSAGE
843+
if not self.task_queue:
844+
raise AirflowException(NOT_STARTED_MESSAGE)
833845
self.log.debug('Executor shutting down, task_queue approximate size=%d', self.task_queue.qsize())
834846
while True:
835847
try:
@@ -841,7 +853,8 @@ def _flush_task_queue(self) -> None:
841853
break
842854

843855
def _flush_result_queue(self) -> None:
844-
assert self.result_queue, NOT_STARTED_MESSAGE
856+
if not self.result_queue:
857+
raise AirflowException(NOT_STARTED_MESSAGE)
845858
self.log.debug('Executor shutting down, result_queue approximate size=%d', self.result_queue.qsize())
846859
while True: # pylint: disable=too-many-nested-blocks
847860
try:
@@ -863,9 +876,12 @@ def _flush_result_queue(self) -> None:
863876

864877
def end(self) -> None:
865878
"""Called when the executor shuts down"""
866-
assert self.task_queue, NOT_STARTED_MESSAGE
867-
assert self.result_queue, NOT_STARTED_MESSAGE
868-
assert self.kube_scheduler, NOT_STARTED_MESSAGE
879+
if not self.task_queue:
880+
raise AirflowException(NOT_STARTED_MESSAGE)
881+
if not self.result_queue:
882+
raise AirflowException(NOT_STARTED_MESSAGE)
883+
if not self.kube_scheduler:
884+
raise AirflowException(NOT_STARTED_MESSAGE)
869885
self.log.info('Shutting down Kubernetes executor')
870886
self.log.debug('Flushing task_queue...')
871887
self._flush_task_queue()

airflow/executors/local_executor.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ def execute_async(self,
188188
:param queue: Name of the queue
189189
:param executor_config: configuration for the executor
190190
"""
191-
assert self.executor.result_queue, NOT_STARTED_MESSAGE
191+
if not self.executor.result_queue:
192+
raise AirflowException(NOT_STARTED_MESSAGE)
192193
local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
193194
self.executor.workers_used += 1
194195
self.executor.workers_active += 1
@@ -228,10 +229,10 @@ def __init__(self, executor: 'LocalExecutor'):
228229
def start(self) -> None:
229230
"""Starts limited parallelism implementation."""
230231
if not self.executor.manager:
231-
raise AirflowException("Executor must be started!")
232+
raise AirflowException(NOT_STARTED_MESSAGE)
232233
self.queue = self.executor.manager.Queue()
233234
if not self.executor.result_queue:
234-
raise AirflowException("Executor must be started!")
235+
raise AirflowException(NOT_STARTED_MESSAGE)
235236
self.executor.workers = [
236237
QueuedLocalWorker(self.queue, self.executor.result_queue)
237238
for _ in range(self.executor.parallelism)
@@ -257,7 +258,8 @@ def execute_async(self,
257258
:param queue: name of the queue
258259
:param executor_config: configuration for the executor
259260
"""
260-
assert self.queue, NOT_STARTED_MESSAGE
261+
if not self.queue:
262+
raise AirflowException(NOT_STARTED_MESSAGE)
261263
self.queue.put((key, command))
262264

263265
def sync(self):
@@ -300,23 +302,27 @@ def execute_async(self, key: TaskInstanceKeyType,
300302
queue: Optional[str] = None,
301303
executor_config: Optional[Any] = None) -> None:
302304
"""Execute asynchronously."""
303-
assert self.impl, NOT_STARTED_MESSAGE
305+
if not self.impl:
306+
raise AirflowException(NOT_STARTED_MESSAGE)
304307
self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
305308

306309
def sync(self) -> None:
307310
"""
308311
Sync will get called periodically by the heartbeat method.
309312
"""
310-
assert self.impl, NOT_STARTED_MESSAGE
313+
if not self.impl:
314+
raise AirflowException(NOT_STARTED_MESSAGE)
311315
self.impl.sync()
312316

313317
def end(self) -> None:
314318
"""
315319
Ends the executor.
316320
:return:
317321
"""
318-
assert self.impl, NOT_STARTED_MESSAGE
319-
assert self.manager, NOT_STARTED_MESSAGE
322+
if not self.impl:
323+
raise AirflowException(NOT_STARTED_MESSAGE)
324+
if not self.manager:
325+
raise AirflowException(NOT_STARTED_MESSAGE)
320326
self.impl.end()
321327
self.manager.shutdown()
322328

0 commit comments

Comments
 (0)