35
35
_AUTHORIZED_USER_TYPE = "authorized_user"
36
36
_SERVICE_ACCOUNT_TYPE = "service_account"
37
37
_EXTERNAL_ACCOUNT_TYPE = "external_account"
38
- _VALID_TYPES = (_AUTHORIZED_USER_TYPE , _SERVICE_ACCOUNT_TYPE , _EXTERNAL_ACCOUNT_TYPE )
38
+ _IMPERSONATED_SERVICE_ACCOUNT_TYPE = "impersonated_service_account"
39
+ _VALID_TYPES = (
40
+ _AUTHORIZED_USER_TYPE ,
41
+ _SERVICE_ACCOUNT_TYPE ,
42
+ _EXTERNAL_ACCOUNT_TYPE ,
43
+ _IMPERSONATED_SERVICE_ACCOUNT_TYPE ,
44
+ )
39
45
40
46
# Help message when no credentials can be found.
41
47
_HELP_MESSAGE = """\
@@ -79,7 +85,8 @@ def load_credentials_from_file(
79
85
"""Loads Google credentials from a file.
80
86
81
87
The credentials file must be a service account key, stored authorized
82
- user credentials or external account credentials.
88
+ user credentials, external account credentials, or impersonated service
89
+ account credentials.
83
90
84
91
Args:
85
92
filename (str): The full path to the credentials file.
@@ -119,42 +126,25 @@ def load_credentials_from_file(
119
126
"File {} is not a valid json file." .format (filename ), caught_exc
120
127
)
121
128
six .raise_from (new_exc , caught_exc )
129
+ return _load_credentials_from_info (
130
+ filename , info , scopes , default_scopes , quota_project_id , request
131
+ )
132
+
122
133
123
- # The type key should indicate that the file is either a service account
124
- # credentials file or an authorized user credentials file.
134
+ def _load_credentials_from_info (
135
+ filename , info , scopes , default_scopes , quota_project_id , request
136
+ ):
125
137
credential_type = info .get ("type" )
126
138
127
139
if credential_type == _AUTHORIZED_USER_TYPE :
128
- from google .oauth2 import credentials
129
-
130
- try :
131
- credentials = credentials .Credentials .from_authorized_user_info (
132
- info , scopes = scopes
133
- )
134
- except ValueError as caught_exc :
135
- msg = "Failed to load authorized user credentials from {}" .format (filename )
136
- new_exc = exceptions .DefaultCredentialsError (msg , caught_exc )
137
- six .raise_from (new_exc , caught_exc )
138
- if quota_project_id :
139
- credentials = credentials .with_quota_project (quota_project_id )
140
- if not credentials .quota_project_id :
141
- _warn_about_problematic_credentials (credentials )
142
- return credentials , None
140
+ credentials , project_id = _get_authorized_user_credentials (
141
+ filename , info , scopes
142
+ )
143
143
144
144
elif credential_type == _SERVICE_ACCOUNT_TYPE :
145
- from google .oauth2 import service_account
146
-
147
- try :
148
- credentials = service_account .Credentials .from_service_account_info (
149
- info , scopes = scopes , default_scopes = default_scopes
150
- )
151
- except ValueError as caught_exc :
152
- msg = "Failed to load service account credentials from {}" .format (filename )
153
- new_exc = exceptions .DefaultCredentialsError (msg , caught_exc )
154
- six .raise_from (new_exc , caught_exc )
155
- if quota_project_id :
156
- credentials = credentials .with_quota_project (quota_project_id )
157
- return credentials , info .get ("project_id" )
145
+ credentials , project_id = _get_service_account_credentials (
146
+ filename , info , scopes , default_scopes
147
+ )
158
148
159
149
elif credential_type == _EXTERNAL_ACCOUNT_TYPE :
160
150
credentials , project_id = _get_external_account_credentials (
@@ -164,17 +154,19 @@ def load_credentials_from_file(
164
154
default_scopes = default_scopes ,
165
155
request = request ,
166
156
)
167
- if quota_project_id :
168
- credentials = credentials . with_quota_project ( quota_project_id )
169
- return credentials , project_id
170
-
157
+ elif credential_type == _IMPERSONATED_SERVICE_ACCOUNT_TYPE :
158
+ credentials , project_id = _get_impersonated_service_account_credentials (
159
+ filename , info , scopes
160
+ )
171
161
else :
172
162
raise exceptions .DefaultCredentialsError (
173
163
"The file {file} does not have a valid type. "
174
164
"Type is {type}, expected one of {valid_types}." .format (
175
165
file = filename , type = credential_type , valid_types = _VALID_TYPES
176
166
)
177
167
)
168
+ credentials = _apply_quota_project_id (credentials , quota_project_id )
169
+ return credentials , project_id
178
170
179
171
180
172
def _get_gcloud_sdk_credentials (quota_project_id = None ):
@@ -371,6 +363,93 @@ def get_api_key_credentials(api_key_value):
371
363
return api_key .Credentials (api_key_value )
372
364
373
365
366
+ def _get_authorized_user_credentials (filename , info , scopes = None ):
367
+ from google .oauth2 import credentials
368
+
369
+ try :
370
+ credentials = credentials .Credentials .from_authorized_user_info (
371
+ info , scopes = scopes
372
+ )
373
+ except ValueError as caught_exc :
374
+ msg = "Failed to load authorized user credentials from {}" .format (filename )
375
+ new_exc = exceptions .DefaultCredentialsError (msg , caught_exc )
376
+ six .raise_from (new_exc , caught_exc )
377
+ return credentials , None
378
+
379
+
380
+ def _get_service_account_credentials (filename , info , scopes = None , default_scopes = None ):
381
+ from google .oauth2 import service_account
382
+
383
+ try :
384
+ credentials = service_account .Credentials .from_service_account_info (
385
+ info , scopes = scopes , default_scopes = default_scopes
386
+ )
387
+ except ValueError as caught_exc :
388
+ msg = "Failed to load service account credentials from {}" .format (filename )
389
+ new_exc = exceptions .DefaultCredentialsError (msg , caught_exc )
390
+ six .raise_from (new_exc , caught_exc )
391
+ return credentials , info .get ("project_id" )
392
+
393
+
394
+ def _get_impersonated_service_account_credentials (filename , info , scopes ):
395
+ from google .auth import impersonated_credentials
396
+
397
+ try :
398
+ source_credentials_info = info .get ("source_credentials" )
399
+ source_credentials_type = source_credentials_info .get ("type" )
400
+ if source_credentials_type == _AUTHORIZED_USER_TYPE :
401
+ source_credentials , _ = _get_authorized_user_credentials (
402
+ filename , source_credentials_info
403
+ )
404
+ elif source_credentials_type == _SERVICE_ACCOUNT_TYPE :
405
+ source_credentials , _ = _get_service_account_credentials (
406
+ filename , source_credentials_info
407
+ )
408
+ else :
409
+ raise ValueError (
410
+ "source credential of type {} is not supported." .format (
411
+ source_credentials_type
412
+ )
413
+ )
414
+ impersonation_url = info .get ("service_account_impersonation_url" )
415
+ start_index = impersonation_url .rfind ("/" )
416
+ end_index = impersonation_url .find (":generateAccessToken" )
417
+ if start_index == - 1 or end_index == - 1 or start_index > end_index :
418
+ raise ValueError (
419
+ "Cannot extract target principal from {}" .format (impersonation_url )
420
+ )
421
+ target_principal = impersonation_url [start_index + 1 : end_index ]
422
+ delegates = info .get ("delegates" )
423
+ quota_project_id = info .get ("quota_project_id" )
424
+ credentials = impersonated_credentials .Credentials (
425
+ source_credentials ,
426
+ target_principal ,
427
+ scopes ,
428
+ delegates ,
429
+ quota_project_id = quota_project_id ,
430
+ )
431
+ except ValueError as caught_exc :
432
+ msg = "Failed to load impersonated service account credentials from {}" .format (
433
+ filename
434
+ )
435
+ new_exc = exceptions .DefaultCredentialsError (msg , caught_exc )
436
+ six .raise_from (new_exc , caught_exc )
437
+ return credentials , None
438
+
439
+
440
+ def _apply_quota_project_id (credentials , quota_project_id ):
441
+ if quota_project_id :
442
+ credentials = credentials .with_quota_project (quota_project_id )
443
+
444
+ from google .oauth2 import credentials as authorized_user_credentials
445
+
446
+ if isinstance (credentials , authorized_user_credentials .Credentials ) and (
447
+ not credentials .quota_project_id
448
+ ):
449
+ _warn_about_problematic_credentials (credentials )
450
+ return credentials
451
+
452
+
374
453
def default (scopes = None , request = None , quota_project_id = None , default_scopes = None ):
375
454
"""Gets the default credentials for the current environment.
376
455
0 commit comments