Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ export interface Config {
dataLogWriter: {
dirName: string
maxLogFiles: number
maxReceiptEntries: number
maxCycleEntries: number
maxOriginalTxEntries: number
maxReceiptBytes: number
maxCycleBytes: number
maxOriginalTxBytes: number
}
experimentalSnapshot: boolean
failedBucketsDir: string
Expand Down Expand Up @@ -175,9 +175,9 @@ let config: Config = {
dataLogWriter: {
dirName: 'data-logs',
maxLogFiles: 10,
maxReceiptEntries: 10000, // Should be >= max TPS experienced by the network.
maxCycleEntries: 500,
maxOriginalTxEntries: 10000, // Should be >= max TPS experienced by the network.
maxReceiptBytes: 1024 * 1024, // 1MB - Should be >= max TPS * avg receipt size
maxCycleBytes: 50 * 1024, // 50KB - cycles are smaller
maxOriginalTxBytes: 1024 * 1024, // 1MB - Should be >= max TPS * avg tx size
},
experimentalSnapshot: true,
failedBucketsDir: 'failed-buckets',
Expand Down
40 changes: 19 additions & 21 deletions src/Data/DataLogWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class DataLogWriter {
dataLogWriteStream: WriteStream | null
dataWriteIndex: number
dataLogFilePath: string
totalNumberOfEntries: number
totalNumberOfBytes: number
activeLogFileName: string
activeLogFilePath: string
writeQueue: string[]
Expand All @@ -28,7 +28,7 @@ class DataLogWriter {
constructor(
public dataName: string,
public logCounter: number,
public maxNumberEntriesPerLog: number
public maxNumberBytesPerLog: number
) {
this.logDir = `${LOG_WRITER_CONFIG.dirName}/${config.ARCHIVER_IP}_${config.ARCHIVER_PORT}`
this.maxLogCounter = LOG_WRITER_CONFIG.maxLogFiles
Expand All @@ -37,7 +37,7 @@ class DataLogWriter {
this.activeLogFileName = `active-${dataName}-log.txt`
this.activeLogFilePath = path.join(this.logDir, this.activeLogFileName)
this.dataLogFilePath = path.join(this.logDir, `${dataName}-log${logCounter}.txt`)
this.totalNumberOfEntries = 0
this.totalNumberOfBytes = 0
this.writeQueue = []
this.isWriting = false
}
Expand All @@ -58,18 +58,16 @@ class DataLogWriter {
console.log(`> DataLogWriter: Active log file: ${this.dataName}-log${this.logCounter}.txt`)
this.dataLogFilePath = path.join(this.logDir, `${this.dataName}-log${this.logCounter}.txt`)
// eslint-disable-next-line security/detect-non-literal-fs-filename
const data = await fs.readFile(this.dataLogFilePath, {
encoding: 'utf8',
})
this.totalNumberOfEntries += data.split('\n').length - 1
console.log(`> DataLogWriter: Total ${this.dataName} Entries: ${this.totalNumberOfEntries}`)
const stats = await fs.stat(this.dataLogFilePath)
this.totalNumberOfBytes += stats.size
console.log(`> DataLogWriter: Total ${this.dataName} Bytes: ${this.totalNumberOfBytes}`)
// eslint-disable-next-line security/detect-non-literal-fs-filename
this.dataLogWriteStream = createWriteStream(this.dataLogFilePath, { flags: 'a' })
if (this.totalNumberOfEntries >= this.maxNumberEntriesPerLog) {
// Finish the log file with the total number of entries.
await this.appendData(`End: Number of entries: ${this.totalNumberOfEntries}\n`)
if (this.totalNumberOfBytes >= this.maxNumberBytesPerLog) {
// Finish the log file with the total number of bytes.
await this.appendData(`End: Number of bytes: ${this.totalNumberOfBytes}\n`)
await this.endStream()
this.totalNumberOfEntries = 0
this.totalNumberOfBytes = 0
await this.rotateLogFile()
await this.setActiveLog()
}
Expand Down Expand Up @@ -158,17 +156,17 @@ class DataLogWriter {
while (this.writeQueue.length) {
try {
for (let i = 0; i < this.writeQueue.length; i++) {
if (this.totalNumberOfEntries === this.maxNumberEntriesPerLog) {
await this.appendData(`End: Number of entries: ${this.totalNumberOfEntries}\n`)
if (this.totalNumberOfBytes >= this.maxNumberBytesPerLog) {
await this.appendData(`End: Number of bytes: ${this.totalNumberOfBytes}\n`)
await this.endStream()
this.totalNumberOfEntries = 0
this.totalNumberOfBytes = 0
await this.rotateLogFile()
await this.setActiveLog()
}
// eslint-disable-next-line security/detect-object-injection
await this.appendData(this.writeQueue[i])
this.dataWriteIndex += 1
this.totalNumberOfEntries += 1
this.totalNumberOfBytes += Buffer.byteLength(this.writeQueue[i], 'utf8')
}
// console.log('-->> Write queue length: ', this.writeQueue.length)
this.writeQueue.splice(0, this.dataWriteIndex)
Expand Down Expand Up @@ -212,7 +210,7 @@ class DataLogWriter {
return new Promise((resolve, reject) => {
try {
this.dataLogWriteStream!.end(() => {
console.log(`✅ Finished writing ${this.totalNumberOfEntries}.`)
console.log(`✅ Finished writing ${this.totalNumberOfBytes} bytes.`)
resolve()
})
} catch (e) {
Expand All @@ -238,10 +236,10 @@ export let ReceiptOverwriteLogWriter: DataLogWriter
* @returns {Promise<void>} A promise that resolves when all log writers are initialized.
*/
export async function initDataLogWriter(): Promise<void> {
CycleLogWriter = new DataLogWriter('cycle', 1, LOG_WRITER_CONFIG.maxCycleEntries)
ReceiptLogWriter = new DataLogWriter('receipt', 1, LOG_WRITER_CONFIG.maxReceiptEntries)
OriginalTxDataLogWriter = new DataLogWriter('originalTx', 1, LOG_WRITER_CONFIG.maxOriginalTxEntries)
ReceiptOverwriteLogWriter = new DataLogWriter('receiptOverwrite', 1, LOG_WRITER_CONFIG.maxOriginalTxEntries)
CycleLogWriter = new DataLogWriter('cycle', 1, LOG_WRITER_CONFIG.maxCycleBytes)
ReceiptLogWriter = new DataLogWriter('receipt', 1, LOG_WRITER_CONFIG.maxReceiptBytes)
OriginalTxDataLogWriter = new DataLogWriter('originalTx', 1, LOG_WRITER_CONFIG.maxOriginalTxBytes)
ReceiptOverwriteLogWriter = new DataLogWriter('receiptOverwrite', 1, LOG_WRITER_CONFIG.maxOriginalTxBytes)
await Promise.all([
CycleLogWriter.init(),
ReceiptLogWriter.init(),
Expand Down
38 changes: 21 additions & 17 deletions test/unit/src/Data/DataLogWriter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ jest.mock('../../../../src/Config', () => ({
dataLogWriter: {
dirName: 'test-logs',
maxLogFiles: 10,
maxCycleEntries: 100,
maxReceiptEntries: 100,
maxOriginalTxEntries: 100,
maxCycleBytes: 1000,
maxReceiptBytes: 1000,
maxOriginalTxBytes: 1000,
},
},
}))
Expand All @@ -39,6 +39,7 @@ describe('DataLogWriter', () => {
// Cast mocked functions
const mockedMkdir = fs.mkdir as jest.MockedFunction<typeof fs.mkdir>
const mockedReadFile = fs.readFile as jest.MockedFunction<typeof fs.readFile>
const mockedStat = fs.stat as jest.MockedFunction<typeof fs.stat>
const mockedWriteFile = fs.writeFile as jest.MockedFunction<typeof fs.writeFile>
const mockedAppendFile = fs.appendFile as jest.MockedFunction<typeof fs.appendFile>
const mockedReaddir = fs.readdir as jest.MockedFunction<typeof fs.readdir>
Expand Down Expand Up @@ -75,6 +76,7 @@ describe('DataLogWriter', () => {
mockedWriteFile.mockResolvedValue()
mockedAppendFile.mockResolvedValue()
mockedReaddir.mockResolvedValue([] as any)
mockedStat.mockResolvedValue({ size: 0 } as any)

// Spy on console
consoleLogSpy = jest.spyOn(console, 'log').mockImplementation(() => {})
Expand Down Expand Up @@ -115,16 +117,17 @@ describe('DataLogWriter', () => {
// Mock readFile calls only for CycleLogWriter
mockedReadFile
.mockResolvedValueOnce('cycle-log2.txt') // CycleLogWriter active log
.mockResolvedValueOnce('line1\nline2\nline3\n') // CycleLogWriter data
mockedStat
.mockResolvedValueOnce({ size: 50 } as any) // CycleLogWriter file size

await initDataLogWriter()

expect(mockedReadFile).toHaveBeenCalledWith(expect.stringContaining('active-cycle-log.txt'), 'utf8')
expect(CycleLogWriter.logCounter).toBe(2)
expect(CycleLogWriter.totalNumberOfEntries).toBe(3)
expect(CycleLogWriter.totalNumberOfBytes).toBe(50)
})

it('should rotate log when entries exceed max', async () => {
it('should rotate log when bytes exceed max', async () => {
// Only mock exists for CycleLogWriter's active log
mockedExistsSync
.mockReturnValueOnce(true) // CycleLogWriter
Expand All @@ -135,11 +138,12 @@ describe('DataLogWriter', () => {
// Mock readFile calls only for CycleLogWriter
mockedReadFile
.mockResolvedValueOnce('cycle-log1.txt') // CycleLogWriter active log
.mockResolvedValueOnce(Array(100).fill('entry').join('\n') + '\n') // 100 entries with trailing newline
mockedStat
.mockResolvedValueOnce({ size: 1000 } as any) // File size that exceeds max

await initDataLogWriter()

expect(CycleLogWriter.totalNumberOfEntries).toBe(0) // Reset after rotation
expect(CycleLogWriter.totalNumberOfBytes).toBe(0) // Reset after rotation
expect(CycleLogWriter.logCounter).toBe(2) // Incremented
})

Expand Down Expand Up @@ -170,7 +174,7 @@ describe('DataLogWriter', () => {
await CycleLogWriter.writeToLog(testData)

expect(mockWriteStream.write).toHaveBeenCalledWith(testData)
expect(CycleLogWriter.totalNumberOfEntries).toBe(1)
expect(CycleLogWriter.totalNumberOfBytes).toBe(15)
})

it('should queue multiple writes', async () => {
Expand All @@ -187,7 +191,7 @@ describe('DataLogWriter', () => {
expect(mockWriteStream.write).toHaveBeenCalledWith(data1)
expect(mockWriteStream.write).toHaveBeenCalledWith(data2)
expect(mockWriteStream.write).toHaveBeenCalledWith(data3)
expect(CycleLogWriter.totalNumberOfEntries).toBe(3)
expect(CycleLogWriter.totalNumberOfBytes).toBe(21)
})

it('should handle write errors', async () => {
Expand Down Expand Up @@ -324,12 +328,12 @@ describe('DataLogWriter', () => {
})

it('should end stream successfully', async () => {
CycleLogWriter.totalNumberOfEntries = 50
CycleLogWriter.totalNumberOfBytes = 50

await CycleLogWriter.endStream()

expect(mockWriteStream.end).toHaveBeenCalled()
expect(consoleLogSpy).toHaveBeenCalledWith('✅ Finished writing 50.')
expect(consoleLogSpy).toHaveBeenCalledWith('✅ Finished writing 50 bytes.')
})

it('should handle errors when ending stream', async () => {
Expand All @@ -349,14 +353,14 @@ describe('DataLogWriter', () => {
})

it('should rotate log when reaching max entries during write', async () => {
CycleLogWriter.totalNumberOfEntries = 100
CycleLogWriter.maxNumberEntriesPerLog = 100
CycleLogWriter.totalNumberOfBytes = 100
CycleLogWriter.maxNumberBytesPerLog = 100

await CycleLogWriter.writeToLog('final entry\n')

expect(mockWriteStream.write).toHaveBeenCalledWith('End: Number of entries: 100\n')
expect(mockWriteStream.write).toHaveBeenCalledWith('End: Number of bytes: 100\n')
expect(mockWriteStream.end).toHaveBeenCalled()
expect(CycleLogWriter.totalNumberOfEntries).toBe(1) // Reset and new entry
expect(CycleLogWriter.totalNumberOfBytes).toBe(12) // Reset and new entry ('final entry\n')
expect(CycleLogWriter.logCounter).toBe(2) // Incremented
})
})
Expand All @@ -374,7 +378,7 @@ describe('DataLogWriter', () => {
await Promise.all(writes.map((data) => CycleLogWriter.writeToLog(data)))

expect(mockWriteStream.write).toHaveBeenCalledTimes(10)
expect(CycleLogWriter.totalNumberOfEntries).toBe(10)
expect(CycleLogWriter.totalNumberOfBytes).toBe(70)
})

it('should maintain write order in queue', async () => {
Expand Down
Loading