adlfs user agent #501
Changes from 6 commits
9f890a5
d261085
871890c
9002f5e
eec8b18
f386454
c5b799e
2b93c24
273b32c
fd7275b
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,7 @@ | |
| Unreleased | ||
| ---------- | ||
|
|
||
| - . | ||
| - Add adlfs user agent | ||
|
|
||
|
|
||
| 2024.12.0 | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,5 @@ | ||
| from .gen1 import AzureDatalakeFileSystem | ||
| from .spec import AzureBlobFile, AzureBlobFileSystem | ||
| from .utils import __version__, version_tuple # noqa: F401 | ||
|
|
||
| __all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"] | ||
|
|
||
| try: | ||
| from ._version import version as __version__ # type: ignore[import] | ||
| from ._version import version_tuple # type: ignore[import] | ||
| except ImportError: | ||
| __version__ = "UNKNOWN" | ||
| version_tuple = (0, 0, __version__) # type: ignore[assignment] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| from fsspec.utils import infer_storage_options | ||
|
|
||
| from .utils import ( | ||
| __version__, | ||
| close_container_client, | ||
| close_credential, | ||
| close_service_client, | ||
|
|
@@ -72,6 +73,8 @@ | |
|
|
||
| _SOCKET_TIMEOUT_DEFAULT = object() | ||
|
Collaborator
We should also make sure to add a changelog entry for this update here: https://github.com/fsspec/adlfs/blob/main/CHANGELOG.md
||
|
|
||
| _USER_AGENT = f"adlfs/{__version__}" | ||
|
|
||
|
|
||
| # https://github.com/Azure/azure-sdk-for-python/issues/11419#issuecomment-628143480 | ||
| def make_callback(key, callback): | ||
|
|
@@ -118,6 +121,31 @@ def _coalesce_version_id(*args) -> Optional[str]: | |
| return version_ids.pop() | ||
|
|
||
|
|
||
| def _create_aio_blob_service_client( | ||
| account_url: str, | ||
| location_mode: Optional[str] = None, | ||
| credential: Optional[str] = None, | ||
|
Collaborator
Hmm, technically this is not the correct type; it can be more than a string. But it looks like the initializer for the filesystem uses this type as well. I think it is fine to keep it as is, but it may be worth doing a pass at a later time to add correct typing to the project and integrate
||
| ) -> AIOBlobServiceClient: | ||
| service_client_kwargs = { | ||
| "account_url": account_url, | ||
| "user_agent": _USER_AGENT, | ||
| } | ||
| if credential is not None: | ||
| service_client_kwargs["credential"] = credential | ||
| if location_mode is not None: | ||
|
||
| service_client_kwargs["_location_mode"] = location_mode | ||
| return AIOBlobServiceClient(**service_client_kwargs) | ||
|
|
||
|
|
||
| def _create_aio_blob_service_client_from_connection_string( | ||
| connection_string: str, | ||
| ) -> AIOBlobServiceClient: | ||
| return AIOBlobServiceClient.from_connection_string( | ||
|
Member
Just a check from my ignorance: does this ever happen within an async context? Other storage services with async functionality and resources can be sensitive to that kind of thing.
Collaborator
@martindurant it looks like the service clients are not created inside any async contexts; they are only created at initialization time or when an explicit call to a connect method is made, neither of which leverages any async or I/O. Generally, any async operation should happen only after the clients are set up. I'm curious what sort of sensitivity you are referring to? Any prior GitHub issues or examples would be helpful for illustration. Mainly I do want to confirm it is not an issue in
Member
It may have been specific to aiohttp and aiobotocore; they expect their clients to be initialised via a coroutine. I don't know if they actually do anything async in that call.
Collaborator
Gotcha. Yeah, I'd imagine with botocore/aiobotocore creating a client would need to have async involved; there's quite a bit of I/O when a botocore client gets created (e.g., reading model files from disk to dynamically create the client class, reading configuration from disk, etc.). The async Azure SDK client does not really have any I/O operations as part of client creation, so it does not use coroutines to instantiate it. So, I think the current approach to client creation (before and after this PR) should be fine.
||
| conn_str=connection_string, | ||
| user_agent=_USER_AGENT, | ||
| ) | ||
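The review thread on this helper turns on whether an async client may be constructed outside a coroutine. As a rough illustration of the distinction being discussed, here is a sketch with a hypothetical `FakeAsyncClient` whose constructor performs no I/O; it is a stand-in invented for this example, not the Azure SDK class, and says nothing about how aiobotocore behaves.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async client whose constructor does no I/O."""

    def __init__(self, user_agent: str) -> None:
        # Only stores configuration; nothing here needs a running event loop.
        self.user_agent = user_agent

    async def get_user_agent(self) -> str:
        # Yield to the event loop once to mimic an awaited network call.
        await asyncio.sleep(0)
        return self.user_agent


# Constructed in plain synchronous code, before any event loop exists.
client = FakeAsyncClient(user_agent="adlfs/0.0.0")

# Async work starts only afterwards.
result = asyncio.run(client.get_user_agent())
```

A client whose constructor itself awaited something could not be built this way and would need a coroutine-based factory instead.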
|
|
||
|
|
||
| class AzureBlobFileSystem(AsyncFileSystem): | ||
| """ | ||
| Access Azure Datalake Gen2 and Azure Storage if it were a file system using Multiprotocol Access | ||
|
|
@@ -473,12 +501,17 @@ def do_connect(self): | |
|
|
||
| try: | ||
| if self.connection_string is not None: | ||
| self.service_client = AIOBlobServiceClient.from_connection_string( | ||
| conn_str=self.connection_string | ||
| self.service_client = ( | ||
| _create_aio_blob_service_client_from_connection_string( | ||
| connection_string=self.connection_string, | ||
| ) | ||
| ) | ||
| elif self.account_name is not None: | ||
| if hasattr(self, "account_host"): | ||
| self.account_url: str = f"https://{self.account_host}" | ||
| if self.account_host.startswith("http://"): | ||
| self.account_url: str = self.account_host | ||
| else: | ||
| self.account_url: str = f"https://{self.account_host}" | ||
| else: | ||
| self.account_url: str = ( | ||
| f"https://{self.account_name}.blob.core.windows.net" | ||
|
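The hunk above changes how `account_url` is derived: an `account_host` that already starts with `http://` is used as given, any other host gets an `https://` prefix, and without a host the default `blob.core.windows.net` endpoint is built from the account name. A minimal sketch of that branching as a standalone function follows; `build_account_url` is a hypothetical name, since the PR keeps this logic inline in `do_connect`.

```python
from typing import Optional


def build_account_url(account_name: str, account_host: Optional[str] = None) -> str:
    """Mirror the URL selection shown in the diff (hypothetical helper)."""
    if account_host is not None:
        if account_host.startswith("http://"):
            # Already carries a scheme, e.g. a local emulator endpoint.
            return account_host
        return f"https://{account_host}"
    return f"https://{account_name}.blob.core.windows.net"


emulator_url = build_account_url(
    "devstoreaccount1", "http://127.0.0.1:10000/devstoreaccount1"
)
default_url = build_account_url("myaccount")
```

As in the diff, only the `http://` prefix is special-cased, which is what lets the new tests point `account_host` at a local emulator address.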
|
@@ -487,26 +520,25 @@ def do_connect(self): | |
| creds = [self.credential, self.account_key] | ||
| if any(creds): | ||
| self.service_client = [ | ||
| AIOBlobServiceClient( | ||
| _create_aio_blob_service_client( | ||
| account_url=self.account_url, | ||
| location_mode=self.location_mode, | ||
| credential=cred, | ||
| _location_mode=self.location_mode, | ||
| ) | ||
| for cred in creds | ||
| if cred is not None | ||
| ][0] | ||
| elif self.sas_token is not None: | ||
| if not self.sas_token.startswith("?"): | ||
| self.sas_token = f"?{self.sas_token}" | ||
| self.service_client = AIOBlobServiceClient( | ||
| self.service_client = _create_aio_blob_service_client( | ||
| account_url=self.account_url + self.sas_token, | ||
| credential=None, | ||
| _location_mode=self.location_mode, | ||
| location_mode=self.location_mode, | ||
| ) | ||
| else: | ||
| # Fall back to anonymous login, and assume public container | ||
| self.service_client = AIOBlobServiceClient( | ||
| account_url=self.account_url | ||
| self.service_client = _create_aio_blob_service_client( | ||
| account_url=self.account_url, | ||
| ) | ||
| else: | ||
| raise ValueError( | ||
|
|
@@ -2038,7 +2070,10 @@ def connect_client(self): | |
| """ | ||
| try: | ||
| if hasattr(self.fs, "account_host"): | ||
| self.fs.account_url: str = f"https://{self.fs.account_host}" | ||
| if self.fs.account_host.startswith("http://"): | ||
| self.fs.account_url: str = self.fs.account_host | ||
| else: | ||
| self.fs.account_url: str = f"https://{self.fs.account_host}" | ||
| else: | ||
| self.fs.account_url: str = ( | ||
| f"https://{self.fs.account_name}.blob.core.windows.net" | ||
|
|
@@ -2047,27 +2082,28 @@ def connect_client(self): | |
| creds = [self.fs.sync_credential, self.fs.account_key, self.fs.credential] | ||
| if any(creds): | ||
| self.container_client = [ | ||
| AIOBlobServiceClient( | ||
| _create_aio_blob_service_client( | ||
| account_url=self.fs.account_url, | ||
| credential=cred, | ||
| _location_mode=self.fs.location_mode, | ||
| location_mode=self.fs.location_mode, | ||
| ).get_container_client(self.container_name) | ||
| for cred in creds | ||
| if cred is not None | ||
| ][0] | ||
| elif self.fs.connection_string is not None: | ||
| self.container_client = AIOBlobServiceClient.from_connection_string( | ||
| conn_str=self.fs.connection_string | ||
| ).get_container_client(self.container_name) | ||
| self.container_client = ( | ||
| _create_aio_blob_service_client_from_connection_string( | ||
| connection_string=self.fs.connection_string, | ||
| ).get_container_client(self.container_name) | ||
| ) | ||
| elif self.fs.sas_token is not None: | ||
| self.container_client = AIOBlobServiceClient( | ||
| account_url=self.fs.account_url + self.fs.sas_token, credential=None | ||
| self.container_client = _create_aio_blob_service_client( | ||
| account_url=self.fs.account_url + self.fs.sas_token, | ||
| ).get_container_client(self.container_name) | ||
| else: | ||
| self.container_client = AIOBlobServiceClient( | ||
| account_url=self.fs.account_url | ||
| self.container_client = _create_aio_blob_service_client( | ||
| account_url=self.fs.account_url, | ||
| ).get_container_client(self.container_name) | ||
|
|
||
| except Exception as e: | ||
| raise ValueError( | ||
| f"Unable to fetch container_client with provided params for {e}!!" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| import datetime | ||
|
|
||
| import azure.storage.blob | ||
| import pytest | ||
| from azure.storage.blob import BlobServiceClient, PublicAccess | ||
| from azure.storage.blob.aio import BlobServiceClient as AIOBlobServiceClient | ||
|
|
||
| from adlfs import AzureBlobFile, AzureBlobFileSystem | ||
| from adlfs.utils import __version__ as __version__ | ||
|
|
||
| URL = "http://127.0.0.1:10000" | ||
| ACCOUNT_NAME = "devstoreaccount1" | ||
| KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # NOQA | ||
| CONN_STR = f"DefaultEndpointsProtocol=http;AccountName={ACCOUNT_NAME};AccountKey={KEY};BlobEndpoint={URL}/{ACCOUNT_NAME};" # NOQA | ||
| DEFAULT_VERSION_ID = "1970-01-01T00:00:00.0000000Z" | ||
| LATEST_VERSION_ID = "2022-01-01T00:00:00.0000000Z" | ||
|
||
|
|
||
| _USER_AGENT = f"adlfs/{__version__}" | ||
|
|
||
|
|
||
| def assert_sets_adlfs_user_agent(mock_client): | ||
|
||
| mock_client.assert_called_once() | ||
|
||
| assert "user_agent" in mock_client.call_args.kwargs | ||
| assert mock_client.call_args.kwargs["user_agent"] == _USER_AGENT | ||
|
||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def mock_from_connection_string(mocker): | ||
| return mocker.patch.object( | ||
| AIOBlobServiceClient, | ||
| "from_connection_string", | ||
| autospec=True, | ||
| side_effect=AIOBlobServiceClient.from_connection_string, | ||
| ) | ||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def mock_service_client_init(mocker): | ||
| return mocker.patch.object( | ||
| AIOBlobServiceClient, | ||
| "__init__", | ||
| autospec=True, | ||
| side_effect=AIOBlobServiceClient.__init__, | ||
| ) | ||
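Both fixtures above patch a method with `autospec=True` and pass the original method as `side_effect`, so the real code still runs while the call is recorded. The same pattern can be sketched with the standard library's `unittest.mock` alone; the `Client` class here is a hypothetical stand-in for the service client, and the user-agent value is a placeholder.

```python
from unittest import mock


class Client:
    """Hypothetical class standing in for the real service client."""

    def __init__(self, account_url, user_agent=None):
        self.account_url = account_url
        self.user_agent = user_agent


# side_effect is evaluated before the patch is applied, so it refers to the
# original __init__, which still runs each time the spy is called.
with mock.patch.object(
    Client, "__init__", autospec=True, side_effect=Client.__init__
) as spy:
    client = Client(account_url="https://example.invalid", user_agent="adlfs/0.0.0")

    # The same checks that assert_sets_adlfs_user_agent performs.
    spy.assert_called_once()
    recorded_user_agent = spy.call_args.kwargs["user_agent"]
```

Because `autospec=True` keeps the original signature, the instance appears in `spy.call_args.args` and the keyword arguments in `spy.call_args.kwargs`, which is why the helper only inspects `kwargs`.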
|
|
||
|
|
||
| @pytest.fixture(scope="function") | ||
| def mock_container(host): | ||
| conn_str = f"DefaultEndpointsProtocol=http;AccountName={ACCOUNT_NAME};AccountKey={KEY};BlobEndpoint={URL}/{ACCOUNT_NAME};" # NOQA | ||
|
|
||
| bbs = BlobServiceClient.from_connection_string(conn_str=conn_str) | ||
| if "data2" not in [c["name"] for c in bbs.list_containers()]: | ||
| bbs.create_container("data2", public_access=PublicAccess.Container) | ||
| container_client = bbs.get_container_client(container="data2") | ||
| bbs.insert_time = datetime.datetime.now(tz=datetime.timezone.utc).replace( | ||
| microsecond=0 | ||
| ) | ||
| container_client.upload_blob("root/a/file.txt", b"0123456789") | ||
|
|
||
| yield bbs | ||
|
|
||
| bbs.delete_container("data2") | ||
|
||
|
|
||
|
|
||
| def test_user_agent_blob_file_connection_str( | ||
| storage: azure.storage.blob.BlobServiceClient, mock_from_connection_string | ||
| ): | ||
| fs = AzureBlobFileSystem( | ||
| account_name=storage.account_name, | ||
| connection_string=CONN_STR, | ||
| skip_instance_cache=True, | ||
| ) | ||
| AzureBlobFile(fs, "data/root/a/file.txt", mode="rb") | ||
|
|
||
| assert_sets_adlfs_user_agent(mock_from_connection_string) | ||
|
|
||
|
|
||
| def test_user_agent_blob_file_initializer(mock_container, mock_service_client_init): | ||
| fs = AzureBlobFileSystem( | ||
| account_name=ACCOUNT_NAME, | ||
| account_host="http://127.0.0.1:10000/devstoreaccount1", | ||
| account_key=KEY, | ||
| skip_instance_cache=True, | ||
| ) | ||
| AzureBlobFile(fs, "data2/root/a/file.txt", mode="rb") | ||
| assert_sets_adlfs_user_agent(mock_service_client_init) | ||
|
||
|
|
||
|
|
||
| def test_user_agent_connection_str( | ||
| storage: azure.storage.blob.BlobServiceClient, mock_from_connection_string | ||
| ): | ||
| AzureBlobFileSystem( | ||
| account_name=storage.account_name, | ||
| connection_string=CONN_STR, | ||
| skip_instance_cache=True, | ||
| ) | ||
| assert_sets_adlfs_user_agent(mock_from_connection_string) | ||
|
|
||
|
|
||
| def test_user_agent_initializer( | ||
| storage: azure.storage.blob.BlobServiceClient, mock_service_client_init | ||
| ): | ||
| AzureBlobFileSystem( | ||
| account_name=storage.account_name, | ||
| skip_instance_cache=True, | ||
| ) | ||
| assert_sets_adlfs_user_agent(mock_service_client_init) | ||
Let's expand this out to be a little more specific about how the user agent changed, to help users understand the change better. Maybe something like?