Skip to content

Commit 87769db

Browse files
leahecolekaxil
andauthored
Allow omission of initial_node_count if node_pools is specified (#17820)
* Add error check for config_file parameter * Move error check to init * fix static checks * Apply suggestions from code review * Apply suggestions from code review * Modify validation to allow node_pools * WIP: pass test cases 0, 1, 3 * WIP: pass test case 4 * WIP: pass test case 2 * WIP - pass test cases 5 and 6, remove trailing comma * WIP - pass test case 7 * WIP pass test case 8 * cleanup * fix static check Co-authored-by: Kaxil Naik <[email protected]>
1 parent 83f1f07 commit 87769db

File tree

2 files changed

+80
-7
lines changed

2 files changed

+80
-7
lines changed

airflow/providers/google/cloud/operators/kubernetes_engine.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,35 @@ def __init__(
210210
self._check_input()
211211

212212
def _check_input(self) -> None:
213-
if not all([self.project_id, self.location, self.body]) or not (
214-
(isinstance(self.body, dict) and "name" in self.body and "initial_node_count" in self.body)
215-
or (getattr(self.body, "name", None) and getattr(self.body, "initial_node_count", None))
213+
if (
214+
not all([self.project_id, self.location, self.body])
215+
or (isinstance(self.body, dict) and not ("name" in self.body))
216+
or (
217+
isinstance(self.body, dict)
218+
and ("initial_node_count" not in self.body and "node_pools" not in self.body)
219+
)
220+
or (not (isinstance(self.body, dict)) and not (getattr(self.body, "name", None)))
221+
or (
222+
not (isinstance(self.body, dict))
223+
and (
224+
not (getattr(self.body, "initial_node_count", None))
225+
and not (getattr(self.body, "node_pools", None))
226+
)
227+
)
216228
):
217229
self.log.error(
218230
"One of (project_id, location, body, body['name'], "
219-
"body['initial_node_count']) is missing or incorrect"
231+
"body['initial_node_count']), body['node_pools'] is missing or incorrect"
220232
)
221233
raise AirflowException("Operator has incorrect or missing input.")
234+
elif (
235+
isinstance(self.body, dict) and ("initial_node_count" in self.body and "node_pools" in self.body)
236+
) or (
237+
not (isinstance(self.body, dict))
238+
and (getattr(self.body, "initial_node_count", None) and getattr(self.body, "node_pools", None))
239+
):
240+
self.log.error("Only one of body['initial_node_count']) and body['node_pools'] may be specified")
241+
raise AirflowException("Operator has incorrect or missing input.")
222242

223243
def execute(self, context) -> str:
224244
hook = GKEHook(

tests/providers/google/cloud/operators/test_kubernetes_engine.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,17 @@
4040

4141
PROJECT_BODY = {'name': 'test-name'}
4242
PROJECT_BODY_CREATE_DICT = {'name': 'test-name', 'initial_node_count': 1}
43+
PROJECT_BODY_CREATE_DICT_NODE_POOLS = {
44+
'name': 'test-name',
45+
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
46+
}
47+
4348
PROJECT_BODY_CREATE_CLUSTER = type("Cluster", (object,), {"name": "test-name", "initial_node_count": 1})()
49+
PROJECT_BODY_CREATE_CLUSTER_NODE_POOLS = type(
50+
'Cluster',
51+
(object,),
52+
{'name': 'test-name', 'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}]},
53+
)()
4454

4555
TASK_NAME = 'test-task-name'
4656
NAMESPACE = ('default',)
@@ -52,7 +62,15 @@
5262

5363

5464
class TestGoogleCloudPlatformContainerOperator(unittest.TestCase):
55-
@parameterized.expand((body,) for body in [PROJECT_BODY_CREATE_DICT, PROJECT_BODY_CREATE_CLUSTER])
65+
@parameterized.expand(
66+
(body,)
67+
for body in [
68+
PROJECT_BODY_CREATE_DICT,
69+
PROJECT_BODY_CREATE_DICT_NODE_POOLS,
70+
PROJECT_BODY_CREATE_CLUSTER,
71+
PROJECT_BODY_CREATE_CLUSTER_NODE_POOLS,
72+
]
73+
)
5674
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.GKEHook')
5775
def test_create_execute(self, body, mock_hook):
5876
operator = GKECreateClusterOperator(
@@ -69,9 +87,44 @@ def test_create_execute(self, body, mock_hook):
6987
for body in [
7088
None,
7189
{'missing_name': 'test-name', 'initial_node_count': 1},
72-
{'name': 'test-name', 'missing_initial_node_count': 1},
90+
{
91+
'name': 'test-name',
92+
'initial_node_count': 1,
93+
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
94+
},
95+
{'missing_name': 'test-name', 'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}]},
96+
{
97+
'name': 'test-name',
98+
'missing_initial_node_count': 1,
99+
'missing_node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
100+
},
73101
type('Cluster', (object,), {'missing_name': 'test-name', 'initial_node_count': 1})(),
74-
type('Cluster', (object,), {'name': 'test-name', 'missing_initial_node_count': 1})(),
102+
type(
103+
'Cluster',
104+
(object,),
105+
{
106+
'missing_name': 'test-name',
107+
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
108+
},
109+
)(),
110+
type(
111+
'Cluster',
112+
(object,),
113+
{
114+
'name': 'test-name',
115+
'missing_initial_node_count': 1,
116+
'missing_node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
117+
},
118+
)(),
119+
type(
120+
'Cluster',
121+
(object,),
122+
{
123+
'name': 'test-name',
124+
'initial_node_count': 1,
125+
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
126+
},
127+
)(),
75128
]
76129
)
77130
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.GKEHook')

0 commit comments

Comments
 (0)