Skip to content

Commit e58e9a6

Browse files
committed
fix: orphan object client detector
1 parent a8b4c69 commit e58e9a6

File tree

3 files changed

+315
-0
lines changed

3 files changed

+315
-0
lines changed

src/internal/streams/ndjson.ts

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { Transform, TransformCallback } from 'stream'
2+
import { StringDecoder } from 'string_decoder'
3+
4+
export class NdJsonTransform extends Transform {
5+
private decoder = new StringDecoder('utf8')
6+
private buffer = ''
7+
8+
constructor() {
9+
super({ readableObjectMode: true })
10+
}
11+
12+
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback) {
13+
// decode safely across chunk boundaries
14+
this.buffer += this.decoder.write(chunk)
15+
16+
let newlineIdx: number
17+
while ((newlineIdx = this.buffer.indexOf('\n')) !== -1) {
18+
const line = this.buffer.slice(0, newlineIdx)
19+
this.buffer = this.buffer.slice(newlineIdx + 1)
20+
if (line.trim()) {
21+
let obj
22+
try {
23+
obj = JSON.parse(line)
24+
} catch (err) {
25+
if (err instanceof Error) {
26+
// this is the case when JSON.parse fails
27+
return callback(new Error(`Invalid JSON on flush: ${err.message}`))
28+
}
29+
30+
return callback(err as Error)
31+
}
32+
// .push() participates in backpressure automatically
33+
this.push(obj)
34+
}
35+
}
36+
37+
callback()
38+
}
39+
40+
_flush(callback: TransformCallback) {
41+
this.buffer += this.decoder.end()
42+
if (this.buffer.trim()) {
43+
try {
44+
this.push(JSON.parse(this.buffer))
45+
} catch (err) {
46+
if (err instanceof Error) {
47+
// this is the case when JSON.parse fails
48+
return callback(new Error(`Invalid JSON on flush: ${err.message}`))
49+
}
50+
51+
return callback(err as Error)
52+
}
53+
}
54+
callback()
55+
}
56+
}

src/scripts/orphan-client.ts

+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import axios from 'axios'
2+
import { NdJsonTransform } from '@internal/streams/ndjson'
3+
import fs from 'fs'
4+
import path from 'path'
5+
6+
const ADMIN_URL = process.env.ADMIN_URL
7+
const ADMIN_API_KEY = process.env.ADMIN_API_KEY
8+
const TENANT_ID = process.env.TENANT_ID
9+
const BUCKET_ID = process.env.BUCKET_ID
10+
11+
const BEFORE = undefined // new Date().toISOString()
12+
13+
const FILE_PATH = (operation: string) =>
14+
`../../dist/${operation}-${TENANT_ID}-${Date.now()}-orphan-objects.json`
15+
16+
const client = axios.create({
17+
baseURL: ADMIN_URL,
18+
headers: {
19+
ApiKey: ADMIN_API_KEY,
20+
},
21+
})
22+
23+
interface OrphanObject {
24+
event: 'data'
25+
type: 's3Orphans'
26+
value: {
27+
name: string
28+
version: string
29+
size: number
30+
}[]
31+
}
32+
33+
interface PingObject {
34+
event: 'ping'
35+
}
36+
37+
async function main() {
38+
const action = process.argv[2]
39+
40+
if (!action) {
41+
console.error('Please provide an action: list or delete')
42+
return
43+
}
44+
45+
if (action === 'list') {
46+
await listOrphans(TENANT_ID, BUCKET_ID)
47+
return
48+
}
49+
50+
await deleteS3Orphans(TENANT_ID, BUCKET_ID)
51+
}
52+
53+
/**
54+
* List Orphan objects in a bucket
55+
* @param tenantId
56+
* @param bucketId
57+
*/
58+
async function listOrphans(tenantId: string, bucketId: string) {
59+
const request = await client.get(`/tenants/${tenantId}/buckets/${bucketId}/orphan-objects`, {
60+
responseType: 'stream',
61+
params: {
62+
before: BEFORE,
63+
},
64+
})
65+
66+
const transformStream = new NdJsonTransform()
67+
request.data.on('error', (err: Error) => {
68+
transformStream.emit('error', err)
69+
})
70+
71+
const jsonStream = request.data.pipe(transformStream)
72+
73+
await writeStreamToJsonArray(jsonStream, FILE_PATH('list'))
74+
}
75+
76+
/**
77+
* Deletes S3 orphan objects in a bucket
78+
* @param tenantId
79+
* @param bucketId
80+
*/
81+
async function deleteS3Orphans(tenantId: string, bucketId: string) {
82+
const request = await client.delete(`/tenants/${tenantId}/buckets/${bucketId}/orphan-objects`, {
83+
responseType: 'stream',
84+
data: {
85+
deleteS3Keys: true,
86+
before: BEFORE,
87+
},
88+
})
89+
90+
const transformStream = new NdJsonTransform()
91+
request.data.on('error', (err: Error) => {
92+
transformStream.emit('error', err)
93+
})
94+
95+
const jsonStream = request.data.pipe(transformStream)
96+
97+
await writeStreamToJsonArray(jsonStream, FILE_PATH('delete'))
98+
}
99+
100+
/**
101+
* Writes the output to a JSON array
102+
* @param stream
103+
* @param relativePath
104+
*/
105+
async function writeStreamToJsonArray(
106+
stream: NodeJS.ReadableStream,
107+
relativePath: string
108+
): Promise<void> {
109+
const filePath = path.resolve(__dirname, relativePath)
110+
const localFile = fs.createWriteStream(filePath)
111+
112+
// Start with an empty array
113+
localFile.write('[\n')
114+
let isFirstItem = true
115+
116+
return new Promise((resolve, reject) => {
117+
let receivedAnyData = false
118+
119+
stream.on('data', (data: OrphanObject | PingObject) => {
120+
if (data.event === 'ping') {
121+
console.log('Received ping event, ignoring')
122+
return
123+
}
124+
125+
if (data.event === 'data' && data.value && Array.isArray(data.value)) {
126+
receivedAnyData = true
127+
console.log(`Processing ${data.value.length} objects`)
128+
129+
for (const item of data.value) {
130+
if (!isFirstItem) {
131+
localFile.write(',\n')
132+
} else {
133+
isFirstItem = false
134+
}
135+
136+
localFile.write(JSON.stringify(item, null, 2))
137+
}
138+
} else {
139+
console.warn(
140+
'Received data with invalid format:',
141+
JSON.stringify(data).substring(0, 100) + '...'
142+
)
143+
}
144+
})
145+
146+
stream.on('error', (err) => {
147+
console.error('Stream error:', err)
148+
localFile.end('\n]', () => {
149+
reject(err)
150+
})
151+
})
152+
153+
stream.on('end', () => {
154+
localFile.write('\n]')
155+
localFile.end(() => {
156+
resolve()
157+
})
158+
159+
if (!receivedAnyData) {
160+
console.warn(`No data was received! File might be empty: ${filePath}`)
161+
} else {
162+
// Check if the file exists and has content
163+
console.log(`Finished writing data to ${filePath}. Data was received and saved.`)
164+
}
165+
})
166+
})
167+
}
168+
169+
main()
170+
.catch((e) => {
171+
console.error('Error:', e)
172+
})
173+
.then(() => {
174+
console.log('Done')
175+
})

src/test/ndjson.test.ts

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// NdJsonTransform.test.ts
2+
3+
import { Buffer } from 'buffer'
4+
import { NdJsonTransform } from '@internal/streams/ndjson'
5+
6+
/**
7+
* Helper that writes the given chunks into the transform,
8+
* collects all the parsed objects (in order), and resolves
9+
* them—or rejects on error.
10+
*/
11+
function collect(transform: NdJsonTransform, chunks: Buffer[]): Promise<any[]> {
12+
return new Promise((resolve, reject) => {
13+
const out: any[] = []
14+
transform.on('data', (obj) => out.push(obj))
15+
transform.on('error', (err) => reject(err))
16+
transform.on('end', () => resolve(out))
17+
for (const c of chunks) transform.write(c)
18+
transform.end()
19+
})
20+
}
21+
22+
describe('NdJsonTransform', () => {
23+
it('parses a single JSON object terminated by newline', async () => {
24+
const t = new NdJsonTransform()
25+
const result = await collect(t, [Buffer.from('{"foo":123}\n')])
26+
expect(result).toEqual([{ foo: 123 }])
27+
})
28+
29+
it('parses multiple JSON objects in one chunk', async () => {
30+
const t = new NdJsonTransform()
31+
const chunk = Buffer.from('{"a":1}\n{"b":2}\n')
32+
const result = await collect(t, [chunk])
33+
expect(result).toEqual([{ a: 1 }, { b: 2 }])
34+
})
35+
36+
it('skips empty and whitespace-only lines', async () => {
37+
const t = new NdJsonTransform()
38+
const chunk = Buffer.from('\n \n{"x":10}\n \n')
39+
const result = await collect(t, [chunk])
40+
expect(result).toEqual([{ x: 10 }])
41+
})
42+
43+
it('parses JSON split across multiple chunks', async () => {
44+
const t = new NdJsonTransform()
45+
const chunks = [Buffer.from('{"split":'), Buffer.from('true}\n')]
46+
const result = await collect(t, chunks)
47+
expect(result).toEqual([{ split: true }])
48+
})
49+
50+
it('parses final line without trailing newline on flush', async () => {
51+
const t = new NdJsonTransform()
52+
const chunks = [Buffer.from('{"end":"last"}')]
53+
const result = await collect(t, chunks)
54+
expect(result).toEqual([{ end: 'last' }])
55+
})
56+
57+
it('propagates parse errors in _transform (invalid JSON with newline)', async () => {
58+
const t = new NdJsonTransform()
59+
const bad = Buffer.from('{"foo": bad}\n')
60+
await expect(collect(t, [bad])).rejects.toThrow(/Invalid JSON on flush:/)
61+
})
62+
63+
it('propagates parse errors in _flush (invalid final JSON)', async () => {
64+
const t = new NdJsonTransform()
65+
const bad = Buffer.from('{"incomplete":123')
66+
await expect(collect(t, [bad])).rejects.toThrow(/Invalid JSON on flush:/)
67+
})
68+
69+
it('handles multi-byte UTF-8 characters split across chunk boundary', async () => {
70+
const t = new NdJsonTransform()
71+
const full = Buffer.from('{"emoji":"💩"}\n', 'utf8')
72+
// Split in the middle of the 4‑byte 💩 codepoint:
73+
const chunk1 = full.slice(0, 12) // up through two bytes of the emoji
74+
const chunk2 = full.slice(12) // remainder of emoji + '}' + '\n'
75+
const result = await collect(t, [chunk1, chunk2])
76+
expect(result).toEqual([{ emoji: '💩' }])
77+
})
78+
79+
it('emits no data for completely empty input', async () => {
80+
const t = new NdJsonTransform()
81+
const result = await collect(t, [])
82+
expect(result).toEqual([])
83+
})
84+
})

0 commit comments

Comments
 (0)