feat: add progress_data to worker_metadata #202
Started returning `progress_data` in `worker_metadata`.
radovanjorgic
left a comment
No tests at all? :/
const itemsToUpload = batch || this.items;

if (itemsToUpload.length > 0) {
  for (const item of itemsToUpload) {
Huh, I have a few questions:
- Do we really need to do this for each item? How does that look from a performance perspective? Let's say you have 100+ repos at once, each with 5000 items in it?
- What if a timeout occurs at this point?
- Can we offload this work to backend? After storing the files, extractor-adapter/snap-in manager scans them and picks progress from there.
For #3, we'll need @GasperSenk's input, but I don't think so.
No, the adapter doesn't know the normalization function, so it can't find the correct field name for the dates.
if (itemsToUpload.length > 0) {
  for (const item of itemsToUpload) {
    if (
      item != null &&
Why not simply `if (item?.created_date)`?
This didn't work before I added created_date to the NormalizedAttachment.
Good catch!
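The exchange above turns on how TypeScript handles union types: `item?.created_date` only type-checks when every member of the union declares `created_date`. A minimal sketch of the simplified check, using cut-down stand-ins for the SDK's item and attachment types (the field lists and the `createdTimestamp` helper are assumptions for illustration, not the real definitions):

```typescript
// Simplified stand-ins for the SDK types; the real definitions have more fields.
interface NormalizedItem {
  id: string;
  created_date: string;
}

interface NormalizedAttachment {
  id: string;
  url: string;
  created_date?: string; // optional field, as added in this PR
}

type Uploadable = NormalizedItem | NormalizedAttachment | null | undefined;

// Hypothetical helper: because both union members declare created_date,
// optional chaining type-checks and also covers null/undefined items.
function createdTimestamp(item: Uploadable): number | undefined {
  const raw = item?.created_date;
  if (!raw) {
    return undefined;
  }
  const timestamp = new Date(raw).getTime();
  // Unparseable date strings yield NaN; treat them as missing.
  return Number.isNaN(timestamp) ? undefined : timestamp;
}
```

If `created_date` were missing from one member of the union, the compiler would reject the property access, which is consistent with the author's note that the short form only worked after the field was added to `NormalizedAttachment`.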
      'created_date' in item &&
      item.created_date != null
    ) {
      const created_date = new Date(item['created_date']).getTime();
Please don't use snake case for variable names.
min: 0,
max: 0,
What do min and max mean here? Maybe we should use oldest and newest instead?
These are the min and max timestamps. Since they are numbers, we decided to go with min and max.
Discussed in the ISS comments.
Might make sense to use newest and oldest, we do that in the backend, but I see why you would use min and max when you are dealing with longs.
eventType: ExtractorEventType | LoaderEventType;
data?: EventData;
worker_metadata?: {
  progress_data: Record<string, { min: number; max: number }>;
  newEventType == ExtractorEventType.AttachmentExtractionProgress)
) {
  for (const repo of this.repos) {
    itemTimestamps[repo.itemType] = repo.itemTimestamps;
We said we only want one entry for progress.
The latest from the list.
public uploadedArtifacts: Artifact[];
public itemTimestamps: { min: number; max: number } = {
  min: 0,
  max: 0,
We agreed to have a single record type that contains both the created and modified dates.
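One way to read this request is a single range record covering both dates, widened in one pass per uploaded batch. The sketch below is illustrative only: `DateRange`, `DateRanges`, `Dated`, `widen`, and `updateRanges` are hypothetical names, not the SDK's implementation.

```typescript
// Hypothetical shapes; names are illustrative, not the SDK's actual types.
interface DateRange {
  oldest?: number; // epoch milliseconds
  newest?: number; // epoch milliseconds
}

interface DateRanges {
  created: DateRange;
  modified: DateRange;
}

interface Dated {
  created_date?: string;
  modified_date?: string;
}

// Widen a range with one date string, ignoring missing or unparseable values.
function widen(range: DateRange, value: string | undefined): void {
  if (!value) return;
  const ts = new Date(value).getTime();
  if (Number.isNaN(ts)) return;
  range.oldest = range.oldest === undefined ? ts : Math.min(range.oldest, ts);
  range.newest = range.newest === undefined ? ts : Math.max(range.newest, ts);
}

// Single pass over the batch: constant work per item, no extra allocations.
function updateRanges(
  ranges: DateRanges,
  batch: Array<Dated | null | undefined>,
): DateRanges {
  for (const item of batch) {
    widen(ranges.created, item?.created_date);
    widen(ranges.modified, item?.modified_date);
  }
  return ranges;
}
```

Leaving the bounds undefined until a valid date is seen avoids treating an initial `0` as a real timestamp. The per-item cost is a couple of comparisons, which bears on the performance question raised earlier in the review.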
This PR flattens extraction progress metadata into `worker_metadata` and adds timestamp bounds the platform can use to detect incremental syncs that are not advancing.
- `NormalizedAttachment.created_date` and `NormalizedAttachment.modified_date` (optional): attachments can now contribute source timestamps the same way normalized items do.
- `Repo.dateRanges`: each repo now tracks the oldest/newest valid `created_date` and `modified_date` seen across uploaded batches.
- `worker_metadata` progress fields: extraction progress/done events now send the latest extracted repo’s `item_type`, `oldest_created_date`, `newest_created_date`, `oldest_modified_date`, and `newest_modified_date`.
- `worker_metadata`: `oldest_state_date` and `newest_state_date` are copied from `event_context.extract_from`/`extract_to` on emitted callback payloads.
Connected Issues
Checklist
- `npm run test` OR no tests needed.
- `npm run test:backwards-compatibility`.
- `npm run lint`.
- `airdrop-template` linked to this PR.
Migration note
If your connector normalizes attachments, populate `created_date` and/or `modified_date` on each `NormalizedAttachment` using the source system timestamp in RFC3339 format.
Notes:
- `created_date`/`modified_date`; no connector-side change is needed there.
What `worker_metadata` now contains
`worker_metadata` is part of the callback payload. It is not inside `event_data`.
Repo progress fields
These fields are attached only for:
- `DATA_EXTRACTION_PROGRESS`
- `DATA_EXTRACTION_DONE`
- `ATTACHMENT_EXTRACTION_PROGRESS`
- `ATTACHMENT_EXTRACTION_DONE`
Shape:
Behavior:
Example:
{
  "worker_metadata": {
    "item_type": "issues",
    "oldest_created_date": "2024-01-01T00:00:00.000Z",
    "newest_created_date": "2024-05-01T00:00:00.000Z",
    "oldest_modified_date": "2024-02-01T00:00:00.000Z",
    "newest_modified_date": "2024-06-01T00:00:00.000Z",
    "oldest_state_date": "2024-01-01T00:00:00.000Z",
    "newest_state_date": "2024-06-01T00:00:00.000Z",
    "adaas_library_version": "..."
  }
}
How repo progress fields are computed
- `Repo.upload()`, the SDK scans uploaded items for valid `created_date` and `modified_date` values.
- `dateRanges.creationDate` and `dateRanges.modifiedDate`.
- `WorkerAdapter` emits the bounds for the repo identified by the latest uploaded `itemType`.
How state window fields are computed
- `oldest_state_date` is copied from `event_context.extract_from`.
- `newest_state_date` is copied from `event_context.extract_to`.
- `control-protocol.emit()` and override caller-provided state-date values.
Why this exists
The platform can compare the emitted repo timestamp bounds together with the absolute extraction window. If repeated runs keep reporting the same latest-item bounds and state window, that is a signal that the incremental sync is not making forward progress.
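How the platform performs this comparison is not described here. As a rough sketch of the idea, assuming the flattened `worker_metadata` fields from the example payload above and a hypothetical `isStalled` helper that compares two consecutive runs:

```typescript
// Field names follow the example payload; isStalled is a hypothetical helper,
// not part of the SDK or the platform.
interface WorkerMetadata {
  item_type?: string;
  oldest_created_date?: string;
  newest_created_date?: string;
  oldest_modified_date?: string;
  newest_modified_date?: string;
  oldest_state_date?: string;
  newest_state_date?: string;
}

const PROGRESS_KEYS: Array<keyof WorkerMetadata> = [
  "item_type",
  "oldest_created_date",
  "newest_created_date",
  "oldest_modified_date",
  "newest_modified_date",
  "oldest_state_date",
  "newest_state_date",
];

// True when two consecutive runs report identical repo bounds and state window,
// which suggests the incremental sync did not advance.
function isStalled(previous: WorkerMetadata, current: WorkerMetadata): boolean {
  return PROGRESS_KEYS.every((key) => previous[key] === current[key]);
}
```

A real check would likely look at more than two runs before raising a signal, since a quiet source system can legitimately produce identical bounds.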