diff --git a/src/Config.ts b/src/Config.ts index 34717cbd..fe74c316 100644 --- a/src/Config.ts +++ b/src/Config.ts @@ -59,9 +59,9 @@ export interface Config { dataLogWriter: { dirName: string maxLogFiles: number - maxReceiptEntries: number - maxCycleEntries: number - maxOriginalTxEntries: number + maxReceiptBytes: number + maxCycleBytes: number + maxOriginalTxBytes: number } experimentalSnapshot: boolean failedBucketsDir: string @@ -175,9 +175,9 @@ let config: Config = { dataLogWriter: { dirName: 'data-logs', maxLogFiles: 10, - maxReceiptEntries: 10000, // Should be >= max TPS experienced by the network. - maxCycleEntries: 500, - maxOriginalTxEntries: 10000, // Should be >= max TPS experienced by the network. + maxReceiptBytes: 1024 * 1024, // 1MB - Should be >= max TPS * avg receipt size + maxCycleBytes: 50 * 1024, // 50KB - cycles are smaller + maxOriginalTxBytes: 1024 * 1024, // 1MB - Should be >= max TPS * avg tx size }, experimentalSnapshot: true, failedBucketsDir: 'failed-buckets', diff --git a/src/Data/DataLogWriter.ts b/src/Data/DataLogWriter.ts index 326d4322..d5adf0ce 100644 --- a/src/Data/DataLogWriter.ts +++ b/src/Data/DataLogWriter.ts @@ -19,7 +19,7 @@ class DataLogWriter { dataLogWriteStream: WriteStream | null dataWriteIndex: number dataLogFilePath: string - totalNumberOfEntries: number + totalNumberOfBytes: number activeLogFileName: string activeLogFilePath: string writeQueue: string[] @@ -28,7 +28,7 @@ class DataLogWriter { constructor( public dataName: string, public logCounter: number, - public maxNumberEntriesPerLog: number + public maxNumberBytesPerLog: number ) { this.logDir = `${LOG_WRITER_CONFIG.dirName}/${config.ARCHIVER_IP}_${config.ARCHIVER_PORT}` this.maxLogCounter = LOG_WRITER_CONFIG.maxLogFiles @@ -37,7 +37,7 @@ class DataLogWriter { this.activeLogFileName = `active-${dataName}-log.txt` this.activeLogFilePath = path.join(this.logDir, this.activeLogFileName) this.dataLogFilePath = path.join(this.logDir, `${dataName}-log${logCounter}.txt`) - this.totalNumberOfEntries = 0 + this.totalNumberOfBytes = 0 this.writeQueue = [] this.isWriting = false } @@ -58,18 +58,16 @@ class DataLogWriter { console.log(`> DataLogWriter: Active log file: ${this.dataName}-log${this.logCounter}.txt`) this.dataLogFilePath = path.join(this.logDir, `${this.dataName}-log${this.logCounter}.txt`) // eslint-disable-next-line security/detect-non-literal-fs-filename - const data = await fs.readFile(this.dataLogFilePath, { - encoding: 'utf8', - }) - this.totalNumberOfEntries += data.split('\n').length - 1 - console.log(`> DataLogWriter: Total ${this.dataName} Entries: ${this.totalNumberOfEntries}`) + const stats = await fs.stat(this.dataLogFilePath) + this.totalNumberOfBytes += stats.size + console.log(`> DataLogWriter: Total ${this.dataName} Bytes: ${this.totalNumberOfBytes}`) // eslint-disable-next-line security/detect-non-literal-fs-filename this.dataLogWriteStream = createWriteStream(this.dataLogFilePath, { flags: 'a' }) - if (this.totalNumberOfEntries >= this.maxNumberEntriesPerLog) { - // Finish the log file with the total number of entries. - await this.appendData(`End: Number of entries: ${this.totalNumberOfEntries}\n`) + if (this.totalNumberOfBytes >= this.maxNumberBytesPerLog) { + // Finish the log file with the total number of bytes. + await this.appendData(`End: Number of bytes: ${this.totalNumberOfBytes}\n`) await this.endStream() - this.totalNumberOfEntries = 0 + this.totalNumberOfBytes = 0 await this.rotateLogFile() await this.setActiveLog() } @@ -158,17 +156,17 @@ class DataLogWriter { while (this.writeQueue.length) { try { for (let i = 0; i < this.writeQueue.length; i++) { - if (this.totalNumberOfEntries === this.maxNumberEntriesPerLog) { - await this.appendData(`End: Number of entries: ${this.totalNumberOfEntries}\n`) + if (this.totalNumberOfBytes >= this.maxNumberBytesPerLog) { + await this.appendData(`End: Number of bytes: ${this.totalNumberOfBytes}\n`) await this.endStream() - this.totalNumberOfEntries = 0 + this.totalNumberOfBytes = 0 await this.rotateLogFile() await this.setActiveLog() } // eslint-disable-next-line security/detect-object-injection await this.appendData(this.writeQueue[i]) this.dataWriteIndex += 1 - this.totalNumberOfEntries += 1 + this.totalNumberOfBytes += Buffer.byteLength(this.writeQueue[i], 'utf8') } // console.log('-->> Write queue length: ', this.writeQueue.length) this.writeQueue.splice(0, this.dataWriteIndex) @@ -212,7 +210,7 @@ class DataLogWriter { return new Promise((resolve, reject) => { try { this.dataLogWriteStream!.end(() => { - console.log(`✅ Finished writing ${this.totalNumberOfEntries}.`) + console.log(`✅ Finished writing ${this.totalNumberOfBytes} bytes.`) resolve() }) } catch (e) { @@ -238,10 +236,10 @@ export let ReceiptOverwriteLogWriter: DataLogWriter * @returns {Promise} A promise that resolves when all log writers are initialized. */ export async function initDataLogWriter(): Promise { - CycleLogWriter = new DataLogWriter('cycle', 1, LOG_WRITER_CONFIG.maxCycleEntries) - ReceiptLogWriter = new DataLogWriter('receipt', 1, LOG_WRITER_CONFIG.maxReceiptEntries) - OriginalTxDataLogWriter = new DataLogWriter('originalTx', 1, LOG_WRITER_CONFIG.maxOriginalTxEntries) - ReceiptOverwriteLogWriter = new DataLogWriter('receiptOverwrite', 1, LOG_WRITER_CONFIG.maxOriginalTxEntries) + CycleLogWriter = new DataLogWriter('cycle', 1, LOG_WRITER_CONFIG.maxCycleBytes) + ReceiptLogWriter = new DataLogWriter('receipt', 1, LOG_WRITER_CONFIG.maxReceiptBytes) + OriginalTxDataLogWriter = new DataLogWriter('originalTx', 1, LOG_WRITER_CONFIG.maxOriginalTxBytes) + ReceiptOverwriteLogWriter = new DataLogWriter('receiptOverwrite', 1, LOG_WRITER_CONFIG.maxOriginalTxBytes) await Promise.all([ CycleLogWriter.init(), ReceiptLogWriter.init(), diff --git a/test/unit/src/Data/DataLogWriter.test.ts b/test/unit/src/Data/DataLogWriter.test.ts index 5d1ae975..263563e2 100644 --- a/test/unit/src/Data/DataLogWriter.test.ts +++ b/test/unit/src/Data/DataLogWriter.test.ts @@ -18,9 +18,9 @@ jest.mock('../../../../src/Config', () => ({ dataLogWriter: { dirName: 'test-logs', maxLogFiles: 10, - maxCycleEntries: 100, - maxReceiptEntries: 100, - maxOriginalTxEntries: 100, + maxCycleBytes: 1000, + maxReceiptBytes: 1000, + maxOriginalTxBytes: 1000, }, }, })) @@ -39,6 +39,7 @@ describe('DataLogWriter', () => { // Cast mocked functions const mockedMkdir = fs.mkdir as jest.MockedFunction const mockedReadFile = fs.readFile as jest.MockedFunction + const mockedStat = fs.stat as jest.MockedFunction const mockedWriteFile = fs.writeFile as jest.MockedFunction const mockedAppendFile = fs.appendFile as jest.MockedFunction const mockedReaddir = fs.readdir as jest.MockedFunction @@ -75,6 +76,7 @@ describe('DataLogWriter', () => { mockedWriteFile.mockResolvedValue() mockedAppendFile.mockResolvedValue() mockedReaddir.mockResolvedValue([] as any) + mockedStat.mockResolvedValue({ size: 0 } as any) // Spy on console consoleLogSpy = jest.spyOn(console, 'log').mockImplementation(() => {}) @@ -115,16 +117,17 @@ describe('DataLogWriter', () => { // Mock readFile calls only for CycleLogWriter mockedReadFile .mockResolvedValueOnce('cycle-log2.txt') // CycleLogWriter active log - .mockResolvedValueOnce('line1\nline2\nline3\n') // CycleLogWriter data + mockedStat + .mockResolvedValueOnce({ size: 50 } as any) // CycleLogWriter file size await initDataLogWriter() expect(mockedReadFile).toHaveBeenCalledWith(expect.stringContaining('active-cycle-log.txt'), 'utf8') expect(CycleLogWriter.logCounter).toBe(2) - expect(CycleLogWriter.totalNumberOfEntries).toBe(3) + expect(CycleLogWriter.totalNumberOfBytes).toBe(50) }) - it('should rotate log when entries exceed max', async () => { + it('should rotate log when bytes exceed max', async () => { // Only mock exists for CycleLogWriter's active log mockedExistsSync .mockReturnValueOnce(true) // CycleLogWriter @@ -135,11 +138,12 @@ describe('DataLogWriter', () => { // Mock readFile calls only for CycleLogWriter mockedReadFile .mockResolvedValueOnce('cycle-log1.txt') // CycleLogWriter active log - .mockResolvedValueOnce(Array(100).fill('entry').join('\n') + '\n') // 100 entries with trailing newline + mockedStat + .mockResolvedValueOnce({ size: 1000 } as any) // File size that exceeds max await initDataLogWriter() - expect(CycleLogWriter.totalNumberOfEntries).toBe(0) // Reset after rotation + expect(CycleLogWriter.totalNumberOfBytes).toBe(0) // Reset after rotation expect(CycleLogWriter.logCounter).toBe(2) // Incremented }) @@ -170,7 +174,7 @@ describe('DataLogWriter', () => { await CycleLogWriter.writeToLog(testData) expect(mockWriteStream.write).toHaveBeenCalledWith(testData) - expect(CycleLogWriter.totalNumberOfEntries).toBe(1) + expect(CycleLogWriter.totalNumberOfBytes).toBe(15) }) it('should queue multiple writes', async () => { @@ -187,7 +191,7 @@ describe('DataLogWriter', () => { expect(mockWriteStream.write).toHaveBeenCalledWith(data1) expect(mockWriteStream.write).toHaveBeenCalledWith(data2) expect(mockWriteStream.write).toHaveBeenCalledWith(data3) - expect(CycleLogWriter.totalNumberOfEntries).toBe(3) + expect(CycleLogWriter.totalNumberOfBytes).toBe(21) }) it('should handle write errors', async () => { @@ -324,12 +328,12 @@ describe('DataLogWriter', () => { }) it('should end stream successfully', async () => { - CycleLogWriter.totalNumberOfEntries = 50 + CycleLogWriter.totalNumberOfBytes = 50 await CycleLogWriter.endStream() expect(mockWriteStream.end).toHaveBeenCalled() - expect(consoleLogSpy).toHaveBeenCalledWith('✅ Finished writing 50.') + expect(consoleLogSpy).toHaveBeenCalledWith('✅ Finished writing 50 bytes.') }) it('should handle errors when ending stream', async () => { @@ -349,14 +353,14 @@ describe('DataLogWriter', () => { }) it('should rotate log when reaching max entries during write', async () => { - CycleLogWriter.totalNumberOfEntries = 100 - CycleLogWriter.maxNumberEntriesPerLog = 100 + CycleLogWriter.totalNumberOfBytes = 100 + CycleLogWriter.maxNumberBytesPerLog = 100 await CycleLogWriter.writeToLog('final entry\n') - expect(mockWriteStream.write).toHaveBeenCalledWith('End: Number of entries: 100\n') + expect(mockWriteStream.write).toHaveBeenCalledWith('End: Number of bytes: 100\n') expect(mockWriteStream.end).toHaveBeenCalled() - expect(CycleLogWriter.totalNumberOfEntries).toBe(1) // Reset and new entry + expect(CycleLogWriter.totalNumberOfBytes).toBe(12) // Reset and new entry ('final entry\n') expect(CycleLogWriter.logCounter).toBe(2) // Incremented }) }) @@ -374,7 +378,7 @@ describe('DataLogWriter', () => { await Promise.all(writes.map((data) => CycleLogWriter.writeToLog(data))) expect(mockWriteStream.write).toHaveBeenCalledTimes(10) - expect(CycleLogWriter.totalNumberOfEntries).toBe(10) + expect(CycleLogWriter.totalNumberOfBytes).toBe(70) }) it('should maintain write order in queue', async () => {