Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ __pycache__/

# Claude Code
.claude/
.mcp.json
106 changes: 103 additions & 3 deletions cmd/msgvault/cmd/syncfull.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
syncBefore string
syncAfter string
syncLimit int
syncClean bool
syncCleanYes bool
)

var syncFullCmd = &cobra.Command{
Expand All @@ -37,18 +39,30 @@ Date filters:
--after 2024-01-01 Only messages on or after this date
--before 2024-12-31 Only messages before this date

Clean sync:
--clean Delete all local data for the account and re-sync
from scratch. Use this to reset staging/deletion
state or recover from a corrupted local database.
Requires confirmation (use --yes to skip).

Examples:
msgvault sync-full # Sync all accounts
msgvault sync-full you@gmail.com
msgvault sync-full you@gmail.com --after 2024-01-01
msgvault sync-full you@gmail.com --query "from:someone@example.com"
msgvault sync-full you@gmail.com --noresume # Force fresh sync`,
msgvault sync-full you@gmail.com --noresume # Force fresh sync
msgvault sync-full you@gmail.com --clean # Delete local data and re-sync
msgvault sync-full you@gmail.com --clean --yes # Skip confirmation`,
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if syncLimit < 0 {
return fmt.Errorf("--limit must be a non-negative number")
}

if syncClean && len(args) == 0 {
return fmt.Errorf("--clean requires specifying an account email")
}

// Validate config
if cfg.OAuth.ClientSecrets == "" {
return errOAuthNotConfigured()
Expand Down Expand Up @@ -100,15 +114,99 @@ Examples:
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Handle Ctrl+C gracefully
// Handle Ctrl+C gracefully (first = graceful, second = force exit)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("\nInterrupted. Saving checkpoint...")
fmt.Println("\nInterrupted. Saving checkpoint... (press Ctrl+C again to force quit)")
cancel()
// Wait for second signal to force exit
<-sigChan
fmt.Println("\nForce quit.")
os.Exit(1)
}()

// Handle --clean: delete all local data for the account before syncing
if syncClean {
email := emails[0] // Already validated that exactly one email is specified
source, err := s.GetSourceByIdentifier(email)
if err != nil {
return fmt.Errorf("lookup account: %w", err)
}
if source == nil {
return fmt.Errorf("account %s not found in database", email)
}

// Count what will be deleted
fmt.Printf("Preparing to clean local data for %s...\n", email)
var msgCount, convCount, labelCount int64
_ = s.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", source.ID).Scan(&msgCount)
_ = s.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", source.ID).Scan(&convCount)
_ = s.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", source.ID).Scan(&labelCount)

fmt.Println()
fmt.Println("This will permanently delete from the LOCAL database:")
fmt.Printf(" • %d messages\n", msgCount)
fmt.Printf(" • %d conversations\n", convCount)
fmt.Printf(" • %d labels\n", labelCount)
fmt.Printf(" • All sync history and checkpoints\n")
fmt.Println()
fmt.Println("Note: This does NOT delete anything from Gmail.")
fmt.Println(" After cleaning, a full re-sync will download all messages again.")
fmt.Println()

// Require confirmation unless --yes is provided
if !syncCleanYes {
fmt.Print("Proceed with clean? [y/N]: ")
var response string
_, _ = fmt.Scanln(&response)
if response != "y" && response != "Y" {
fmt.Println("Cancelled.")
return nil
}
fmt.Println()
}

// Perform the clean with progress reporting
fmt.Println("Deleting local data...")
lastTable := ""
lastPrint := time.Now()
deleted, err := s.ResetSourceDataWithProgress(source.ID, func(p store.ResetProgress) {
// Throttle output to avoid spamming
if time.Since(lastPrint) < 500*time.Millisecond && p.Phase != "complete" {
return
}
lastPrint = time.Now()

switch p.Phase {
case "counting":
fmt.Printf(" Found %d messages to delete\n", p.TotalMessages)
case "deleting":
if p.CurrentTable != lastTable {
if lastTable != "" {
fmt.Println(" done")
}
fmt.Printf(" Cleaning %s...", p.CurrentTable)
lastTable = p.CurrentTable
}
if p.CurrentTable == "messages" {
pct := float64(p.DeletedMessages) / float64(p.TotalMessages) * 100
fmt.Printf("\r Cleaning messages... %d/%d (%.1f%%) ", p.DeletedMessages, p.TotalMessages, pct)
}
case "complete":
if lastTable != "" {
fmt.Println(" done")
}
}
})
if err != nil {
fmt.Println("\nClean failed:", err)
return fmt.Errorf("reset account data: %w", err)
}
fmt.Printf("Deleted %d messages from local database.\n\n", deleted)
}

var syncErrors []string
for _, email := range emails {
if ctx.Err() != nil {
Expand Down Expand Up @@ -332,5 +430,7 @@ func init() {
syncFullCmd.Flags().StringVar(&syncBefore, "before", "", "Only messages before this date (YYYY-MM-DD)")
syncFullCmd.Flags().StringVar(&syncAfter, "after", "", "Only messages after this date (YYYY-MM-DD)")
syncFullCmd.Flags().IntVar(&syncLimit, "limit", 0, "Limit number of messages (for testing)")
syncFullCmd.Flags().BoolVar(&syncClean, "clean", false, "Delete all local data for the account and re-sync from scratch")
syncFullCmd.Flags().BoolVarP(&syncCleanYes, "yes", "y", false, "Skip confirmation prompt for --clean")
rootCmd.AddCommand(syncFullCmd)
}
195 changes: 195 additions & 0 deletions internal/store/sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"database/sql"
"fmt"
"time"
Expand Down Expand Up @@ -384,3 +385,197 @@ func (s *Store) GetSourceByIdentifier(identifier string) (*Source, error) {

return source, nil
}

// ResetProgress reports progress during a reset operation.
type ResetProgress struct {
Phase string // "counting", "message_bodies", "message_raw", etc.
TotalMessages int64 // Total messages to delete
DeletedMessages int64 // Messages deleted so far
CurrentTable string // Table currently being processed
RowsInBatch int64 // Rows deleted in this batch
}

// ResetProgressFunc is called periodically during reset to report progress.
type ResetProgressFunc func(p ResetProgress)

// ResetSourceData deletes all synced data for a source while keeping the source
// entry itself. This allows a clean re-sync from Gmail without losing the account
// configuration. Returns the number of messages deleted.
func (s *Store) ResetSourceData(sourceID int64) (int64, error) {
return s.ResetSourceDataWithProgress(sourceID, nil)
}

// ResetSourceDataWithProgress is like ResetSourceData but reports progress via callback.
// Uses batched deletes with FK checks disabled for much better performance.
func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgressFunc) (int64, error) {
if progress == nil {
progress = func(ResetProgress) {} // no-op
}

const batchSize = 5000
ctx := context.Background()

// Count messages first
var totalMessages int64
if err := s.db.QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", sourceID).Scan(&totalMessages); err != nil {
return 0, fmt.Errorf("count messages: %w", err)
}

progress(ResetProgress{Phase: "counting", TotalMessages: totalMessages})

// Use a dedicated connection to ensure PRAGMA applies to all our operations.
// This is critical because *sql.DB is a connection pool - without this,
// PRAGMA foreign_keys = OFF might run on a different connection than deletes.
conn, err := s.db.Conn(ctx)
if err != nil {
return 0, fmt.Errorf("acquire connection: %w", err)
}
defer conn.Close()

// Disable foreign keys for bulk delete performance on this connection
if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = OFF"); err != nil {
return 0, fmt.Errorf("disable foreign keys: %w", err)
}

// Helper to delete from a child table in batches.
// Uses rowid-based deletion to ensure each batch finds actual rows to delete.
deleteChildBatched := func(table, fkColumn string, onProgress func()) error {
// Query selects child table rowids by joining to messages filtered by source.
// This ensures each iteration finds actual existing child rows.
query := fmt.Sprintf(`
DELETE FROM %s WHERE rowid IN (
SELECT %s.rowid FROM %s
JOIN messages ON messages.id = %s.%s
WHERE messages.source_id = ?
LIMIT ?
)
`, table, table, table, table, fkColumn)

for {
result, err := conn.ExecContext(ctx, query, sourceID, batchSize)
if err != nil {
return fmt.Errorf("delete from %s: %w", table, err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
break
}
onProgress()
}
return nil
}

var deletedMessages int64

// Delete child tables explicitly (avoiding CASCADE overhead)
// Order: children before parents

// Child tables of messages
childTables := []struct {
table string
fkColumn string
}{
{"message_bodies", "message_id"},
{"message_raw", "message_id"},
{"message_recipients", "message_id"},
{"message_labels", "message_id"},
{"attachments", "message_id"},
{"reactions", "message_id"},
}

for _, ct := range childTables {
tableName := ct.table
if err := deleteChildBatched(ct.table, ct.fkColumn, func() {
progress(ResetProgress{
Phase: "deleting",
TotalMessages: totalMessages,
DeletedMessages: deletedMessages,
CurrentTable: tableName,
})
}); err != nil {
return 0, err
}
}

// Delete messages in batches (parent table)
for {
result, err := conn.ExecContext(ctx, `
DELETE FROM messages WHERE id IN (
SELECT id FROM messages WHERE source_id = ? LIMIT ?
)
`, sourceID, batchSize)
if err != nil {
return deletedMessages, fmt.Errorf("delete messages batch: %w", err)
}

rows, _ := result.RowsAffected()
if rows == 0 {
break
}
deletedMessages += rows

progress(ResetProgress{
Phase: "deleting",
TotalMessages: totalMessages,
DeletedMessages: deletedMessages,
CurrentTable: "messages",
RowsInBatch: rows,
})
}

// Delete conversation_participants (child of conversations)
if _, err := conn.ExecContext(ctx, `
DELETE FROM conversation_participants WHERE conversation_id IN (
SELECT id FROM conversations WHERE source_id = ?
)
`, sourceID); err != nil {
return deletedMessages, fmt.Errorf("delete conversation_participants: %w", err)
}

// Delete conversations
if _, err := conn.ExecContext(ctx, "DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil {
return deletedMessages, fmt.Errorf("delete conversations: %w", err)
}

progress(ResetProgress{
Phase: "deleting",
TotalMessages: totalMessages,
DeletedMessages: deletedMessages,
CurrentTable: "conversations",
})

// Delete labels
if _, err := conn.ExecContext(ctx, "DELETE FROM labels WHERE source_id = ?", sourceID); err != nil {
return deletedMessages, fmt.Errorf("delete labels: %w", err)
}

// Delete sync history
if _, err := conn.ExecContext(ctx, "DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil {
return deletedMessages, fmt.Errorf("delete sync_runs: %w", err)
}
if _, err := conn.ExecContext(ctx, "DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil {
return deletedMessages, fmt.Errorf("delete sync_checkpoints: %w", err)
}

// Reset the source's sync cursor
if _, err := conn.ExecContext(ctx, `
UPDATE sources
SET sync_cursor = NULL, last_sync_at = NULL, updated_at = datetime('now')
WHERE id = ?
`, sourceID); err != nil {
return deletedMessages, fmt.Errorf("reset source: %w", err)
}

// Re-enable foreign keys on this connection before returning it to pool
if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = ON"); err != nil {
return deletedMessages, fmt.Errorf("re-enable foreign keys: %w", err)
}

progress(ResetProgress{
Phase: "complete",
TotalMessages: totalMessages,
DeletedMessages: deletedMessages,
})

return deletedMessages, nil
}
Loading