feat: Implement Streaming Processing for Large File Operations #89

@pranavkonde

Description

tlock's file processing could handle large files more efficiently. The code already streams through the io.Reader and io.Writer interfaces, but the encryption/decryption path would benefit from chunked processing and progress reporting, both for better memory management and for a better user experience.

Current Implementation:

  1. Basic streaming with io.Copy:
// tlock.go
func (t Tlock) Encrypt(dst io.Writer, src io.Reader, roundNumber uint64) (err error) {
    w, err := age.Encrypt(dst, &Recipient{network: t.network, roundNumber: roundNumber})
    if err != nil {
        return fmt.Errorf("hybrid encrypt: %w", err)
    }
    
    if _, err := io.Copy(w, src); err != nil {
        return fmt.Errorf("write: %w", err)
    }
    return nil
}
  2. File handling in CLI:
// cmd/tle/tle.go
var src io.Reader = os.Stdin
if name := flag.Arg(0); name != "" && name != "-" {
    f, err := os.OpenFile(name, os.O_RDONLY, 0600)
    if err != nil {
        return fmt.Errorf("failed to open input file %q: %v", name, err)
    }
    defer func(f *os.File) {
        err = f.Close()
    }(f)
    src = f
}

Issues to Address:

  1. Memory Usage: While io.Copy is used, the underlying encryption/decryption process might still buffer large amounts of data in memory.
  2. Progress Tracking: No way to monitor progress for large file operations.
  3. Resource Management: No explicit control over buffer sizes or memory limits.
  4. User Feedback: Limited feedback during long-running operations.

Proposed Changes:

  1. Implement Chunked Processing:
type ChunkedProcessor struct {
    ChunkSize int
    Progress  func(processed, total int64)
    Buffer    *bytes.Buffer
}

func (t Tlock) EncryptChunked(dst io.Writer, src io.Reader, roundNumber uint64, opts ChunkOptions) error {
    chunk := make([]byte, opts.ChunkSize)
    var processed int64
    for {
        n, err := src.Read(chunk)
        if n > 0 {
            // Process chunk (a Reader may return n > 0 together with io.EOF,
            // so handle the data before checking the error)
            if perr := t.encryptChunk(dst, chunk[:n]); perr != nil {
                return fmt.Errorf("process chunk: %w", perr)
            }
            processed += int64(n)

            // Report progress (opts.Total may be -1 when the input size
            // is unknown, e.g. stdin)
            if opts.Progress != nil {
                opts.Progress(processed, opts.Total)
            }
        }
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return fmt.Errorf("read chunk: %w", err)
        }
    }
}
  2. Add Progress Reporting Interface:
type ProgressReporter interface {
    OnProgress(processed, total int64)
    OnComplete()
    OnError(err error)
}

type ProgressOptions struct {
    Reporter ProgressReporter
    Interval time.Duration
}
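To make the interface concrete, here is a sketch of a minimal reporter satisfying it. The `logReporter` and `formatProgress` names are hypothetical, not part of tlock:

```go
package main

import (
	"fmt"
	"time"
)

// ProgressReporter mirrors the interface proposed above.
type ProgressReporter interface {
	OnProgress(processed, total int64)
	OnComplete()
	OnError(err error)
}

// formatProgress renders one progress line; total <= 0 means the input
// size is unknown (e.g. reading from stdin).
func formatProgress(processed, total int64) string {
	if total > 0 {
		return fmt.Sprintf("%.1f%% (%d/%d bytes)",
			100*float64(processed)/float64(total), processed, total)
	}
	return fmt.Sprintf("%d bytes", processed)
}

// logReporter prints at most one progress line per interval.
type logReporter struct {
	interval time.Duration
	last     time.Time
}

func (l *logReporter) OnProgress(processed, total int64) {
	if time.Since(l.last) < l.interval {
		return
	}
	l.last = time.Now()
	fmt.Println(formatProgress(processed, total))
}

func (l *logReporter) OnComplete()       { fmt.Println("done") }
func (l *logReporter) OnError(err error) { fmt.Println("error:", err) }

func main() {
	var r ProgressReporter = &logReporter{}
	r.OnProgress(512, 1024)
	r.OnComplete()
}
```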
  3. Implement Memory-Efficient Buffering:
type BufferPool struct {
    pool sync.Pool
    maxSize int64
}

func NewBufferPool(maxSize int64) *BufferPool {
    return &BufferPool{
        pool: sync.Pool{
            New: func() interface{} {
                return bytes.NewBuffer(make([]byte, 0, defaultChunkSize))
            },
        },
        maxSize: maxSize,
    }
}
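The pool above needs `Get`/`Put` methods to be usable. A sketch of the missing halves; the policy of discarding buffers that grew past `maxSize` (so a few oversized reads do not pin memory) is an assumption, not something the issue specifies:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

const defaultChunkSize = 1 << 20 // 1MB, matching the proposed default

type BufferPool struct {
	pool    sync.Pool
	maxSize int64
}

func NewBufferPool(maxSize int64) *BufferPool {
	return &BufferPool{
		pool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, defaultChunkSize))
			},
		},
		maxSize: maxSize,
	}
}

// Get returns a reset buffer from the pool (or a fresh one).
func (p *BufferPool) Get() *bytes.Buffer {
	buf := p.pool.Get().(*bytes.Buffer)
	buf.Reset()
	return buf
}

// Put returns a buffer to the pool, discarding buffers whose capacity
// exceeded maxSize so they can be garbage collected.
func (p *BufferPool) Put(buf *bytes.Buffer) {
	if int64(buf.Cap()) > p.maxSize {
		return
	}
	p.pool.Put(buf)
}

func main() {
	pool := NewBufferPool(4 << 20)
	buf := pool.Get()
	buf.WriteString("chunk data")
	fmt.Println(buf.Len())
	pool.Put(buf)
}
```

Note that `sync.Pool` entries may be dropped at any GC, so the pool is an optimization, not a memory cap; `maxSize` enforcement is what bounds retained buffers.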
  4. Add CLI Progress Bar:
type CliProgress struct {
    bar     *progressbar.ProgressBar
    started time.Time
}

func (p *CliProgress) OnProgress(processed, total int64) {
    p.bar.Set64(processed)
    p.bar.Describe(fmt.Sprintf("Processing: %.1f MB/s", 
        float64(processed)/time.Since(p.started).Seconds()/1024/1024))
}

Implementation Details:

  1. Configuration Options:
type ProcessingOptions struct {
    ChunkSize      int
    MaxBufferSize  int64
    ProgressReport bool
    BufferPool     *BufferPool
}
  2. Enhanced Error Handling:
type ProcessingError struct {
    Phase    string
    Chunk    int
    Position int64
    Err      error
}
  3. Memory Management:
  • Implement configurable chunk sizes (default: 1MB)
  • Add buffer pooling for reuse
  • Implement memory usage limits
  • Add garbage collection hints
  4. Progress Reporting:
  • Add file size estimation
  • Implement progress callback system
  • Add ETA calculation
  • Add transfer speed calculation

CLI Enhancements:

  1. Add new flags for chunked processing:
flag.IntVar(&opts.ChunkSize, "chunk-size", 1024*1024, "size of chunks for processing")
flag.BoolVar(&opts.ProgressReport, "progress", true, "show progress bar")
  2. Implement progress visualization:
$ tle -e -D 30d large_file.dat
Processing large_file.dat: [===>    ] 45% (123.4 MB/s) ETA: 2m15s

Benefits:

  1. Reduced Memory Usage:

    • Controlled buffer sizes
    • Efficient memory reuse
    • Prevention of memory spikes
  2. Better User Experience:

    • Progress visibility
    • Speed information
    • Time estimates
    • Error reporting
  3. Improved Performance:

    • Optimized buffer management
    • Reduced GC pressure
    • Better resource utilization

Testing:

Add new test cases:

  • Test with various file sizes
  • Verify memory usage patterns
  • Test progress reporting accuracy
  • Validate chunked processing correctness
  • Test error handling in chunked mode

Acceptance Criteria:

  • Implement chunked processing
  • Add progress reporting
  • Add memory management
  • Update CLI interface
  • Add tests for new functionality
  • Document new features
  • Maintain backward compatibility
  • Verify memory usage improvement

Performance Targets:

  • Process 1GB+ files without significant memory growth
  • Maintain consistent memory usage regardless of file size
  • Keep memory usage under 2x chunk size
  • Provide accurate progress updates at least every second
