
Commit ebb8304 (parent c88b631), committed Feb 11, 2018

Fix stream splitting.

2 files changed: +157 -17 lines

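The commit fixes the splitter pipeline so that the fragments sent on the output channel actually reproduce the written stream: the entropy writer now copies incoming bytes into the current block buffer, and a new flush hook makes Close emit the trailing partial block. As orientation, here is a minimal consumer-side sketch of the invariant the tests below now enforce. This is a sketch, not part of the commit; the import path github.com/klauspost/dedup is an assumption, while NewSplitter, Fragment.Payload, Fragment.New and the channel close on Close are all visible in the diff.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/dedup" // assumed import path
)

func main() {
	out := make(chan dedup.Fragment, 10)
	done := make(chan int)

	// 50 identical 1000-byte blocks plus a 50-byte tail.
	input := make([]byte, 50050)

	go func() {
		off := 0
		for f := range out {
			// The splitter invariant: payloads, in order, must
			// reproduce the input stream exactly.
			if !bytes.Equal(input[off:off+len(f.Payload)], f.Payload) {
				panic("payload does not match input")
			}
			off += len(f.Payload)
		}
		done <- off
	}()

	w, err := dedup.NewSplitter(out, dedup.ModeFixed, 1000)
	if err != nil {
		panic(err)
	}
	if _, err = io.Copy(w, bytes.NewReader(input)); err != nil {
		panic(err)
	}
	// Close flushes the trailing partial block and closes out.
	if err = w.Close(); err != nil {
		panic(err)
	}
	fmt.Println("verified bytes:", <-done) // verified bytes: 50050
}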
 

‎writer.go

+10 -1
@@ -320,6 +320,11 @@ func NewSplitter(fragments chan<- Fragment, mode Mode, maxSize uint) (Writer, er
 		return nil, fmt.Errorf("dedup: unknown mode")
 	}
 
+	w.flush = func(w *writer) error {
+		w.split(w)
+		return w.err
+	}
+
 	if w.maxSize < MinBlockSize {
 		return nil, ErrSizeTooSmall
 	}
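The hook installed above is what makes Close deliver buffered data: flush forces a final split and surfaces any pending write error. Without it, bytes still sitting in the current block when the stream ends would never reach the fragment channel, which is what the size checks in the updated tests below detect.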
@@ -812,10 +817,14 @@ func (e *entWriter) write(w *writer, b []byte) (int, error) {
 		if len(b2)+e.histLen > e.minFragment {
 			b2 = b2[:e.minFragment-e.histLen]
 		}
+		off := w.off
 		for i := range b2 {
-			e.hist[b[i]]++
+			v := b2[i]
+			e.hist[v]++
+			w.cur[off+i] = v
 		}
 		e.histLen += len(b2)
+		w.off += len(b2)
 		b = b[len(b2):]
 	}
 	if len(b) == 0 {
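This hunk is the core fix. The old loop only updated the entropy histogram, and it indexed b rather than the truncated slice b2 (harmless while b2 is a prefix of b, but fragile); crucially, the bytes were never written into w.cur and w.off never advanced, so fragments emitted by the entropy splitter did not contain the data that had been written. A self-contained sketch of the corrected one-pass count-and-copy, with a hypothetical free function standing in for the writer's fields:

package main

import "fmt"

// countAndCopy mirrors the fixed loop in entWriter.write: it updates the
// byte histogram AND retains the bytes in the block buffer, returning the
// advanced write offset. Hypothetical signature, for illustration only.
func countAndCopy(hist *[256]int, cur []byte, off int, b2 []byte) int {
	for i, v := range b2 {
		hist[v]++      // histogram feeding the entropy split decision
		cur[off+i] = v // the missing copy: keep the byte for the payload
	}
	return off + len(b2) // advance the offset (the missing w.off update)
}

func main() {
	var hist [256]int
	cur := make([]byte, 16)
	off := countAndCopy(&hist, cur, 0, []byte("hello"))
	fmt.Println(off, string(cur[:off])) // Output: 5 hello
}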

‎writer_test.go

+147 -16
@@ -119,8 +119,8 @@ func TestFixedWriterLimit(t *testing.T) {
 	r.Close()
 }
 
-func TestFixedFragment(t *testing.T) {
-	const totalinput = 10 << 20
+func TestFixedFragmentSplitter(t *testing.T) {
+	const totalinput = 10<<20 + 500
 	input := getBufferSize(totalinput)
 
 	const size = 64 << 10
@@ -137,12 +137,18 @@ func TestFixedFragment(t *testing.T) {
 	count := make(chan int, 0)
 	go func() {
 		n := 0
+		off := 0
 		for f := range out {
+			if !bytes.Equal(b[off:off+len(f.Payload)], f.Payload) {
+				panic(fmt.Sprintf("output mismatch at offset %d", off))
+			}
+			off += len(f.Payload)
 			if f.New {
 				n += len(f.Payload)
 			}
 		}
 		count <- n
+		count <- off
 	}()
 	input = bytes.NewBuffer(b)
 	w, err := dedup.NewSplitter(out, dedup.ModeFixed, size)
@@ -155,8 +161,12 @@ func TestFixedFragment(t *testing.T) {
 		t.Fatal(err)
 	}
 	datalen := <-count
+	gotLen := <-count
 	removed := ((totalinput) - datalen) / size
 
+	if gotLen != totalinput {
+		t.Fatalf("did not get all data, want %d, got %d", totalinput, gotLen)
+	}
 	t.Log("Data size:", datalen)
 	t.Log("Removed", removed, "blocks")
 	// We should get at least 50 blocks
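The new bookkeeping asserts the two properties the broken splitter violated: every payload must match the input at its running offset, and the total received (gotLen) must equal totalinput. Growing totalinput by 500 bytes deliberately leaves a trailing partial block, which only arrives if Close flushes it via the new w.flush hook in writer.go.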
@@ -168,6 +178,124 @@ func TestFixedFragment(t *testing.T) {
 	}
 }
 
+func TestDynamicFragmentSplitter(t *testing.T) {
+	const totalinput = 10 << 20
+	input := getBufferSize(totalinput)
+
+	const size = 64 << 10
+	b := input.Bytes()
+	// Create some duplicates
+	for i := 0; i < 50; i++ {
+		// Read from 10 first blocks
+		src := b[(i%10)*size : (i%10)*size+size]
+		// Write into the following ones
+		dst := b[(10+i)*size : (i+10)*size+size]
+		copy(dst, src)
+	}
+	out := make(chan dedup.Fragment, 10)
+	count := make(chan int, 0)
+	go func() {
+		n := 0
+		off := 0
+		for f := range out {
+			if !bytes.Equal(b[off:off+len(f.Payload)], f.Payload) {
+				panic(fmt.Sprintf("output mismatch at offset %d", off))
+			}
+			off += len(f.Payload)
+			if f.New {
+				n += len(f.Payload)
+			}
+		}
+		count <- n
+		count <- off
+	}()
+	input = bytes.NewBuffer(b)
+	w, err := dedup.NewSplitter(out, dedup.ModeDynamic, size)
+	if err != nil {
+		t.Fatal(err)
+	}
+	io.Copy(w, input)
+	err = w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+	datalen := <-count
+	gotLen := <-count
+	removed := ((totalinput) - datalen) / size
+
+	if gotLen != totalinput {
+		t.Fatalf("did not get all data, want %d, got %d", totalinput, gotLen)
+	}
+	t.Log("Data size:", datalen)
+	t.Log("Removed", removed, "blocks")
+	// We should get at least 50 blocks
+	if removed < 45 {
+		t.Fatal("didn't remove at least 45 blocks")
+	}
+	if removed > 60 {
+		t.Fatal("removed an unreasonably high number of blocks")
+	}
+}
+
+func TestDynamicEntropySplitter(t *testing.T) {
+	const totalinput = 10 << 20
+	input := getBufferSize(totalinput)
+
+	const size = 64 << 10
+	b := input.Bytes()
+	// Create some duplicates
+	for i := 0; i < 50; i++ {
+		// Read from 10 first blocks
+		src := b[(i%10)*size : (i%10)*size+size]
+		// Write into the following ones
+		dst := b[(10+i)*size : (i+10)*size+size]
+		copy(dst, src)
+	}
+	out := make(chan dedup.Fragment, 10)
+	count := make(chan int, 0)
+	go func() {
+		n := 0
+		off := 0
+		for f := range out {
+			if !bytes.Equal(b[off:off+len(f.Payload)], f.Payload) {
+				panic(fmt.Sprintf("output mismatch at offset %d", off))
+			}
+			off += len(f.Payload)
+			if f.New {
+				n += len(f.Payload)
+			}
+		}
+		count <- n
+		count <- off
+	}()
+	input = bytes.NewBuffer(b)
+	w, err := dedup.NewSplitter(out, dedup.ModeDynamicEntropy, size)
+	if err != nil {
+		t.Fatal(err)
+	}
+	io.Copy(w, input)
+	err = w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+	datalen := <-count
+	gotLen := <-count
+	removed := ((totalinput) - datalen) / size
+
+	if gotLen != totalinput {
+		t.Fatalf("did not get all data, want %d, got %d", totalinput, gotLen)
+	}
+	t.Log("Data size:", datalen)
+	t.Log("Removed", removed, "blocks")
+	// We should get at least 45 blocks
+	if removed < 45 {
+		t.Fatal("didn't remove at least 45 blocks")
+	}
+	if removed > 60 {
+		t.Fatal("removed an unreasonably high number of blocks")
+	}
+}
+
 func TestDynamicWriter(t *testing.T) {
 	idx := bytes.Buffer{}
 	data := bytes.Buffer{}
@@ -704,7 +832,7 @@ func ExampleNewSplitter() {
 	}()
 
 	// This is our input:
-	input := bytes.NewBuffer(make([]byte, 50000))
+	input := bytes.NewBuffer(make([]byte, 50050))
 
 	// Create a new writer, with each block being 1000 bytes,
 	w, err := dedup.NewSplitter(out, dedup.ModeFixed, 1000)
@@ -722,10 +850,11 @@ func ExampleNewSplitter() {
 
 	// Let us inspect what was written:
 	fmt.Println("Blocks:", <-info)
+	// Size of one (repeated) block + 50 bytes for the last.
 	fmt.Println("Data size:", <-info)
 
-	// OUTPUT: Blocks: 50
-	// Data size: 1000
+	// OUTPUT: Blocks: 51
+	// Data size: 1050
 }
 
 // This will deduplicate a file
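The new expected output is simple arithmetic: 50050 bytes at a fixed block size of 1000 gives 50 full blocks plus one 50-byte tail, so 51 fragments in total; the 50 full blocks are identical (all zeros), so only the first one is new, and the deduplicated data size is 1000 + 50 = 1050 bytes. The input was grown from 50000 to 50050 precisely so that the example exercises the trailing-partial-block flush added in this commit.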
@@ -784,6 +913,7 @@ func ExampleNewSplitter_file() {
 	// Got OLD fragment #7, size 165, hash:6fb05a63e28a1bb2e880e051940f517115e7b16c
 	// Got OLD fragment #8, size 852, hash:6671826ffff6edd32951a0e774efccb5101ba629
 	// Got NEW fragment #9, size 2380, hash:1507aa13e215517ce982b9235a0221018128ed4e
+	// Got NEW fragment #10, size 71, hash:f262fcf4af26ee75ff3045db2af21f2acca235cd
 }
 
 // This will deduplicate a file
@@ -832,17 +962,18 @@ func ExampleNewSplitter_entropy() {
 	wg.Wait()
 
 	// OUTPUT:
-	// Got NEW fragment #0, size 646, hash:5435bfaa1d5c9301798fdfea3112c94306e2cdf3
-	// Got NEW fragment #1, size 926, hash:a3251db1c56347e4c7b245f35fc4c9b034418900
-	// Got NEW fragment #2, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
-	// Got NEW fragment #3, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
-	// Got NEW fragment #4, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
-	// Got NEW fragment #5, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
-	// Got NEW fragment #6, size 3509, hash:e0d7c8acfdd5b399a92b5e495a0794ffa842ee73
-	// Got OLD fragment #7, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
-	// Got OLD fragment #8, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
-	// Got OLD fragment #9, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
-	// Got OLD fragment #10, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
+	// Got NEW fragment #0, size 521, hash:0c5989843e85f31aed26f249bd203240dd72f77a
+	// Got NEW fragment #1, size 1563, hash:308ff2e0b4776c2a08fe549422c7ebfbf646bb22
+	// Got NEW fragment #2, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
+	// Got NEW fragment #3, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
+	// Got NEW fragment #4, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
+	// Got NEW fragment #5, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
+	// Got NEW fragment #6, size 3509, hash:e0d7c8acfdd5b399a92b5e495a0794ffa842ee73
+	// Got OLD fragment #7, size 919, hash:9d68759ef33ae919b656faf52bb1177e803f810b
+	// Got OLD fragment #8, size 1326, hash:c272c26dff010417ca2120a8e82addfdadb4efeb
+	// Got OLD fragment #9, size 1284, hash:9bbe891ccb1b141e0e122110e730e8df9743331e
+	// Got OLD fragment #10, size 1220, hash:5019f56fa9395060fbe2e957ad518a35cd667f9b
+	// Got NEW fragment #11, size 1569, hash:5ae2760535662c13b336d1ae4a0a7fdcba789d83
 }
 
 // This example will show how to write data to two files.
