Skip to content

Commit 712e6fd

Browse files
authored
feat: google drive error resolution (onyx-dot-app#9842)
1 parent f1a9a3b commit 712e6fd

4 files changed

Lines changed: 433 additions & 2 deletions

File tree

backend/onyx/connectors/google_drive/connector.py

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
get_all_files_in_my_drive_and_shared,
4343
)
4444
from onyx.connectors.google_drive.file_retrieval import get_external_access_for_folder
45+
from onyx.connectors.google_drive.file_retrieval import (
46+
get_files_by_web_view_links_batch,
47+
)
4548
from onyx.connectors.google_drive.file_retrieval import get_files_in_shared_drive
4649
from onyx.connectors.google_drive.file_retrieval import get_folder_metadata
4750
from onyx.connectors.google_drive.file_retrieval import get_root_folder_id
@@ -70,11 +73,13 @@
7073
from onyx.connectors.interfaces import CheckpointOutput
7174
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
7275
from onyx.connectors.interfaces import NormalizationResult
76+
from onyx.connectors.interfaces import Resolver
7377
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
7478
from onyx.connectors.interfaces import SlimConnectorWithPermSync
7579
from onyx.connectors.models import ConnectorFailure
7680
from onyx.connectors.models import ConnectorMissingCredentialError
7781
from onyx.connectors.models import Document
82+
from onyx.connectors.models import DocumentFailure
7883
from onyx.connectors.models import EntityFailure
7984
from onyx.connectors.models import HierarchyNode
8085
from onyx.connectors.models import SlimDocument
@@ -202,7 +207,9 @@ class DriveIdStatus(Enum):
202207

203208

204209
class GoogleDriveConnector(
205-
SlimConnectorWithPermSync, CheckpointedConnectorWithPermSync[GoogleDriveCheckpoint]
210+
SlimConnectorWithPermSync,
211+
CheckpointedConnectorWithPermSync[GoogleDriveCheckpoint],
212+
Resolver,
206213
):
207214
def __init__(
208215
self,
@@ -1665,6 +1672,82 @@ def load_from_checkpoint_with_perm_sync(
16651672
start, end, checkpoint, include_permissions=True
16661673
)
16671674

1675+
@override
def resolve_errors(
    self,
    errors: list[ConnectorFailure],
    include_permissions: bool = False,
) -> Generator[Document | ConnectorFailure | HierarchyNode, None, None]:
    """Re-fetch the documents behind previously recorded failures.

    The document IDs of the failed documents are passed to
    get_files_by_web_view_links_batch as web view links, the files are
    fetched in bulk, and each retrieved file is converted to a document.

    Args:
        errors: Failures to retry. Entries with no `failed_document` are
            skipped because there is no document ID to look up.
        include_permissions: When True, a PermissionSyncContext is built and
            passed to ancestor discovery and document conversion, and files
            are fetched with permission fields.

    Yields:
        - a ConnectorFailure for every file the batch lookup could not fetch,
        - whatever `_get_new_ancestors_for_files` yields for the fetched files,
        - the non-None results of converting each fetched file.

    Raises:
        RuntimeError: If credentials have not been loaded yet.
    """
    if self._creds is None or self._primary_admin_email is None:
        raise RuntimeError(
            "Credentials missing, should not call this method before calling load_credentials"
        )

    logger.info(f"Resolving {len(errors)} errors")

    # De-duplicate while preserving order: the same document can appear in
    # several failures, and the batch helper uses the ID as the batch
    # request ID, which googleapiclient requires to be unique (a repeated
    # ID raises KeyError from BatchHttpRequest.add).
    doc_ids = list(
        dict.fromkeys(
            failure.failed_document.document_id
            for failure in errors
            if failure.failed_document
        )
    )
    if not doc_ids:
        # Nothing resolvable; avoid building a Drive service for no work.
        return

    service = get_drive_service(self.creds, self.primary_admin_email)
    # Permissions are also needed when domain-link-only files must be excluded.
    field_type = (
        DriveFileFieldType.WITH_PERMISSIONS
        if include_permissions or self.exclude_domain_link_only
        else DriveFileFieldType.STANDARD
    )
    batch_result = get_files_by_web_view_links_batch(service, doc_ids, field_type)

    # Files that still cannot be fetched are reported as fresh failures.
    for doc_id, error in batch_result.errors.items():
        yield ConnectorFailure(
            failed_document=DocumentFailure(
                document_id=doc_id,
                document_link=doc_id,
            ),
            failure_message=f"Failed to retrieve file during error resolution: {error}",
            exception=error,
        )

    permission_sync_context = (
        PermissionSyncContext(
            primary_admin_email=self.primary_admin_email,
            google_domain=self.google_domain,
        )
        if include_permissions
        else None
    )

    retrieved_files = [
        RetrievedDriveFile(
            drive_file=file,
            user_email=self.primary_admin_email,
            completion_stage=DriveRetrievalStage.DONE,
        )
        for file in batch_result.files.values()
    ]

    # Fresh, empty "seen" sets are used here: there is no checkpoint state
    # to consult during resolution.
    yield from self._get_new_ancestors_for_files(
        files=retrieved_files,
        seen_hierarchy_node_raw_ids=ThreadSafeSet(),
        fully_walked_hierarchy_node_raw_ids=ThreadSafeSet(),
        permission_sync_context=permission_sync_context,
        add_prefix=True,
    )

    func_with_args = [
        (
            self._convert_retrieved_file_to_document,
            (rf, permission_sync_context),
        )
        for rf in retrieved_files
    ]
    results = cast(
        list[Document | ConnectorFailure | None],
        run_functions_tuples_in_parallel(func_with_args, max_workers=8),
    )
    # A None result means the conversion produced nothing to emit.
    for result in results:
        if result is not None:
            yield result
1750+
16681751
def _extract_slim_docs_from_google_drive(
16691752
self,
16701753
checkpoint: GoogleDriveCheckpoint,

backend/onyx/connectors/google_drive/file_retrieval.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from googleapiclient.discovery import Resource # type: ignore
1111
from googleapiclient.errors import HttpError # type: ignore
12+
from googleapiclient.http import BatchHttpRequest # type: ignore
1213

1314
from onyx.access.models import ExternalAccess
1415
from onyx.connectors.google_drive.constants import DRIVE_FOLDER_TYPE
@@ -60,6 +61,8 @@ class DriveFileFieldType(Enum):
6061
)
6162
FOLDER_FIELDS = "nextPageToken, files(id, name, permissions, modifiedTime, webViewLink, shortcutDetails)"
6263

64+
# Maximum number of requests placed in a single batch call; longer inputs are
# split into chunks of this size by get_files_by_web_view_links_batch.
# NOTE(review): presumably mirrors the Drive API's per-batch call limit — confirm.
MAX_BATCH_SIZE = 100
65+
6366
HIERARCHY_FIELDS = "id, name, parents, webViewLink, mimeType, driveId"
6467

6568
HIERARCHY_FIELDS_WITH_PERMISSIONS = (
@@ -216,7 +219,7 @@ def get_external_access_for_folder(
216219

217220

218221
def _get_fields_for_file_type(field_type: DriveFileFieldType) -> str:
219-
"""Get the appropriate fields string based on the field type enum"""
222+
"""Get the appropriate fields string for files().list() based on the field type enum."""
220223
if field_type == DriveFileFieldType.SLIM:
221224
return SLIM_FILE_FIELDS
222225
elif field_type == DriveFileFieldType.WITH_PERMISSIONS:
@@ -225,6 +228,25 @@ def _get_fields_for_file_type(field_type: DriveFileFieldType) -> str:
225228
return FILE_FIELDS
226229

227230

231+
def _extract_single_file_fields(list_fields: str) -> str:
232+
"""Convert a files().list() fields string to one suitable for files().get().
233+
234+
List fields look like "nextPageToken, files(field1, field2, ...)"
235+
Single-file fields should be just "field1, field2, ..."
236+
"""
237+
start = list_fields.find("files(")
238+
if start == -1:
239+
return list_fields
240+
inner_start = start + len("files(")
241+
inner_end = list_fields.rfind(")")
242+
return list_fields[inner_start:inner_end]
243+
244+
245+
def _get_single_file_fields(field_type: DriveFileFieldType) -> str:
    """Return the files().get() fields selector for the given field type.

    Looks up the files().list() selector for `field_type` and strips it down
    to the per-file portion.
    """
    list_style_fields = _get_fields_for_file_type(field_type)
    return _extract_single_file_fields(list_style_fields)
248+
249+
228250
def _get_files_in_parent(
229251
service: Resource,
230252
parent_id: str,
@@ -536,3 +558,74 @@ def get_file_by_web_view_link(
536558
)
537559
.execute()
538560
)
561+
562+
563+
class BatchRetrievalResult:
    """Outcome of a batched file lookup, with successes and failures kept apart.

    Both mappings start empty and are filled in by the code performing the
    lookup.
    """

    def __init__(self) -> None:
        # Exceptions recorded for lookups that failed, by lookup key.
        self.errors: dict[str, Exception] = {}
        # File payloads for lookups that succeeded, by lookup key.
        self.files: dict[str, GoogleDriveFileType] = {}
569+
570+
571+
def get_files_by_web_view_links_batch(
    service: GoogleDriveService,
    web_view_links: list[str],
    field_type: DriveFileFieldType,
) -> BatchRetrievalResult:
    """Fetch many Google Drive files by webViewLink through the batch API.

    Inputs longer than MAX_BATCH_SIZE are processed in consecutive chunks of
    at most MAX_BATCH_SIZE links and the per-chunk results are merged.

    Returns a BatchRetrievalResult holding the files that were retrieved and
    the errors for the links that could not be fetched.
    """
    single_file_fields = _get_single_file_fields(field_type)
    total_links = len(web_view_links)

    # Everything fits in one batch: no merging required.
    if total_links <= MAX_BATCH_SIZE:
        return _get_files_by_web_view_links_batch(
            service, web_view_links, single_file_fields
        )

    merged = BatchRetrievalResult()
    offset = 0
    while offset < total_links:
        partial = _get_files_by_web_view_links_batch(
            service,
            web_view_links[offset : offset + MAX_BATCH_SIZE],
            single_file_fields,
        )
        merged.files.update(partial.files)
        merged.errors.update(partial.errors)
        offset += MAX_BATCH_SIZE
    return merged
593+
594+
595+
def _get_files_by_web_view_links_batch(
    service: GoogleDriveService,
    web_view_links: list[str],
    fields: str,
) -> BatchRetrievalResult:
    """Single-batch implementation.

    Queues one files().get() request per distinct link and executes them as
    one batch. Each request uses the link itself as its batch request ID, so
    results and errors are keyed by link.

    Args:
        service: Drive service used to build the batch and the get requests.
        web_view_links: Links to fetch; duplicates are requested only once.
        fields: Fields selector passed to every files().get() request.

    Returns:
        A BatchRetrievalResult whose `files` holds the responses and whose
        `errors` holds per-link exceptions, including links whose file ID
        could not be extracted.

    Exceptions raised by `batch.execute()` itself are not caught here and
    propagate to the caller.
    """

    result = BatchRetrievalResult()

    def callback(
        request_id: str,
        response: GoogleDriveFileType,
        exception: Exception | None,
    ) -> None:
        # Invoked once per queued request; request_id is the web view link.
        if exception is not None:
            logger.warning(f"Error retrieving file {request_id}: {exception}")
            result.errors[request_id] = exception
        else:
            result.files[request_id] = response

    batch = cast(BatchHttpRequest, service.new_batch_http_request(callback=callback))

    # Batch request IDs must be unique: adding the same ID twice makes
    # BatchHttpRequest.add raise KeyError. dict.fromkeys drops repeated links
    # while keeping first-seen order.
    for web_view_link in dict.fromkeys(web_view_links):
        try:
            file_id = _extract_file_id_from_web_view_link(web_view_link)
            request = service.files().get(
                fileId=file_id,
                supportsAllDrives=True,
                fields=fields,
            )
            batch.add(request, request_id=web_view_link)
        except ValueError as e:
            # A link with no extractable file ID is recorded as an error and
            # left out of the batch.
            logger.warning(f"Failed to extract file ID from {web_view_link}: {e}")
            result.errors[web_view_link] = e

    batch.execute()
    return result

backend/onyx/connectors/interfaces.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,22 @@ def load_from_checkpoint_with_perm_sync(
298298
raise NotImplementedError
299299

300300

301+
class Resolver(BaseConnector):
    """Connector capability: retry documents that previously failed."""

    @abc.abstractmethod
    def resolve_errors(
        self,
        errors: list[ConnectorFailure],
        include_permissions: bool = False,
    ) -> Generator[Document | ConnectorFailure | HierarchyNode, None, None]:
        """Try to produce every document referenced by `errors` in one pass.

        There is no checkpointing. The caller is responsible for deleting the
        old ConnectorFailures and replacing them with the ones yielded here.

        When `include_permissions` is True the yielded documents have their
        permissions synced. HierarchyNode objects for ancestor folders of the
        resolved documents may be yielded as well.
        """
        raise NotImplementedError
315+
316+
301317
class HierarchyConnector(BaseConnector):
302318
@abc.abstractmethod
303319
def load_hierarchy(

0 commit comments

Comments
 (0)