Skip to content

Commit

Permalink
Merge pull request #1 from grisu48/threads
Browse files Browse the repository at this point in the history
Add ChunkFactory thread
  • Loading branch information
grisu48 authored Mar 9, 2021
2 parents 0380653 + 1a0cec1 commit 8f39e80
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 327 deletions.
60 changes: 57 additions & 3 deletions cmd/disko-san/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,73 @@ func VerifyChunk(buf []byte) bool {
}

func CreateChunk(buf []byte) {
n, err := rand.Read(buf)
n, err := rand.Read(buf[4:]) // don't waste the first four bytes, as they are anyways checksum
if err != nil {
// This error is critical and cannot be recovered
panic(err)
}
if n < len(buf) {
if n < len(buf)-4 {
panic(fmt.Errorf("couldn't get enough bytes from random pool"))
}
ApplyChecksum(buf)
}

func ApplyChecksum(buf []byte) {
// Apply checksum to CHUNK at the beginning
// TODO: Don't waste the first four bytes for it!
cSum := checksum(buf[4:])
for i := 0; i < 4; i++ {
buf[i] = byte(cSum << i)
}
}

// Factory for producing chunks
type ChunkFactory struct {
buf []byte // destination buffer
sig chan int // status channel
running bool // Running flag
}

func (cf *ChunkFactory) StartProduce(size int) {
if cf.running {
return
}
cf.buf = make([]byte, size)
cf.sig = make(chan int, 1)
cf.running = true
go cf.produce()
}

func (cf *ChunkFactory) produce() {
for cf.running {
CreateChunk(cf.buf)
cf.sig <- 0 // Send ready signal
if <-cf.sig != 0 { // Wait for signal to proceed
break // stop signal
}
}
}

func (cf *ChunkFactory) Read(buf []byte) error {
if !cf.running {
return fmt.Errorf("chunk factory not running")
}
// We can deal with smaller buffer sizes, but not with larger
if len(cf.buf) < len(buf) {
return fmt.Errorf("chunk factory buffer size mismatch")
}
// Wait for ready signal
if sig := <-cf.sig; sig != 0 {
return fmt.Errorf("chunk factory signal %d", sig)
}
copy(buf, cf.buf) // smaller buffer is allowed
// If it is a smaller buffer, we need to re-compute the checksum
if len(cf.buf) != len(buf) {
ApplyChecksum(buf)
}
cf.sig <- 0
return nil
}

func (cf *ChunkFactory) Stop() {
cf.running = false
}
11 changes: 8 additions & 3 deletions cmd/disko-san/disko-san.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ func WriteCheck(disk *Disk, progress *Progress, statsFile string) error {
return err
}

// Background chunk production instance
var cf ChunkFactory
cf.StartProduce(CHUNKSIZE)
defer cf.Stop()

fmt.Printf("\033[s") // save cursor position
for progress.Pos < progress.Size {
// Determine size of current chunk - at the end of the disk this might not be the full size anymore
Expand All @@ -219,10 +224,10 @@ func WriteCheck(disk *Disk, progress *Progress, statsFile string) error {
chunk = chunk[:size]
}

// TODO: Create thread for create chunk to speed up process

// Create chunk
CreateChunk(chunk)
if err := cf.Read(chunk); err != nil {
return fmt.Errorf("ChunkFactory read error: %s", err)
}
// Write chunk to file with runtime
runtime := time.Now().UnixNano()
if n, err := disk.Write(chunk); err != nil {
Expand Down
Loading

0 comments on commit 8f39e80

Please sign in to comment.