Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ben-ha committed Mar 10, 2024
1 parent cb4da23 commit 4f336bf
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 30 deletions.
6 changes: 1 addition & 5 deletions copier/blockcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ func (copier BlockCopier) CopyWithProgress(source discovery.FileInformation, des
return CopierState{State: state.State, Error: castErr}
}

if state.State == nil {
concreteState = BlockCopierState{}
}

inputFile, inputErr := os.Open(source.FullPath)
if inputErr != nil {
reportProgress(progress, source, destination, concreteState, inputErr)
Expand Down Expand Up @@ -106,5 +102,5 @@ func reportProgress(progressChan chan<- CopierProgress, source discovery.FileInf
if progressChan == nil {
return
}
progressChan <- CopierProgress{Source: source.FullPath, Dest: dest.FullPath, Size: currentState.Size, BytesTransferred: currentState.BytesTransferred, OpaqueState: currentState, Error: err}
progressChan <- CopierProgress{Source: source.FullPath, Dest: dest.FullPath, Size: currentState.Size, BytesTransferred: currentState.BytesTransferred, OpaqueState: CopierState{Error: err, State: currentState}, Error: err}
}
32 changes: 15 additions & 17 deletions logic/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,20 @@ import (

"github.com/ben-ha/jcp/copier"
"github.com/ben-ha/jcp/discovery"
"github.com/ben-ha/jcp/state"
)

const BLOCKSIZE = 1024 * 1024 // 1MB

type JcpProgress struct {
JcpError error
Progress copier.CopierProgress
}

type Jcp struct {
ProgressChannel chan JcpProgress
ProgressChannel chan state.JcpProgress
ConcurrencyLimit uint
JcpState state.JcpState
}

func MakeJcp(concurrencyLimit uint) Jcp {
progressChannel := make(chan JcpProgress)
return Jcp{ProgressChannel: progressChannel, ConcurrencyLimit: concurrencyLimit}
func MakeJcp(concurrencyLimit uint, jcpState state.JcpState) Jcp {
progressChannel := make(chan state.JcpProgress)
return Jcp{ProgressChannel: progressChannel, ConcurrencyLimit: concurrencyLimit, JcpState: jcpState}
}

func (jcp Jcp) StartCopy(src string, dest string) error {
Expand All @@ -45,7 +42,7 @@ func (jcp Jcp) StartCopy(src string, dest string) error {
return err
}
} else {
err := jcp.startFileCopy(src, dest, copier.CopierState{}, copierProgressChannel)
err := jcp.startFileCopy(src, dest, copierProgressChannel)
if err != nil {
return err
}
Expand Down Expand Up @@ -88,7 +85,7 @@ func (jcp Jcp) startDirectoryCopy(src string, dest string, progressChannel chan
destFile, _ := discovery.MakeFileInformation(path.Join(destBasePath, RemoveBaseDirectory(srcBasePath, currentFile.FullPath)))
go func(currentFile discovery.FileInformation, destFile discovery.FileInformation) {
defer transferCompletion.Done()
jcp.startFileCopyByInfo(currentFile, destFile, copier.CopierState{}, progressChannel)
jcp.startFileCopyByInfo(currentFile, destFile, progressChannel)
<-concurrencyLimiter
}(currentFile, destFile)
}
Expand All @@ -101,15 +98,15 @@ func (jcp Jcp) startDirectoryCopy(src string, dest string, progressChannel chan
return nil
}

func (jcp Jcp) startFileCopy(source string, destination string, state copier.CopierState, progress chan<- copier.CopierProgress) error {
func (jcp Jcp) startFileCopy(source string, destination string, progress chan<- copier.CopierProgress) error {
srcInfo, err := discovery.MakeFileInformation(source)
if err != nil {
return err
}

dstInfo, _ := discovery.MakeFileInformation(destination)
go func() {
newState := jcp.startFileCopyByInfo(srcInfo, dstInfo, state, progress)
newState := jcp.startFileCopyByInfo(srcInfo, dstInfo, progress)
err := newState.Error
if err == nil {
err = io.EOF
Expand All @@ -120,14 +117,15 @@ func (jcp Jcp) startFileCopy(source string, destination string, state copier.Cop
return nil
}

func (jcp Jcp) startFileCopyByInfo(source discovery.FileInformation, destination discovery.FileInformation, state copier.CopierState, progress chan<- copier.CopierProgress) copier.CopierState {
func (jcp Jcp) startFileCopyByInfo(source discovery.FileInformation, destination discovery.FileInformation, progress chan<- copier.CopierProgress) copier.CopierState {
cp := copier.MakeBlockCopier(BLOCKSIZE)
newState := cp.CopyWithProgress(source, destination, state, progress)
copierState := jcp.JcpState.GetStateForTransfer(source.FullPath, destination.FullPath)
newState := cp.CopyWithProgress(source, destination, copierState, progress)
return newState
}

func (jcp Jcp) reportError(err error) {
jcp.ProgressChannel <- JcpProgress{JcpError: err, Progress: copier.CopierProgress{}}
jcp.ProgressChannel <- state.JcpProgress{JcpError: err, Progress: copier.CopierProgress{}}
}

func RemoveBaseDirectory(base string, input string) string {
Expand All @@ -142,7 +140,7 @@ func (jcp Jcp) startProcessingProgress(copierProgress chan copier.CopierProgress
if !more {
break
}
jcp.ProgressChannel <- JcpProgress{JcpError: nil, Progress: msg}
jcp.ProgressChannel <- state.JcpProgress{JcpError: nil, Progress: msg}
}
}()
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func main() {

defer state.SaveState()

jcp := logic.MakeJcp(10)
jcp := logic.MakeJcp(10, *state)

ui := tea.NewProgram(tui.UIModel{})
uiComplete := sync.WaitGroup{}
Expand Down
33 changes: 29 additions & 4 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package state
import (
"time"

"github.com/ben-ha/jcp/logic"
"github.com/ben-ha/jcp/copier"
)

type CopySourceKey = string
Expand All @@ -27,7 +27,32 @@ type JcpCopyState struct {
Percent float64
}

func MakeNewCopyState(progress logic.JcpProgress) JcpCopyState {
percent := float64(progress.Progress.BytesTransferred) / float64(progress.Progress.Size)
return JcpCopyState{OpaqueState: progress.Progress.OpaqueState, CopierType: BlockCopier, LastUpdate: time.Now(), Percent: percent}
type JcpProgress struct {
JcpError error
Progress copier.CopierProgress
}

func (jcpState JcpState) GetStateForTransfer(src string, dest string) copier.CopierState {
if jcpState.CopyStates[src] == nil {
return copier.CopierState{}
}

copierState, err := anyToType[copier.CopierState](jcpState.CopyStates[src][dest].OpaqueState)
if err != nil {
panic("Corrupted state. Please delete state.json cache and try again")
}

if copierState.State != nil {
switch jcpState.CopyStates[src][dest].CopierType {
case BlockCopier:
copierState.State, err = anyToType[copier.BlockCopierState](copierState.State)
if err != nil {
panic("Corrupted state. Please delete state.json cache and try again")
}
default:
panic("Corrupted state. Please delete state.json cache and try again")
}
}

return copierState
}
25 changes: 22 additions & 3 deletions state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"path"
"sync"
"time"

"github.com/ben-ha/jcp/logic"
)

var stateManagerMutex sync.Mutex
Expand Down Expand Up @@ -59,7 +57,7 @@ func (copierState *JcpState) saveState(fileName string) {
}
}

func (copierState *JcpState) Update(progress logic.JcpProgress) {
func (copierState *JcpState) Update(progress JcpProgress) {
stateManagerMutex.Lock()
defer stateManagerMutex.Unlock()

Expand Down Expand Up @@ -103,3 +101,24 @@ func (copyState JcpCopyState) ShouldKeep() bool {

return true
}

func MakeNewCopyState(progress JcpProgress) JcpCopyState {
percent := float64(progress.Progress.BytesTransferred) / float64(progress.Progress.Size)
return JcpCopyState{OpaqueState: progress.Progress.OpaqueState, CopierType: BlockCopier, LastUpdate: time.Now(), Percent: percent}
}

func anyToType[T any](input any) (T, error) {
var empty T
jsonVal, err := json.Marshal(input)
if err != nil {
return empty, err
}

var obj T
err = json.Unmarshal(jsonVal, &obj)
if err != nil {
return empty, err
}

return obj, nil
}

0 comments on commit 4f336bf

Please sign in to comment.