sync.go (14310B)
1 package cmd 2 3 import ( 4 "context" 5 "fmt" 6 "os" 7 "os/signal" 8 "path/filepath" 9 "regexp" 10 "strings" 11 "sync" 12 "syscall" 13 "time" 14 15 tea "github.com/charmbracelet/bubbletea" 16 "github.com/spf13/cobra" 17 18 "github.com/louloulibs/esync/internal/config" 19 "github.com/louloulibs/esync/internal/logger" 20 "github.com/louloulibs/esync/internal/syncer" 21 "github.com/louloulibs/esync/internal/tui" 22 "github.com/louloulibs/esync/internal/watcher" 23 ) 24 25 // --------------------------------------------------------------------------- 26 // Flags 27 // --------------------------------------------------------------------------- 28 29 var ( 30 localPath string 31 remotePath string 32 daemon bool 33 dryRun bool 34 initialSync bool 35 verbose bool 36 ) 37 38 // --------------------------------------------------------------------------- 39 // Command 40 // --------------------------------------------------------------------------- 41 42 var syncCmd = &cobra.Command{ 43 Use: "sync", 44 Short: "Watch and sync files to a remote destination", 45 Long: "Watch a local directory for changes and automatically sync them to a remote destination using rsync.", 46 RunE: runSync, 47 } 48 49 func init() { 50 syncCmd.Flags().StringVarP(&localPath, "local", "l", "", "local path to watch") 51 syncCmd.Flags().StringVarP(&remotePath, "remote", "r", "", "remote destination path") 52 syncCmd.Flags().BoolVar(&daemon, "daemon", false, "run in daemon mode (no TUI)") 53 syncCmd.Flags().BoolVar(&dryRun, "dry-run", false, "show what would be synced without syncing") 54 syncCmd.Flags().BoolVar(&initialSync, "initial-sync", false, "force a full sync on startup") 55 syncCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "verbose output") 56 57 rootCmd.AddCommand(syncCmd) 58 } 59 60 // reProgress2 matches the percentage in rsync --info=progress2 output lines. 61 var reProgress2 = regexp.MustCompile(`(\d+)%`) 62 63 // --------------------------------------------------------------------------- 64 // Config loading 65 // --------------------------------------------------------------------------- 66 67 // loadOrBuildConfig resolves configuration from CLI flags, a config file, or 68 // builds a minimal config in memory when --local and --remote are both given. 69 func loadOrBuildConfig() (*config.Config, error) { 70 // 1. Explicit config file via -c flag 71 if cfgFile != "" { 72 cfg, err := config.Load(cfgFile) 73 if err != nil { 74 return nil, fmt.Errorf("loading config %s: %w", cfgFile, err) 75 } 76 applyCLIOverrides(cfg) 77 return cfg, nil 78 } 79 80 // 2. Quick mode: both --local and --remote provided 81 if localPath != "" && remotePath != "" { 82 cfg := &config.Config{ 83 Sync: config.SyncSection{ 84 Local: localPath, 85 Remote: remotePath, 86 Interval: 1, 87 }, 88 Settings: config.Settings{ 89 WatcherDebounce: 500, 90 InitialSync: initialSync, 91 Ignore: []string{".git", "node_modules", ".DS_Store"}, 92 Rsync: config.RsyncSettings{ 93 Archive: true, 94 Compress: true, 95 }, 96 }, 97 } 98 return cfg, nil 99 } 100 101 // 3. Auto-detect config file 102 path := config.FindConfigFile() 103 if path == "" { 104 return nil, fmt.Errorf("no config file found; use -c, or provide both -l and -r") 105 } 106 107 cfg, err := config.Load(path) 108 if err != nil { 109 return nil, fmt.Errorf("loading config %s: %w", path, err) 110 } 111 applyCLIOverrides(cfg) 112 return cfg, nil 113 } 114 115 // applyCLIOverrides applies command-line flag values onto a loaded config. 116 func applyCLIOverrides(cfg *config.Config) { 117 if localPath != "" { 118 cfg.Sync.Local = localPath 119 } 120 if remotePath != "" { 121 cfg.Sync.Remote = remotePath 122 } 123 if initialSync { 124 cfg.Settings.InitialSync = true 125 } 126 } 127 128 // --------------------------------------------------------------------------- 129 // Run entry point 130 // --------------------------------------------------------------------------- 131 132 func runSync(cmd *cobra.Command, args []string) error { 133 cfg, err := loadOrBuildConfig() 134 if err != nil { 135 return err 136 } 137 138 s := syncer.New(cfg) 139 s.DryRun = dryRun 140 141 // Optional initial sync 142 if cfg.Settings.InitialSync { 143 if verbose { 144 fmt.Println("Running initial sync...") 145 } 146 result, err := s.Run() 147 if err != nil { 148 fmt.Fprintf(os.Stderr, "Initial sync error: %s\n", result.ErrorMessage) 149 } else if verbose { 150 fmt.Printf("Initial sync complete: %d files, %s\n", result.FilesCount, formatSize(result.BytesTotal)) 151 } 152 } 153 154 if daemon { 155 return runDaemon(cfg, s) 156 } 157 return runTUI(cfg, s) 158 } 159 160 // --------------------------------------------------------------------------- 161 // TUI mode 162 // --------------------------------------------------------------------------- 163 164 // watchState holds the watcher and syncer that can be torn down and rebuilt. 165 type watchState struct { 166 watcher *watcher.Watcher 167 cancel context.CancelFunc 168 inflight sync.WaitGroup 169 } 170 171 // startWatching creates a syncer, watcher, and sync handler from the given config. 172 func startWatching(cfg *config.Config, syncCh chan<- tui.SyncEvent, logCh chan<- tui.LogEntry) (*watchState, error) { 173 ctx, cancel := context.WithCancel(context.Background()) 174 175 s := syncer.New(cfg) 176 s.DryRun = dryRun 177 178 ws := &watchState{cancel: cancel} 179 180 handler := func() { 181 ws.inflight.Add(1) 182 defer ws.inflight.Done() 183 184 syncCh <- tui.SyncEvent{Status: "status:syncing"} 185 186 var lastPct string 187 onLine := func(line string) { 188 trimmed := strings.TrimSpace(line) 189 if trimmed == "" { 190 return 191 } 192 select { 193 case logCh <- tui.LogEntry{Time: time.Now(), Level: "INF", Message: trimmed}: 194 default: 195 } 196 if m := reProgress2.FindStringSubmatch(trimmed); len(m) > 1 { 197 pct := m[1] 198 if pct != lastPct { 199 lastPct = pct 200 select { 201 case syncCh <- tui.SyncEvent{Status: "status:syncing " + pct + "%"}: 202 default: 203 } 204 } 205 } 206 } 207 208 result, err := s.RunWithProgress(ctx, onLine) 209 now := time.Now() 210 211 if err != nil { 212 syncCh <- tui.SyncEvent{ 213 File: "sync error", 214 Status: "error", 215 Time: now, 216 } 217 syncCh <- tui.SyncEvent{Status: "status:watching"} 218 return 219 } 220 221 groups := groupFilesByTopLevel(result.Files) 222 223 totalGroupBytes := int64(0) 224 totalGroupFiles := 0 225 for _, g := range groups { 226 totalGroupBytes += g.bytes 227 totalGroupFiles += g.count 228 } 229 230 for _, g := range groups { 231 file := g.name 232 bytes := g.bytes 233 if totalGroupBytes == 0 && result.BytesTotal > 0 && totalGroupFiles > 0 { 234 bytes = result.BytesTotal * int64(g.count) / int64(totalGroupFiles) 235 } 236 size := formatSize(bytes) 237 syncCh <- tui.SyncEvent{ 238 File: file, 239 Size: size, 240 Duration: result.Duration, 241 Status: "synced", 242 Time: now, 243 Files: truncateFiles(g.files, 10), 244 FileCount: g.count, 245 } 246 } 247 248 if len(groups) == 0 && result.FilesCount > 0 { 249 syncCh <- tui.SyncEvent{ 250 File: fmt.Sprintf("%d files", result.FilesCount), 251 Size: formatSize(result.BytesTotal), 252 Duration: result.Duration, 253 Status: "synced", 254 Time: now, 255 } 256 } 257 258 syncCh <- tui.SyncEvent{Status: "status:watching"} 259 } 260 261 w, err := watcher.New( 262 cfg.Sync.Local, 263 cfg.Settings.WatcherDebounce, 264 cfg.AllIgnorePatterns(), 265 cfg.Settings.Include, 266 handler, 267 ) 268 if err != nil { 269 cancel() 270 return nil, fmt.Errorf("creating watcher: %w", err) 271 } 272 273 if err := w.Start(); err != nil { 274 cancel() 275 return nil, fmt.Errorf("starting watcher: %w", err) 276 } 277 278 ws.watcher = w 279 return ws, nil 280 } 281 282 // reportBrokenSymlinks sends prominent warnings about broken symlinks 283 // to the TUI log channel. 284 func reportBrokenSymlinks(broken []watcher.BrokenSymlink, logCh chan<- tui.LogEntry) { 285 if len(broken) == 0 { 286 return 287 } 288 now := time.Now() 289 logCh <- tui.LogEntry{ 290 Time: now, 291 Level: "WRN", 292 Message: fmt.Sprintf("Found %d broken symlink(s) — directories containing them are not watched:", len(broken)), 293 } 294 for _, b := range broken { 295 logCh <- tui.LogEntry{ 296 Time: now, 297 Level: "WRN", 298 Message: fmt.Sprintf(" %s -> %s", b.Path, b.Target), 299 } 300 } 301 } 302 303 func runTUI(cfg *config.Config, s *syncer.Syncer) error { 304 app := tui.NewApp(cfg.Sync.Local, cfg.Sync.Remote) 305 syncCh := app.SyncEventChan() 306 logCh := app.LogEntryChan() 307 308 ws, err := startWatching(cfg, syncCh, logCh) 309 if err != nil { 310 return err 311 } 312 reportBrokenSymlinks(ws.watcher.BrokenSymlinks, logCh) 313 app.SetWarnings(len(ws.watcher.BrokenSymlinks)) 314 315 var wsMu sync.Mutex 316 317 // Handle resync requests 318 resyncCh := app.ResyncChan() 319 go func() { 320 for range resyncCh { 321 wsMu.Lock() 322 w := ws 323 wsMu.Unlock() 324 w.watcher.TriggerSync() 325 } 326 }() 327 328 p := tea.NewProgram(app, tea.WithAltScreen()) 329 330 // Handle config reload 331 configCh := app.ConfigReloadChan() 332 go func() { 333 for newCfg := range configCh { 334 wsMu.Lock() 335 oldWs := ws 336 wsMu.Unlock() 337 338 // Tear down: stop watcher, wait for in-flight syncs 339 oldWs.watcher.Stop() 340 oldWs.inflight.Wait() 341 oldWs.cancel() 342 343 // Rebuild with new config 344 newWs, err := startWatching(newCfg, syncCh, logCh) 345 if err != nil { 346 select { 347 case syncCh <- tui.SyncEvent{Status: "status:error"}: 348 default: 349 } 350 continue 351 } 352 353 wsMu.Lock() 354 ws = newWs 355 wsMu.Unlock() 356 357 // Update paths via Bubbletea message (safe — goes through Update loop) 358 p.Send(tui.ConfigReloadedMsg{ 359 Local: newCfg.Sync.Local, 360 Remote: newCfg.Sync.Remote, 361 }) 362 363 select { 364 case syncCh <- tui.SyncEvent{Status: "status:watching"}: 365 default: 366 } 367 } 368 }() 369 370 if _, err := p.Run(); err != nil { 371 wsMu.Lock() 372 w := ws 373 wsMu.Unlock() 374 w.watcher.Stop() 375 w.cancel() 376 return fmt.Errorf("TUI error: %w", err) 377 } 378 379 wsMu.Lock() 380 w := ws 381 wsMu.Unlock() 382 w.watcher.Stop() 383 w.cancel() 384 return nil 385 } 386 387 // --------------------------------------------------------------------------- 388 // Daemon mode 389 // --------------------------------------------------------------------------- 390 391 func runDaemon(cfg *config.Config, s *syncer.Syncer) error { 392 // Write PID file so `esync status` can find us 393 pidPath := filepath.Join(os.TempDir(), "esync.pid") 394 os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", os.Getpid())), 0644) 395 defer os.Remove(pidPath) 396 397 var log *logger.Logger 398 if cfg.Settings.Log.File != "" { 399 var err error 400 log, err = logger.New(cfg.Settings.Log.File, cfg.Settings.Log.Format) 401 if err != nil { 402 return fmt.Errorf("creating logger: %w", err) 403 } 404 defer log.Close() 405 } 406 407 fmt.Printf("esync daemon started (PID %d)\n", os.Getpid()) 408 fmt.Printf("Watching: %s -> %s\n", cfg.Sync.Local, cfg.Sync.Remote) 409 410 if log != nil { 411 log.Info("started", map[string]interface{}{ 412 "local": cfg.Sync.Local, 413 "remote": cfg.Sync.Remote, 414 "pid": os.Getpid(), 415 }) 416 } 417 418 handler := func() { 419 result, err := s.Run() 420 421 if err != nil { 422 msg := result.ErrorMessage 423 if verbose { 424 fmt.Fprintf(os.Stderr, "Sync error: %s\n", msg) 425 } 426 if log != nil { 427 log.Error("sync_failed", map[string]interface{}{ 428 "error": msg, 429 }) 430 } 431 // Terminal bell on error 432 fmt.Print("\a") 433 return 434 } 435 436 if verbose { 437 fmt.Printf("Synced %d files (%s) in %s\n", 438 result.FilesCount, 439 formatSize(result.BytesTotal), 440 result.Duration.Truncate(time.Millisecond), 441 ) 442 } 443 if log != nil { 444 log.Info("sync_complete", map[string]interface{}{ 445 "files": result.FilesCount, 446 "bytes": result.BytesTotal, 447 "duration": result.Duration.String(), 448 }) 449 } 450 } 451 452 w, err := watcher.New( 453 cfg.Sync.Local, 454 cfg.Settings.WatcherDebounce, 455 cfg.AllIgnorePatterns(), 456 cfg.Settings.Include, 457 handler, 458 ) 459 if err != nil { 460 return fmt.Errorf("creating watcher: %w", err) 461 } 462 463 if err := w.Start(); err != nil { 464 return fmt.Errorf("starting watcher: %w", err) 465 } 466 defer w.Stop() 467 468 if len(w.BrokenSymlinks) > 0 { 469 fmt.Fprintf(os.Stderr, "Warning: %d broken symlink(s) found — directories containing them are not watched:\n", len(w.BrokenSymlinks)) 470 for _, b := range w.BrokenSymlinks { 471 fmt.Fprintf(os.Stderr, " %s -> %s\n", b.Path, b.Target) 472 } 473 } 474 475 // Block until SIGINT or SIGTERM 476 sigCh := make(chan os.Signal, 1) 477 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) 478 <-sigCh 479 480 if log != nil { 481 log.Info("stopping", nil) 482 } 483 fmt.Println("\nesync daemon stopped.") 484 return nil 485 } 486 487 // --------------------------------------------------------------------------- 488 // Helpers 489 // --------------------------------------------------------------------------- 490 491 // formatSize converts a byte count to a human-readable string (B, KB, MB, GB). 492 func formatSize(bytes int64) string { 493 switch { 494 case bytes >= 1<<30: 495 return fmt.Sprintf("%.1fGB", float64(bytes)/float64(1<<30)) 496 case bytes >= 1<<20: 497 return fmt.Sprintf("%.1fMB", float64(bytes)/float64(1<<20)) 498 case bytes >= 1<<10: 499 return fmt.Sprintf("%.1fKB", float64(bytes)/float64(1<<10)) 500 default: 501 return fmt.Sprintf("%dB", bytes) 502 } 503 } 504 505 // groupedEvent represents a top-level directory or root file for the TUI. 506 type groupedEvent struct { 507 name string // "cmd/" or "main.go" 508 count int // number of files (1 for root files) 509 bytes int64 // total bytes 510 files []string // individual file paths within the group 511 } 512 513 // groupFilesByTopLevel collapses file entries into top-level directories 514 // and root files. "cmd/sync.go" + "cmd/init.go" become one entry "cmd/" with count=2. 515 // When a directory contains only one file, the full relative path is kept. 516 func groupFilesByTopLevel(files []syncer.FileEntry) []groupedEvent { 517 dirMap := make(map[string]*groupedEvent) 518 // Track the original filename for single-file groups. 519 dirFirstFile := make(map[string]string) 520 var rootFiles []groupedEvent 521 var dirOrder []string 522 523 for _, f := range files { 524 parts := strings.SplitN(f.Name, "/", 2) 525 if len(parts) == 1 { 526 // Root-level file 527 rootFiles = append(rootFiles, groupedEvent{ 528 name: f.Name, 529 count: 1, 530 bytes: f.Bytes, 531 }) 532 } else { 533 dir := parts[0] + "/" 534 if g, ok := dirMap[dir]; ok { 535 g.count++ 536 g.bytes += f.Bytes 537 g.files = append(g.files, f.Name) 538 } else { 539 dirMap[dir] = &groupedEvent{ 540 name: dir, 541 count: 1, 542 bytes: f.Bytes, 543 files: []string{f.Name}, 544 } 545 dirFirstFile[dir] = f.Name 546 dirOrder = append(dirOrder, dir) 547 } 548 } 549 } 550 551 var out []groupedEvent 552 for _, dir := range dirOrder { 553 g := *dirMap[dir] 554 if g.count == 1 { 555 g.name = dirFirstFile[dir] 556 g.files = nil 557 } 558 out = append(out, g) 559 } 560 out = append(out, rootFiles...) 561 return out 562 } 563 564 // truncateFiles returns at most n elements from files. 565 func truncateFiles(files []string, n int) []string { 566 if len(files) <= n { 567 return files 568 } 569 return files[:n] 570 }