Skip to content

Commit

Permalink
Issue-2734 - Clamp buffer to maximum upon large write operation (#2745)
Browse files Browse the repository at this point in the history
### Motivation:

Fix Issue 2734

### Modifications:
- Added a function to clamp storage by copying bytes and setting new
capacity of storage
- Adding a function to clamp the capacity of ByteBuffer
- Added the ability to specify he maxBufferCapacity to
MessageToByteHandler
### Result:


Once a write message that is larger than the capacity of the
MessageToByteHandler's maxBufferCapacity, it clamps the byteBuffer down.

---------

Co-authored-by: Johannes Weiss <[email protected]>
Co-authored-by: Cory Benfield <[email protected]>
Co-authored-by: Franz Busch <[email protected]>
Co-authored-by: Ali Ali <[email protected]>
  • Loading branch information
5 people authored Sep 18, 2024
1 parent c9781cf commit 8307ad6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
29 changes: 27 additions & 2 deletions Sources/NIOCore/ByteBuffer-core.swift
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,12 @@ public struct ByteBuffer {
@inline(never)
mutating func _copyStorageAndRebase(capacity: _Capacity, resetIndices: Bool = false) {
// This math has to be very careful, because we already know that in some call paths _readerIndex exceeds 1 << 24, and lots of this math
// is in UInt32 space. It's not hard for us to trip some of these conditions. As a result, I've heavily commented this
// fairly heavily to explain the math.
// is in UInt32 space. It's not hard for us to trip some of these conditions.
// As a result, I've heavily commented this function to explain the math.

// Step 1: If we are resetting the indices, we need to slide the allocation by at least the current value of _readerIndex, so the new
// value of _readerIndex will be 0. Otherwise we can leave them as they are.
// Resetting the indices will also ensure that leading unwritten bytes are not copied.
let indexRebaseAmount = resetIndices ? self._readerIndex : 0

// Step 2: We also want to only copy the bytes within the slice, and move them to index 0. As a result, we have this
Expand Down Expand Up @@ -887,6 +888,30 @@ public struct ByteBuffer {
return true
}

/// The `ByteBuffer` will successfully be shrunk if the requested capacity is less than the current capacity,
/// and the requested capacity is greater than or equal to the number of readable bytes in the buffer.
/// If either condition is not true, the buffer will not be shrunk.
///
/// - Parameter desiredCapacity: The desired capacity for the buffers capacity to be shrunken to
/// - Returns: Bool indicating whether the buffer capacity has been shrunk to the desiredCapacity.
@inlinable
@discardableResult public mutating func shrinkBufferCapacity(to desiredCapacity: Int) -> Bool {
let desiredCapacity = ByteBuffer.roundUpToUsableCapacity(_toCapacity(desiredCapacity))
guard desiredCapacity < self.capacity, desiredCapacity >= self.readableBytes else {
return false
}
self._copyStorageAndRebase(capacity: desiredCapacity, resetIndices: true)
return true
}

/// Returns size of capacity with optimal padding
/// - Parameter initialCapacity: Capacity that needs expansion with padding
/// - Returns: Capacity with calculated padding
@inlinable
static func roundUpToUsableCapacity(_ initialCapacity: UInt32) -> UInt32 {
initialCapacity == 0 ? 0 : initialCapacity.nextPowerOf2ClampedToMax()
}

/// The reader index or the number of bytes previously read from this `ByteBuffer`. `readerIndex` is `0` for a
/// newly allocated `ByteBuffer`.
@inlinable
Expand Down
14 changes: 14 additions & 0 deletions Sources/NIOCore/Codec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,16 @@ public final class MessageToByteHandler<Encoder: MessageToByteEncoder>: ChannelO
private var state: State = .notInChannelYet
private let encoder: Encoder
private var buffer: ByteBuffer? = nil
private let desiredBufferCapacity: Int?

public init(_ encoder: Encoder, desiredBufferCapacity: Int) {
self.encoder = encoder
self.desiredBufferCapacity = desiredBufferCapacity
}

public init(_ encoder: Encoder) {
self.encoder = encoder
self.desiredBufferCapacity = nil
}
}

Expand Down Expand Up @@ -844,4 +851,11 @@ extension MessageToByteHandler {
context.fireErrorCaught(error)
}
}

public func flush(context: ChannelHandlerContext) {
if let desiredBufferCapacity = self.desiredBufferCapacity {
self.buffer?.shrinkBufferCapacity(to: desiredBufferCapacity)
}
context.flush()
}
}
56 changes: 56 additions & 0 deletions Tests/NIOCoreTests/ByteBufferTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,62 @@ class ByteBufferTest: XCTestCase {
XCTAssertEqual(0, buffer2.readableBytes)
}

func testShrinkBufferCapacityWithNoLeadingUnwrittenBytes() {
let desiredCapacity = 1024
var buffer = self.allocator.buffer(capacity: 512)

// For any item, it should not shrink buffer capacity to a value larger than the current buffer capacity
buffer.clear()
buffer.writeString("Any item")
XCTAssertFalse(buffer.shrinkBufferCapacity(to: 2048))
XCTAssertEqual(buffer.capacity, 512)

// If desired capacity are less than or equal to buffer capacity, should not shrink
buffer.clear()
buffer.writeString(String(repeating: "x", count: desiredCapacity))
XCTAssertEqual(buffer.capacity, 1024) // Before
XCTAssertFalse(buffer.shrinkBufferCapacity(to: desiredCapacity))
XCTAssertEqual(buffer.capacity, 1024) // After

// If desiredCapacity is less than readable bytes, do not shrink
buffer.clear()
buffer.writeString(String(repeating: "x", count: desiredCapacity + 1))
XCTAssertEqual(buffer.capacity, 2048)
XCTAssertFalse(buffer.shrinkBufferCapacity(to: desiredCapacity))
XCTAssertEqual(buffer.capacity, 2048)

// If desired capacity is greater than or equal the readable bytes and less than buffer capacity, should shrink
buffer.clear()
buffer.writeString(String(repeating: "x", count: desiredCapacity))
XCTAssertEqual(buffer.capacity, 2048)
XCTAssertTrue(buffer.shrinkBufferCapacity(to: desiredCapacity))
XCTAssertEqual(buffer.capacity, 1024)
}

func testShrinkBufferCapacityWithLeadingUnwrittenBytes() {
var buffer = self.allocator.buffer(capacity: 16384)
buffer.moveWriterIndex(to: 16000)
buffer.moveReaderIndex(to: 16000)
buffer.writeString("WW")
buffer.shrinkBufferCapacity(to: 4)
XCTAssertEqual("WW", String(buffer: buffer))

// If readable bytes is exactly the same as buffer capacity shrunken to
buffer = self.allocator.buffer(capacity: 16)
buffer.moveWriterIndex(to: 8)
buffer.moveReaderIndex(to: 8)
buffer.writeString("WWWWWWWW") // 8 bytes written
buffer.shrinkBufferCapacity(to: 4) // Invisible padding makes this 8 bytes
XCTAssertEqual("WWWWWWWW", String(buffer: buffer)) // All 8 bytes are returned!
}

func testExpansionOfCapacityWithPadding() throws {
XCTAssertEqual(ByteBuffer.roundUpToUsableCapacity(12), 16)
XCTAssertEqual(ByteBuffer.roundUpToUsableCapacity(0), 0)
XCTAssertEqual(ByteBuffer.roundUpToUsableCapacity(UInt32.min), 0)
XCTAssertEqual(ByteBuffer.roundUpToUsableCapacity(UInt32.max), UInt32.max)
}

func testDumpBytesFormat() throws {
self.buf.clear()
for f in UInt8.min...UInt8.max {
Expand Down

0 comments on commit 8307ad6

Please sign in to comment.