OXIESEC PANEL
- Current Dir:
/
/
opt
/
gsutil
/
gslib
/
tests
Server IP: 2a02:4780:11:1594:0:ef5:22d7:a
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
02/11/2025 08:19:48 AM
rwxr-xr-x
📄
__init__.py
808 bytes
12/09/2024 05:26:03 PM
rw-r--r--
📁
__pycache__
-
02/11/2025 08:19:49 AM
rwxr-xr-x
📄
mock_cloud_api.py
7.97 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
mock_logging_handler.py
1.28 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
rewrite_helper.py
2.8 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
signurl_signatures.py
5.7 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_Doption.py
9.72 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_acl.py
55.99 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_autoclass.py
6.85 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_boto_util.py
9.65 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_bucketconfig.py
4.98 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_bucketpolicyonly.py
3.78 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_cat.py
11.55 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_cloud_api_delegator.py
2 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_command.py
3.39 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_command_runner.py
20.66 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_compose.py
14.12 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_context_config.py
18.78 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_copy_helper_funcs.py
39.76 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_copy_objects_iterator.py
4.49 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_cors.py
12.45 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_cp.py
216 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_creds_config.py
8.64 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_daisy_chain_wrapper.py
14.13 KB
12/09/2024 05:26:03 PM
rw-r--r--
📁
test_data
-
12/09/2024 05:26:03 PM
rwxr-xr-x
📄
test_defacl.py
14.36 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_defstorageclass.py
5.43 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_du.py
10.61 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_encryption_helper.py
4.62 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_execution_util.py
3.88 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_file_part.py
3.38 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_gcs_json_api.py
2.9 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_gcs_json_credentials.py
9.85 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_gcs_json_media.py
7.44 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_gsutil.py
4.68 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_hash.py
9.58 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_hashing_helper.py
10.78 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_help.py
3.5 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_hmac.py
23.9 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_iam.py
90.67 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_kms.py
16.68 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_label.py
11.55 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_lifecycle.py
13.8 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_logging.py
3.5 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_ls.py
53.16 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_mb.py
19.64 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_metrics.py
51.65 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_mtls.py
2.01 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_mv.py
13.04 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_naming.py
63.11 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_notification.py
5.9 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_notification_pubsub.py
5.46 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_pap.py
5.91 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_parallel_cp.py
10.15 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_parallelism_framework.py
33.09 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_perfdiag.py
12.62 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_plurality_checkable_iterator.py
7.53 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_posix_util.py
2.03 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_psc.py
5.88 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_rb.py
2.93 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_requester_pays.py
11.7 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_resumable_streaming.py
12.36 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_retention.py
28.9 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_retention_util.py
5.49 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_rewrite.py
31.62 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_rm.py
33.72 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_rpo.py
10.22 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_rsync.py
149.33 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_rsync_funcs.py
3.36 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_seek_ahead_thread.py
8.79 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_setmeta.py
12.54 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_shim_util.py
64.19 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_signurl.py
24.69 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_stat.py
11.38 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_stet_cp.py
5.77 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_stet_util.py
7.38 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_storage_url.py
7.02 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_tabcomplete.py
14.31 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_temporary_file_util.py
1.54 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_trace.py
1.76 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_tracker_file.py
9.9 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_ubla.py
3.88 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_ui.py
67.42 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_update.py
10.37 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_user_agent_helper.py
5.34 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_util.py
19.85 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_versioning.py
3.61 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_web.py
6.54 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_wildcard_iterator.py
22.18 KB
12/09/2024 05:26:03 PM
rw-r--r--
📄
test_wrapped_credentials.py
13.95 KB
12/09/2024 05:26:03 PM
rw-r--r--
📁
testcase
-
12/09/2024 05:26:03 PM
rwxr-xr-x
📄
util.py
29.01 KB
12/09/2024 05:26:03 PM
rw-r--r--
Editing: test_gcs_json_credentials.py
Close
# -*- coding: utf-8 -*- # Copyright 2022 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for gcs_json_credentials.py.""" from __future__ import absolute_import from __future__ import print_function from __future__ import division from __future__ import unicode_literals from apitools.base.py import GceAssertionCredentials from google_reauth import reauth_creds from gslib import gcs_json_api from gslib import gcs_json_credentials from gslib.cred_types import CredTypes from gslib.exception import CommandException from gslib.tests import testcase from gslib.tests.util import SetBotoConfigForTest from gslib.tests.util import unittest from gslib.utils.wrapped_credentials import WrappedCredentials import logging from oauth2client.service_account import ServiceAccountCredentials import pkgutil from six import add_move, MovedModule add_move(MovedModule("mock", "mock", "unittest.mock")) from six.moves import mock try: from OpenSSL.crypto import load_pkcs12 HAS_OPENSSL = True except ImportError: HAS_OPENSSL = False try: import cryptography HAS_CRYPTO = True except ImportError: HAS_CRYPTO = False ERROR_MESSAGE = "This is the error message" def getBotoCredentialsConfig( service_account_creds=None, user_account_creds=None, gce_creds=None, external_account_creds=None, external_account_authorized_user_creds=None, ): config = [] if service_account_creds: config.append(("Credentials", "gs_service_key_file", service_account_creds["keyfile"])) config.append(("Credentials", "gs_service_client_id", service_account_creds["client_id"])) else: config.append(("Credentials", "gs_service_key_file", None)) config.extend([("Credentials", "gs_oauth2_refresh_token", user_account_creds), ("GoogleCompute", "service_account", gce_creds), ("Credentials", "gs_external_account_file", external_account_creds), ("Credentials", "gs_external_account_authorized_user_file", external_account_authorized_user_creds)]) return config class TestGcsJsonCredentials(testcase.GsUtilUnitTestCase): """Test logic for interacting with GCS JSON Credentials.""" @mock.patch.object(gcs_json_credentials.P12Credentials, "from_service_account_pkcs12_keystring", return_value=gcs_json_credentials.P12Credentials(mock.Mock(), token_uri='123', service_account_email='123', scopes=['a', 'b'])) def testOauth2ServiceAccountCredential(self, _): contents = pkgutil.get_data("gslib", "tests/test_data/test.p12") tmpfile = self.CreateTempFile(contents=contents) with SetBotoConfigForTest( getBotoCredentialsConfig(service_account_creds={ "keyfile": tmpfile, "client_id": "?", })): self.assertTrue(gcs_json_credentials._HasOauth2ServiceAccountCreds()) client = gcs_json_api.GcsJsonApi(None, None, None, None) self.assertEqual(client.credentials.service_account_email, '123') self.assertIsInstance(client.credentials, gcs_json_credentials.P12Credentials) def testP12CredentialsthrowsErrorIfProvidedWithMissingFields(self): contents = pkgutil.get_data("gslib", "tests/test_data/test.p12") tmpfile = self.CreateTempFile(contents=contents) with self.assertRaises(Exception) as exc: gcs_json_credentials.CreateP12ServiceAccount(tmpfile) @unittest.skipUnless(HAS_CRYPTO, 'p12credentials requires cryptography.') @mock.patch.object(gcs_json_credentials.P12Credentials, "__init__", side_effect=ValueError(ERROR_MESSAGE)) def testOauth2ServiceAccountFailure(self, _): contents = pkgutil.get_data("gslib", "tests/test_data/test.p12") tmpfile = self.CreateTempFile(contents=contents) with SetBotoConfigForTest( getBotoCredentialsConfig(service_account_creds={ "keyfile": tmpfile, "client_id": "?", })): with self.assertLogs() as logger: with self.assertRaises(Exception) as exc: gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None) self.assertIn(ERROR_MESSAGE, str(exc.exception)) self.assertIn(CredTypes.OAUTH2_SERVICE_ACCOUNT, logger.output[0]) def testOauth2UserCredential(self): with SetBotoConfigForTest(getBotoCredentialsConfig(user_account_creds="?")): self.assertTrue(gcs_json_credentials._HasOauth2UserAccountCreds()) client = gcs_json_api.GcsJsonApi(None, None, None, None) self.assertIsInstance(client.credentials, reauth_creds.Oauth2WithReauthCredentials) @mock.patch.object(reauth_creds.Oauth2WithReauthCredentials, "__init__", side_effect=ValueError(ERROR_MESSAGE)) def testOauth2UserFailure(self, _): with SetBotoConfigForTest(getBotoCredentialsConfig(user_account_creds="?")): with self.assertLogs() as logger: with self.assertRaises(Exception) as exc: gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None) self.assertIn(ERROR_MESSAGE, str(exc.exception)) self.assertIn(CredTypes.OAUTH2_USER_ACCOUNT, logger.output[0]) @mock.patch.object(gcs_json_credentials.credentials_lib, 'GceAssertionCredentials', autospec=True) def testGCECredential(self, mock_credentials): def set_store(store): mock_credentials.return_value.store = store mock_credentials.return_value.client_id = None mock_credentials.return_value.refresh_token = "rEfrEshtOkEn" mock_credentials.return_value.set_store = set_store with SetBotoConfigForTest(getBotoCredentialsConfig(gce_creds="?")): self.assertTrue(gcs_json_credentials._HasGceCreds()) client = gcs_json_api.GcsJsonApi(None, None, None, None) self.assertIsInstance(client.credentials, GceAssertionCredentials) self.assertEqual(client.credentials.refresh_token, "rEfrEshtOkEn") self.assertIs(client.credentials.client_id, None) @mock.patch.object(GceAssertionCredentials, "__init__", side_effect=ValueError(ERROR_MESSAGE)) def testGCECredentialFailure(self, _): with SetBotoConfigForTest(getBotoCredentialsConfig(gce_creds="?")): with self.assertLogs() as logger: with self.assertRaises(Exception) as exc: gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None) self.assertIn(ERROR_MESSAGE, str(exc.exception)) self.assertIn(CredTypes.GCE, logger.output[0]) def testExternalAccountCredential(self): contents = pkgutil.get_data( "gslib", "tests/test_data/test_external_account_credentials.json") tmpfile = self.CreateTempFile(contents=contents) with SetBotoConfigForTest( getBotoCredentialsConfig(external_account_creds=tmpfile)): client = gcs_json_api.GcsJsonApi(None, None, None, None) self.assertIsInstance(client.credentials, WrappedCredentials) @mock.patch.object(WrappedCredentials, "__init__", side_effect=ValueError(ERROR_MESSAGE)) def testExternalAccountFailure(self, _): contents = pkgutil.get_data( "gslib", "tests/test_data/test_external_account_credentials.json") tmpfile = self.CreateTempFile(contents=contents) with SetBotoConfigForTest( getBotoCredentialsConfig(external_account_creds=tmpfile)): with self.assertLogs() as logger: with self.assertRaises(Exception) as exc: gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None) self.assertIn(ERROR_MESSAGE, str(exc.exception)) self.assertIn(CredTypes.EXTERNAL_ACCOUNT, logger.output[0]) def testExternalAccountAuthorizedUserCredential(self): contents = pkgutil.get_data( "gslib", "tests/test_data/test_external_account_authorized_user_credentials.json" ) tmpfile = self.CreateTempFile(contents=contents) with SetBotoConfigForTest( getBotoCredentialsConfig( external_account_authorized_user_creds=tmpfile)): client = gcs_json_api.GcsJsonApi(None, None, None, None) self.assertIsInstance(client.credentials, WrappedCredentials) @mock.patch.object(WrappedCredentials, "__init__", side_effect=ValueError(ERROR_MESSAGE)) def testExternalAccountAuthorizedUserFailure(self, _): contents = pkgutil.get_data( "gslib", "tests/test_data/test_external_account_authorized_user_credentials.json" ) tmpfile = self.CreateTempFile(contents=contents) with SetBotoConfigForTest( getBotoCredentialsConfig( external_account_authorized_user_creds=tmpfile)): with self.assertLogs() as logger: with self.assertRaises(Exception) as exc: gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None) self.assertIn(ERROR_MESSAGE, str(exc.exception)) self.assertIn(CredTypes.EXTERNAL_ACCOUNT_AUTHORIZED_USER, logger.output[0]) def testOauth2ServiceAccountAndOauth2UserCredential(self): with SetBotoConfigForTest( getBotoCredentialsConfig(user_account_creds="?", service_account_creds={ "keyfile": "?", "client_id": "?", })): with self.assertRaises(CommandException): gcs_json_api.GcsJsonApi(None, None, None, None)