Skip to content

Commit

Permalink
Share properties along the multiple created LogHandlers
Browse files Browse the repository at this point in the history
  • Loading branch information
philippzagar committed Aug 8, 2021
1 parent 0c62a98 commit 6b39175
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 155 deletions.
7 changes: 7 additions & 0 deletions Sources/LoggingELK/LogstashLogHandler+Error.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,12 @@ extension LogstashLogHandler {
The passed maximumLogStorageSize is too low. It needs to be at least twice as much \
(spoken in terms of the power of two) as the passed minimumLogStorageSize.
"""

case notYetSetup = """
The static .setup() function must be called before the LogstashLogHandler is intialized
via LoggingSystem.bootrap(...). \
The reason for that is the Background Activity Logger which can't use the LogstashLogHandler \
as a backend because it would result in an infinite recursion in case of an error.
"""
}
}
9 changes: 8 additions & 1 deletion Sources/LoggingELK/LogstashLogHandler+HTTPFormatting.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ extension LogstashLogHandler {

/// Creates the HTTP request which stays constant during the entire lifetime of the `LogstashLogHandler`
/// Sets some default headers, eg. a dynamically adjusted "Keep-Alive" header
func createHTTPRequest() -> HTTPClient.Request {
static func createHTTPRequest() -> HTTPClient.Request {
guard let useHTTPS = Self.useHTTPS,
let hostname = Self.hostname,
let port = Self.port,
let uploadInterval = Self.uploadInterval else {
fatalError(Error.notYetSetup.rawValue)
}

var httpRequest: HTTPClient.Request

do {
Expand Down
79 changes: 46 additions & 33 deletions Sources/LoggingELK/LogstashLogHandler+Uploading.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ import AsyncHTTPClient

extension LogstashLogHandler {
/// Schedules the `uploadLogData` function with a certain `TimeAmount` as `initialDelay` and `delay` (delay between repeating the task)
func scheduleUploadTask(initialDelay: TimeAmount) -> RepeatedTask {
eventLoopGroup
static func scheduleUploadTask(initialDelay: TimeAmount) -> RepeatedTask {
guard let eventLoopGroup = Self.eventLoopGroup,
let uploadInterval = Self.uploadInterval else {
fatalError(Error.notYetSetup.rawValue)
}

return eventLoopGroup
.next()
.scheduleRepeatedTask(
initialDelay: initialDelay,
Expand All @@ -27,34 +32,44 @@ extension LogstashLogHandler {
/// This function is thread-safe and designed to only block the stored log data `ByteBuffer`
/// for a short amount of time (the time it takes to duplicate this bytebuffer). Then, the "original"
/// stored log data `ByteBuffer` is freed and the lock is lifted
func uploadLogData(_ task: RepeatedTask? = nil) { // swiftlint:disable:this cyclomatic_complexity function_body_length
guard self.byteBuffer.readableBytes != 0 else {
static func uploadLogData(_ task: RepeatedTask? = nil) { // swiftlint:disable:this cyclomatic_complexity function_body_length
guard let _ = Self.byteBuffer,
let _ = Self.totalByteBufferSize,
let maximumTotalLogStorageSize = Self.maximumTotalLogStorageSize,
let eventLoopGroup = Self.eventLoopGroup,
let httpClient = Self.httpClient,
let hostname = Self.hostname,
let port = Self.port else {
fatalError(Error.notYetSetup.rawValue)
}

guard Self.byteBuffer?.readableBytes != 0 else {
return
}

// If total byte buffer size is exceeded, wait until the size is decreased again
if self.totalByteBufferSize + self.byteBuffer.capacity > self.maximumTotalLogStorageSize {
self.semaphoreCounter -= 1
self.semaphore.wait()
if totalByteBufferSize! + Self.byteBuffer!.capacity > maximumTotalLogStorageSize {
Self.semaphoreCounter -= 1
Self.semaphore.wait()
}

self.byteBufferLock.lock()
Self.byteBufferLock.lock()

self.totalByteBufferSize += self.byteBuffer.capacity
totalByteBufferSize! += Self.byteBuffer!.capacity

// Copy log data into a temporary byte buffer
// This helps to prevent a stalling request if more than the max. buffer size
// log messages are created during uploading of the "old" log data
var tempByteBuffer = ByteBufferAllocator().buffer(capacity: self.byteBuffer.readableBytes)
tempByteBuffer.writeBuffer(&self.byteBuffer)
var tempByteBuffer = ByteBufferAllocator().buffer(capacity: Self.byteBuffer!.readableBytes)
tempByteBuffer.writeBuffer(&Self.byteBuffer!)

self.byteBuffer.clear()
Self.byteBuffer?.clear()

self.byteBufferLock.unlock(withValue: false)
Self.byteBufferLock.unlock(withValue: false)

// Setup of HTTP requests that is used for all transmissions
if self.httpRequest == nil {
self.httpRequest = createHTTPRequest()
if Self.httpRequest == nil {
Self.httpRequest = Self.createHTTPRequest()
}

var pendingHTTPRequests: [EventLoopFuture<HTTPClient.Response>] = []
Expand All @@ -66,62 +81,60 @@ extension LogstashLogHandler {
fatalError("Error reading log data from byte buffer")
}

guard var httpRequest = self.httpRequest else {
guard var httpRequest = Self.httpRequest else {
fatalError("HTTP Request not properly initialized")
}

httpRequest.body = .byteBuffer(logData)

pendingHTTPRequests.append(
self.httpClient.execute(request: httpRequest)
httpClient.execute(request: httpRequest)
)
}

// Wait until all HTTP requests finished, then signal waiting threads
_ = EventLoopFuture<HTTPClient.Response>
.whenAllComplete(pendingHTTPRequests, on: self.eventLoopGroup.next())
.whenAllComplete(pendingHTTPRequests, on: eventLoopGroup.next())
.map { results in
_ = results.map { result in
switch result {
case .failure(let error):
self.backgroundActivityLogger.log(
Self.backgroundActivityLogger?.log(
level: .warning,
"Error during sending logs to Logstash - \(error)",
metadata: [
"label": .string(self.label),
"hostname": .string(self.hostname),
"port": .string(String(describing: self.port))
"hostname": .string(hostname),
"port": .string("\(port)")
]
)
case .success(let response):
if response.status != .ok {
self.backgroundActivityLogger.log(
Self.backgroundActivityLogger?.log(
level: .warning,
"Error during sending logs to Logstash - \(String(describing: response.status))",
metadata: [
"label": .string(self.label),
"hostname": .string(self.hostname),
"port": .string(String(describing: self.port))
"hostname": .string(hostname),
"port": .string("\(port)")
]
)
}
}
}

self.byteBufferLock.lock()
Self.byteBufferLock.lock()

// Once all HTTP requests are completed, signal that new memory space is available
if self.totalByteBufferSize <= self.maximumTotalLogStorageSize {
if Self.totalByteBufferSize! <= maximumTotalLogStorageSize {
// Only signal if the semaphore count is below 0 (so at least one thread is blocked)
if self.semaphoreCounter < 0 {
self.semaphoreCounter += 1
self.semaphore.signal()
if Self.semaphoreCounter < 0 {
Self.semaphoreCounter += 1
Self.semaphore.signal()
}
}

self.totalByteBufferSize -= self.byteBuffer.capacity
Self.totalByteBufferSize! -= Self.byteBuffer!.capacity

self.byteBufferLock.unlock()
Self.byteBufferLock.unlock()
}
}
}
Loading

0 comments on commit 6b39175

Please sign in to comment.