-
-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathpreload.go
More file actions
318 lines (277 loc) · 9.32 KB
/
preload.go
File metadata and controls
318 lines (277 loc) · 9.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package deck
import (
"bytes"
"context"
"fmt"
"log/slog"
"slices"
"sync"
"time"
"github.com/k1LoW/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/api/drive/v3"
"google.golang.org/api/slides/v1"
)
const maxPreloadWorkersNum = 4
// currentImageData holds the result of parallel image fetching.
type currentImageData struct {
currentImages []*Image
currentImageObjectIDMap map[*Image]string
}
// imageToPreload holds image information with slide context.
type imageToPreload struct {
slideIndex int
imageIndex int // index within the slide
existingURL string // URL of existing image
objectID string // objectID of existing image
isFromMarkdown bool // whether this image is from markdown
externalLink string // external link associated with the image, if any
}
// imageResult holds the result of image processing.
type imageResult struct {
slideIndex int
imageIndex int
image *Image
objectID string
}
// preloadCurrentImages pre-fetches current images for all slides that will be processed.
func (d *Deck) preloadCurrentImages(ctx context.Context, actions []*action) (map[int]*currentImageData, error) {
result := make(map[int]*currentImageData)
// Collect all images that need preloading
var imagesToPreload []imageToPreload
for _, action := range actions {
switch action.actionType {
case actionTypeUpdate:
// Extract existing images from the current slide
if action.index < len(d.presentation.Slides) {
currentSlide := d.presentation.Slides[action.index]
imageIndexInSlide := 0
for _, element := range currentSlide.PageElements {
if element.Image != nil && element.Image.Placeholder == nil && element.Image.ContentUrl != "" {
imagesToPreload = append(imagesToPreload, imageToPreload{
slideIndex: action.index,
imageIndex: imageIndexInSlide,
existingURL: element.Image.ContentUrl,
objectID: element.ObjectId,
isFromMarkdown: element.Description == descriptionImageFromMarkdown,
externalLink: func(img *slides.Image) string {
if img.ImageProperties != nil && img.ImageProperties.Link != nil {
return img.ImageProperties.Link.Url
}
return ""
}(element.Image),
})
imageIndexInSlide++
}
}
}
}
}
if len(imagesToPreload) == 0 {
return result, nil
}
d.logger.Info("preloading current images", slog.Int("count", len(imagesToPreload)))
// Process images in parallel
sem := semaphore.NewWeighted(maxPreloadWorkersNum)
eg, ctx := errgroup.WithContext(ctx)
resultCh := make(chan imageResult, len(imagesToPreload))
for _, imgToPreload := range imagesToPreload {
eg.Go(func() error {
// Try to acquire semaphore
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
var image *Image
var err error
// Create Image from existing URL
if imgToPreload.isFromMarkdown {
image, err = NewImageFromMarkdown(imgToPreload.existingURL)
} else {
image, err = NewImage(imgToPreload.existingURL)
}
if err != nil {
return fmt.Errorf("failed to preload image from URL %s: %w", imgToPreload.existingURL, err)
}
image.link = imgToPreload.externalLink
resultCh <- imageResult{
slideIndex: imgToPreload.slideIndex,
imageIndex: imgToPreload.imageIndex,
image: image,
objectID: imgToPreload.objectID,
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("failed to preload images: %w", err)
}
close(resultCh)
// Collect results and build currentImageData directly with proper ordering
for res := range resultCh {
if res.image != nil {
if result[res.slideIndex] == nil {
result[res.slideIndex] = ¤tImageData{
currentImages: []*Image{},
currentImageObjectIDMap: map[*Image]string{},
}
}
// Resize currentImages slice if needed
if len(result[res.slideIndex].currentImages) <= res.imageIndex {
newSize := res.imageIndex + 1
newSlice := make([]*Image, newSize)
copy(newSlice, result[res.slideIndex].currentImages)
result[res.slideIndex].currentImages = newSlice
}
// Place image at the correct index
result[res.slideIndex].currentImages[res.imageIndex] = res.image
result[res.slideIndex].currentImageObjectIDMap[res.image] = res.objectID
}
}
d.logger.Info("preloaded current images")
return result, nil
}
// uploadedImageInfo holds information about uploaded images for cleanup.
type uploadedImageInfo struct {
uploadedID string
image *Image
}
// startUploadingImages starts uploading new images asynchronously and returns a channel for cleanup.
func (d *Deck) startUploadingImages(
ctx context.Context, actions []*action, currentImages map[int]*currentImageData) <-chan uploadedImageInfo {
// Collect all images that need uploading
var imagesToUpload []*Image
for _, action := range actions {
switch action.actionType {
case actionTypeUpdate, actionTypeAppend:
if action.slide == nil {
continue
}
for _, image := range action.slide.Images {
// Check if this image already exists in current images
var found bool
if currentImagesForSlide, exists := currentImages[action.index]; exists {
found = slices.ContainsFunc(currentImagesForSlide.currentImages, func(currentImage *Image) bool {
return currentImage.Equivalent(image)
})
}
if !found && image.IsUploadNeeded() && !slices.Contains(imagesToUpload, image) {
imagesToUpload = append(imagesToUpload, image)
}
}
}
}
// Create channel for uploaded image IDs
uploadedCh := make(chan uploadedImageInfo, len(imagesToUpload))
if len(imagesToUpload) == 0 {
close(uploadedCh)
return uploadedCh
}
d.logger.Info("starting image upload", slog.Int("count", len(imagesToUpload)))
// Mark all images as upload in progress
for _, image := range imagesToUpload {
image.StartUpload()
}
// Start uploading images asynchronously
go func() {
// Process images in parallel
sem := semaphore.NewWeighted(maxPreloadWorkersNum)
eg, ctx := errgroup.WithContext(ctx)
for _, image := range imagesToUpload {
eg.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
// Context canceled, set upload error on remaining images
image.SetUploadResult("", err)
return err
}
defer sem.Release(1)
// Upload image to Google Drive
df := &drive.File{
Name: fmt.Sprintf("________tmp-for-deck-%s", time.Now().Format(time.RFC3339)),
MimeType: string(image.mimeType),
}
if d.folderID != "" {
df.Parents = []string{d.folderID}
}
uploaded, err := d.driveSrv.Files.Create(df).Media(bytes.NewBuffer(image.Bytes())).SupportsAllDrives(true).Do()
if err != nil {
image.SetUploadResult("", fmt.Errorf("failed to upload image: %w", err))
return err
}
defer func() {
if err != nil {
// Clean up uploaded file on error
if deleteErr := d.deleteOrTrashFile(ctx, uploaded.Id); deleteErr != nil {
err = errors.Join(err, deleteErr)
}
}
}()
// To specify a URL for CreateImageRequest, we must make the webContentURL readable to anyone
// and configure the necessary permissions for this purpose.
if err := d.AllowReadingByAnyone(ctx, uploaded.Id); err != nil {
image.SetUploadResult("", fmt.Errorf("failed to set permission for image: %w", err))
return err
}
// Get webContentLink
f, err := d.driveSrv.Files.Get(uploaded.Id).Fields("webContentLink").SupportsAllDrives(true).Do()
if err != nil {
image.SetUploadResult("", fmt.Errorf("failed to get webContentLink for image: %w", err))
return err
}
if f.WebContentLink == "" {
image.SetUploadResult("", fmt.Errorf("webContentLink is empty for image: %s", uploaded.Id))
return err
}
// Set successful upload result
image.SetUploadResult(f.WebContentLink, nil)
uploadedCh <- uploadedImageInfo{uploadedID: uploaded.Id, image: image}
return nil
})
}
// Wait for all workers to complete
if err := eg.Wait(); err != nil {
d.logger.Error("failed to upload images", slog.Any("error", err))
}
// Close the channel when all uploads are done
close(uploadedCh)
}()
return uploadedCh
}
// cleanupUploadedImages deletes uploaded images in parallel.
func (d *Deck) cleanupUploadedImages(ctx context.Context, uploadedCh <-chan uploadedImageInfo) error {
sem := semaphore.NewWeighted(maxPreloadWorkersNum)
var wg sync.WaitGroup
for {
select {
case info, ok := <-uploadedCh:
if !ok {
// Channel closed, wait for all deletions to complete
wg.Wait()
return nil
}
// Try to acquire semaphore
if err := sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire semaphore: %w", err)
}
wg.Add(1)
go func(info uploadedImageInfo) {
defer func() {
sem.Release(1)
wg.Done()
}()
// Delete uploaded image from Google Drive
// Note: We only log errors here instead of returning them to ensure
// all images are attempted to be deleted. A single deletion failure
// should not prevent cleanup of other successfully uploaded images.
if err := d.deleteOrTrashFile(ctx, info.uploadedID); err != nil {
d.logger.Error("failed to delete uploaded image",
slog.String("id", info.uploadedID),
slog.Any("error", err))
}
}(info)
case <-ctx.Done():
return ctx.Err()
}
}
}