From 7f6e189dbc49675f160c11f9053a3dfa2ea14ca8 Mon Sep 17 00:00:00 2001 From: Nick Santos Date: Thu, 11 Jul 2019 11:40:40 -0400 Subject: [PATCH] watch: change the watcher interface to better match how we actually use it (#1835) --- pkg/watch/notify.go | 39 +++++++++- pkg/watch/notify_test.go | 140 ++++++++++++++---------------------- pkg/watch/watcher_darwin.go | 56 +++++++-------- pkg/watch/watcher_naive.go | 89 ++++++++++++----------- 4 files changed, 168 insertions(+), 156 deletions(-) diff --git a/pkg/watch/notify.go b/pkg/watch/notify.go index e40b04dc..02f7d735 100644 --- a/pkg/watch/notify.go +++ b/pkg/watch/notify.go @@ -1,12 +1,49 @@ package watch +import "github.com/windmilleng/tilt/internal/logger" + type FileEvent struct { Path string } type Notify interface { + // Start watching the paths set at init time + Start() error + + // Stop watching and close all channels Close() error - Add(name string) error + + // A channel to read off incoming file changes Events() chan FileEvent + + // A channel to read off show-stopping errors Errors() chan error } + +// When we specify directories to watch, we often want to +// ignore some subset of the files under those directories. +// +// For example: +// - Watch /src/repo, but ignore /src/repo/.git +// - Watch /src/repo, but ignore everything in /src/repo/bazel-bin except /src/repo/bazel-bin/app-binary +// +// The PathMatcher inteface helps us manage these ignores. +// By design, fileutils.PatternMatcher (the interface that implements dockerignore) +// satisfies this interface +// https://godoc.org/github.com/docker/docker/pkg/fileutils#PatternMatcher +type PathMatcher interface { + Matches(file string) (bool, error) + Exclusions() bool +} + +type EmptyMatcher struct { +} + +func (EmptyMatcher) Matches(f string) (bool, error) { return false, nil } +func (EmptyMatcher) Exclusions() bool { return false } + +var _ PathMatcher = EmptyMatcher{} + +func NewWatcher(paths []string, ignore PathMatcher, l logger.Logger) (Notify, error) { + return newWatcher(paths, ignore, l) +} diff --git a/pkg/watch/notify_test.go b/pkg/watch/notify_test.go index b66e625b..83086b63 100644 --- a/pkg/watch/notify_test.go +++ b/pkg/watch/notify_test.go @@ -36,10 +36,7 @@ func TestEventOrdering(t *testing.T) { for i, _ := range dirs { dir := f.TempDir("watched") dirs[i] = dir - err := f.notify.Add(dir) - if err != nil { - t.Fatal(err) - } + f.watch(dir) } f.fsync() @@ -71,10 +68,7 @@ func TestGitBranchSwitch(t *testing.T) { for i, _ := range dirs { dir := f.TempDir("watched") dirs[i] = dir - err := f.notify.Add(dir) - if err != nil { - t.Fatal(err) - } + f.watch(dir) } f.fsync() @@ -129,16 +123,13 @@ func TestWatchesAreRecursive(t *testing.T) { f.MkdirAll(subPath) // watch parent - err := f.notify.Add(root) - if err != nil { - t.Fatal(err) - } + f.watch(root) f.fsync() f.events = nil // change sub directory changeFilePath := filepath.Join(subPath, "change") - _, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) + _, err := os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) if err != nil { t.Fatal(err) } @@ -153,10 +144,7 @@ func TestNewDirectoriesAreRecursivelyWatched(t *testing.T) { root := f.TempDir("root") // watch parent - err := f.notify.Add(root) - if err != nil { - t.Fatal(err) - } + f.watch(root) f.fsync() f.events = nil @@ -166,7 +154,7 @@ func TestNewDirectoriesAreRecursivelyWatched(t *testing.T) { // change something inside sub directory changeFilePath := filepath.Join(subPath, "change") - _, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) + _, err := os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) if err != nil { t.Fatal(err) } @@ -180,11 +168,7 @@ func TestWatchNonExistentPath(t *testing.T) { root := f.TempDir("root") path := filepath.Join(root, "change") - err := f.notify.Add(path) - if err != nil { - t.Fatal(err) - } - + f.watch(path) f.fsync() d1 := "hello\ngo\n" @@ -200,11 +184,7 @@ func TestWatchNonExistentPathDoesNotFireSiblingEvent(t *testing.T) { watchedFile := filepath.Join(root, "a.txt") unwatchedSibling := filepath.Join(root, "b.txt") - err := f.notify.Add(watchedFile) - if err != nil { - t.Fatal(err) - } - + f.watch(watchedFile) f.fsync() d1 := "hello\ngo\n" @@ -222,13 +202,10 @@ func TestRemove(t *testing.T) { d1 := "hello\ngo\n" f.WriteFile(path, d1) - err := f.notify.Add(path) - if err != nil { - t.Fatal(err) - } + f.watch(path) f.fsync() f.events = nil - err = os.Remove(path) + err := os.Remove(path) if err != nil { t.Fatal(err) } @@ -239,17 +216,14 @@ func TestRemoveAndAddBack(t *testing.T) { f := newNotifyFixture(t) defer f.tearDown() - path := filepath.Join(f.watched, "change") + path := filepath.Join(f.paths[0], "change") d1 := []byte("hello\ngo\n") err := ioutil.WriteFile(path, d1, 0644) if err != nil { t.Fatal(err) } - err = f.notify.Add(path) - if err != nil { - t.Fatal(err) - } + f.watch(path) f.assertEvents(path) err = os.Remove(path) @@ -278,14 +252,11 @@ func TestSingleFile(t *testing.T) { d1 := "hello\ngo\n" f.WriteFile(path, d1) - err := f.notify.Add(path) - if err != nil { - t.Fatal(err) - } + f.watch(path) f.fsync() d2 := []byte("hello\nworld\n") - err = ioutil.WriteFile(path, d2, 0644) + err := ioutil.WriteFile(path, d2, 0644) if err != nil { t.Fatal(err) } @@ -296,8 +267,8 @@ func TestWriteBrokenLink(t *testing.T) { f := newNotifyFixture(t) defer f.tearDown() - link := filepath.Join(f.watched, "brokenLink") - missingFile := filepath.Join(f.watched, "missingFile") + link := filepath.Join(f.paths[0], "brokenLink") + missingFile := filepath.Join(f.paths[0], "missingFile") err := os.Symlink(missingFile, link) if err != nil { t.Fatal(err) @@ -310,13 +281,13 @@ func TestWriteGoodLink(t *testing.T) { f := newNotifyFixture(t) defer f.tearDown() - goodFile := filepath.Join(f.watched, "goodFile") + goodFile := filepath.Join(f.paths[0], "goodFile") err := ioutil.WriteFile(goodFile, []byte("hello"), 0644) if err != nil { t.Fatal(err) } - link := filepath.Join(f.watched, "goodFileSymlink") + link := filepath.Join(f.paths[0], "goodFileSymlink") err = os.Symlink(goodFile, link) if err != nil { t.Fatal(err) @@ -342,11 +313,7 @@ func TestWatchBrokenLink(t *testing.T) { t.Fatal(err) } - err = f.notify.Add(newRoot.Path()) - if err != nil { - t.Fatal(err) - } - + f.watch(newRoot.Path()) os.Remove(link) f.assertEvents(link) } @@ -359,15 +326,11 @@ func TestMoveAndReplace(t *testing.T) { file := filepath.Join(root, "myfile") f.WriteFile(file, "hello") - err := f.notify.Add(file) - if err != nil { - t.Fatal(err) - } - + f.watch(file) tmpFile := filepath.Join(root, ".myfile.swp") f.WriteFile(tmpFile, "world") - err = os.Rename(tmpFile, file) + err := os.Rename(tmpFile, file) if err != nil { t.Fatal(err) } @@ -478,37 +441,40 @@ func TestWatchNonexistentDirectory(t *testing.T) { type notifyFixture struct { out *bytes.Buffer *tempdir.TempDirFixture - notify Notify - watched string - events []FileEvent + notify Notify + paths []string + events []FileEvent } func newNotifyFixture(t *testing.T) *notifyFixture { out := bytes.NewBuffer(nil) - notify, err := NewWatcher(logger.NewLogger(logger.DebugLvl, out)) - if err != nil { - t.Fatal(err) - } - - f := tempdir.NewTempDirFixture(t) - watched := f.TempDir("watched") - - err = notify.Add(watched) - if err != nil { - t.Fatal(err) - } - return ¬ifyFixture{ - TempDirFixture: f, - watched: watched, - notify: notify, + nf := ¬ifyFixture{ + TempDirFixture: tempdir.NewTempDirFixture(t), + paths: []string{}, out: out, } + nf.watch(nf.TempDir("watched")) + return nf } func (f *notifyFixture) watch(path string) { - err := f.notify.Add(path) + f.paths = append(f.paths, path) + + // sync any outstanding events and close the old watcher + if f.notify != nil { + f.fsync() + f.closeWatcher() + } + + // create a new watcher + notify, err := NewWatcher(f.paths, EmptyMatcher{}, logger.NewLogger(logger.DebugLvl, f.out)) if err != nil { - f.T().Fatalf("notify.Add: %s", path) + f.T().Fatal(err) + } + f.notify = notify + err = f.notify.Start() + if err != nil { + f.T().Fatal(err) } } @@ -548,8 +514,8 @@ func (f *notifyFixture) consumeEventsInBackground(ctx context.Context) chan erro func (f *notifyFixture) fsync() { syncPathBase := fmt.Sprintf("sync-%d.txt", time.Now().UnixNano()) - syncPath := filepath.Join(f.watched, syncPathBase) - anySyncPath := filepath.Join(f.watched, "sync-") + syncPath := filepath.Join(f.paths[0], syncPathBase) + anySyncPath := filepath.Join(f.paths[0], "sync-") timeout := time.After(time.Second) f.WriteFile(syncPath, fmt.Sprintf("%s", time.Now())) @@ -582,21 +548,25 @@ F: } } -func (f *notifyFixture) tearDown() { - err := f.notify.Close() +func (f *notifyFixture) closeWatcher() { + notify := f.notify + err := notify.Close() if err != nil { f.T().Fatal(err) } // drain channels from watcher go func() { - for _ = range f.notify.Events() { + for _ = range notify.Events() { } }() go func() { - for _ = range f.notify.Errors() { + for _ = range notify.Errors() { } }() +} +func (f *notifyFixture) tearDown() { + f.closeWatcher() f.TempDirFixture.TearDown() } diff --git a/pkg/watch/watcher_darwin.go b/pkg/watch/watcher_darwin.go index 7cf0ac08..038f940f 100644 --- a/pkg/watch/watcher_darwin.go +++ b/pkg/watch/watcher_darwin.go @@ -2,7 +2,6 @@ package watch import ( "path/filepath" - "sync" "time" "github.com/windmilleng/tilt/internal/logger" @@ -19,14 +18,9 @@ type darwinNotify struct { errors chan error stop chan struct{} - // TODO(nick): This mutex is needed for the case where we add paths after we - // start watching. But because fsevents supports recursive watches, we don't - // actually need this feature. We should change the api contract of wmNotify - // so that, for recursive watches, we can guarantee that the path list doesn't - // change. - sm *sync.Mutex - pathsWereWatching map[string]interface{} + ignore PathMatcher + logger logger.Logger sawAnyHistoryDone bool } @@ -44,9 +38,7 @@ func (d *darwinNotify) loop() { e.Path = filepath.Join("/", e.Path) if e.Flags&fsevents.HistoryDone == fsevents.HistoryDone { - d.sm.Lock() d.sawAnyHistoryDone = true - d.sm.Unlock() continue } @@ -63,6 +55,13 @@ func (d *darwinNotify) loop() { continue } + ignore, err := d.ignore.Matches(e.Path) + if err != nil { + d.logger.Infof("Error matching path %q: %v", e.Path, err) + } else if ignore { + continue + } + d.events <- FileEvent{ Path: e.Path, } @@ -71,41 +70,33 @@ func (d *darwinNotify) loop() { } } -func (d *darwinNotify) Add(name string) error { - d.sm.Lock() - defer d.sm.Unlock() - - es := d.stream - +// Add a path to be watched. Should only be called during initialization. +func (d *darwinNotify) initAdd(name string) { // Check if this is a subdirectory of any of the paths // we're already watching. - for _, parent := range es.Paths { + for _, parent := range d.stream.Paths { if ospath.IsChild(parent, name) { - return nil + return } } - es.Paths = append(es.Paths, name) + d.stream.Paths = append(d.stream.Paths, name) if d.pathsWereWatching == nil { d.pathsWereWatching = make(map[string]interface{}) } d.pathsWereWatching[name] = struct{}{} +} - if len(es.Paths) == 1 { - es.Start() - go d.loop() - } else { - es.Restart() - } +func (d *darwinNotify) Start() error { + d.stream.Start() + + go d.loop() return nil } func (d *darwinNotify) Close() error { - d.sm.Lock() - defer d.sm.Unlock() - d.stream.Stop() close(d.errors) close(d.stop) @@ -121,8 +112,10 @@ func (d *darwinNotify) Errors() chan error { return d.errors } -func NewWatcher(l logger.Logger) (Notify, error) { +func newWatcher(paths []string, ignore PathMatcher, l logger.Logger) (*darwinNotify, error) { dw := &darwinNotify{ + ignore: ignore, + logger: l, stream: &fsevents.EventStream{ Latency: 1 * time.Millisecond, Flags: fsevents.FileEvents, @@ -130,12 +123,15 @@ func NewWatcher(l logger.Logger) (Notify, error) { // https://developer.apple.com/documentation/coreservices/1443980-fseventstreamcreate EventID: fsevents.LatestEventID(), }, - sm: &sync.Mutex{}, events: make(chan FileEvent), errors: make(chan error), stop: make(chan struct{}), } + for _, path := range paths { + dw.initAdd(path) + } + return dw, nil } diff --git a/pkg/watch/watcher_naive.go b/pkg/watch/watcher_naive.go index e2458147..4ddd58b1 100644 --- a/pkg/watch/watcher_naive.go +++ b/pkg/watch/watcher_naive.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "path/filepath" - "sync" "github.com/pkg/errors" "github.com/windmilleng/fsnotify" @@ -21,51 +20,51 @@ import ( // // All OS-specific codepaths are handled by fsnotify. type naiveNotify struct { - log logger.Logger - watcher *fsnotify.Watcher - events chan fsnotify.Event - wrappedEvents chan FileEvent - errors chan error - - mu sync.Mutex - // Paths that we're watching that should be passed up to the caller. // Note that we may have to watch ancestors of these paths // in order to fulfill the API promise. notifyList map[string]bool + + ignore PathMatcher + log logger.Logger + + watcher *fsnotify.Watcher + events chan fsnotify.Event + wrappedEvents chan FileEvent + errors chan error } var ( numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches") ) -func (d *naiveNotify) Add(name string) error { - fi, err := os.Stat(name) - if err != nil && !os.IsNotExist(err) { - return errors.Wrapf(err, "notify.Add(%q)", name) - } - - // if it's a file that doesn't exist, watch its parent - if os.IsNotExist(err) { - err = d.watchAncestorOfMissingPath(name) - if err != nil { - return errors.Wrapf(err, "watchAncestorOfMissingPath(%q)", name) - } - } else if fi.IsDir() { - err = d.watchRecursively(name) - if err != nil { +func (d *naiveNotify) Start() error { + for name := range d.notifyList { + fi, err := os.Stat(name) + if err != nil && !os.IsNotExist(err) { return errors.Wrapf(err, "notify.Add(%q)", name) } - } else { - err = d.add(filepath.Dir(name)) - if err != nil { - return errors.Wrapf(err, "notify.Add(%q)", filepath.Dir(name)) + + // if it's a file that doesn't exist, watch its parent + if os.IsNotExist(err) { + err = d.watchAncestorOfMissingPath(name) + if err != nil { + return errors.Wrapf(err, "watchAncestorOfMissingPath(%q)", name) + } + } else if fi.IsDir() { + err = d.watchRecursively(name) + if err != nil { + return errors.Wrapf(err, "notify.Add(%q)", name) + } + } else { + err = d.add(filepath.Dir(name)) + if err != nil { + return errors.Wrapf(err, "notify.Add(%q)", filepath.Dir(name)) + } } } - d.mu.Lock() - defer d.mu.Unlock() - d.notifyList[name] = true + go d.loop() return nil } @@ -123,10 +122,8 @@ func (d *naiveNotify) Errors() chan error { func (d *naiveNotify) loop() { defer close(d.wrappedEvents) for e := range d.events { - shouldNotify := d.shouldNotify(e.Name) - if e.Op&fsnotify.Create != fsnotify.Create { - if shouldNotify { + if d.shouldNotify(e.Name) { d.wrappedEvents <- FileEvent{e.Name} } continue @@ -170,8 +167,13 @@ func (d *naiveNotify) loop() { } func (d *naiveNotify) shouldNotify(path string) bool { - d.mu.Lock() - defer d.mu.Unlock() + ignore, err := d.ignore.Matches(path) + if err != nil { + d.log.Infof("Error matching path %q: %v", path, err) + } else if ignore { + return false + } + if _, ok := d.notifyList[path]; ok { return true } @@ -193,25 +195,32 @@ func (d *naiveNotify) add(path string) error { return nil } -func NewWatcher(l logger.Logger) (*naiveNotify, error) { +func newWatcher(paths []string, ignore PathMatcher, l logger.Logger) (*naiveNotify, error) { + if ignore == nil { + return nil, fmt.Errorf("newWatcher: ignore is nil") + } + fsw, err := fsnotify.NewWatcher() if err != nil { return nil, err } wrappedEvents := make(chan FileEvent) + notifyList := make(map[string]bool, len(paths)) + for _, path := range paths { + notifyList[path] = true + } wmw := &naiveNotify{ + notifyList: notifyList, + ignore: ignore, log: l, watcher: fsw, events: fsw.Events, wrappedEvents: wrappedEvents, errors: fsw.Errors, - notifyList: map[string]bool{}, } - go wmw.loop() - return wmw, nil }