Skip to content

Commit

Permalink
use channel for collecting current progress
Browse files Browse the repository at this point in the history
  • Loading branch information
dundee committed Mar 25, 2021
1 parent e8c2042 commit bcba9e6
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions analyze/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

// CurrentProgress struct
type CurrentProgress struct {
mutex *sync.Mutex
CurrentItemName string
ItemCount int
TotalSize int64
Expand All @@ -31,29 +30,30 @@ type Analyzer interface {

// ParallelAnalyzer implements Analyzer
type ParallelAnalyzer struct {
progress *CurrentProgress
progressChan chan CurrentProgress
doneChan chan struct{}
wait sync.WaitGroup
ignoreDir ShouldDirBeIgnored
progress *CurrentProgress
progressInChan chan CurrentProgress
progressOutChan chan CurrentProgress
doneChan chan struct{}
wait sync.WaitGroup
ignoreDir ShouldDirBeIgnored
}

// CreateAnalyzer returns Analyzer
func CreateAnalyzer() Analyzer {
return &ParallelAnalyzer{
progress: &CurrentProgress{
mutex: &sync.Mutex{},
ItemCount: 0,
TotalSize: int64(0),
},
progressChan: make(chan CurrentProgress, 10),
doneChan: make(chan struct{}, 1),
progressInChan: make(chan CurrentProgress, 10),
progressOutChan: make(chan CurrentProgress, 10),
doneChan: make(chan struct{}, 1),
}
}

// GetProgressChan returns channel for getting progress
func (a *ParallelAnalyzer) GetProgressChan() chan CurrentProgress {
return a.progressChan
return a.progressOutChan
}

// GetDoneChan returns channel for checking when analysis is done
Expand All @@ -66,19 +66,22 @@ func (a *ParallelAnalyzer) ResetProgress() {
a.progress.ItemCount = 0
a.progress.TotalSize = int64(0)
a.progress.CurrentItemName = ""
a.progress.mutex = &sync.Mutex{}
}

// AnalyzeDir analyzes given path
func (a *ParallelAnalyzer) AnalyzeDir(path string, ignore ShouldDirBeIgnored) *Dir {
a.ignoreDir = ignore

go a.updateProgress()
dir := a.processDir(path)

dir.BasePath = filepath.Dir(path)
a.wait.Wait()

links := make(AlreadyCountedHardlinks, 10)
dir.UpdateStats(links)

a.doneChan <- struct{}{}
a.doneChan <- struct{}{}

return dir
Expand Down Expand Up @@ -156,20 +159,26 @@ func (a *ParallelAnalyzer) processDir(path string) *Dir {
a.wait.Done()
}()

a.updateProgress(path, len(files), totalSize)
a.progressInChan <- CurrentProgress{path, len(files), totalSize}
return dir
}

func (a *ParallelAnalyzer) updateProgress(path string, itemCount int, totalSize int64) {
a.progress.mutex.Lock()
a.progress.CurrentItemName = path
a.progress.ItemCount += itemCount
a.progress.TotalSize += totalSize
select {
case a.progressChan <- *a.progress:
default:
func (a *ParallelAnalyzer) updateProgress() {
for {
select {
case <-a.doneChan:
return
case progress := <-a.progressInChan:
a.progress.CurrentItemName = progress.CurrentItemName
a.progress.ItemCount += progress.ItemCount
a.progress.TotalSize += progress.TotalSize
}

select {
case a.progressOutChan <- *a.progress:
default:
}
}
a.progress.mutex.Unlock()
}

func getDirFlag(err error, items int) rune {
Expand Down

0 comments on commit bcba9e6

Please sign in to comment.