Skip to content

Commit f74fb4a

Browse files
committed
feat: refactored csvtojson to reduce memory usage
stream through the lines
1 parent c4891b6 commit f74fb4a

File tree

1 file changed

+177
-118
lines changed

1 file changed

+177
-118
lines changed

lib/handlers/MBNDSynchronizer.js

Lines changed: 177 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class MBNDSynchronizer {
4242
dbFilePath: undefined
4343
};
4444
this.options = options;
45-
this.limit = pLimit(5000);
45+
this.limit = pLimit(20);
4646

4747
process.on('SIGINT', async () => await this.restoreDbFile());
4848
process.on('SIGTERM', async () => await this.restoreDbFile());
@@ -151,67 +151,111 @@ class MBNDSynchronizer {
151151
throw e;
152152
};
153153

154-
fullSync = async () => {
155-
const { options, user, sequelize, limit, paths } = this;
154+
/**
 * Streams the CSV export row by row and either counts the eligible tracks or
 * forwards them to `trackHandler` in batches, so the parsed file never has to
 * be held in memory as a whole.
 *
 * A track is eligible when it has a play count, a rating, a last-played date
 * or a "love" flag.
 *
 * @param {('count'|'process')} mode - 'count' only counts eligible tracks, 'process' forwards them to `trackHandler`
 * @param {function|null} [trackHandler = null] - async callback invoked as `trackHandler(trackBatch, incrementProgress)`
 * @param {object} [options = {}] - additional options
 * @param {number} [options.batchSize = 500] - number of eligible tracks handed to `trackHandler` per call
 * @param {number|null} [options.totalCount = null] - expected number of eligible tracks, used to size the progress bar
 * @returns {Promise<number>} number of eligible tracks counted ('count') or handed to `trackHandler` ('process')
 * @throws {Error} in 'process' mode when `trackHandler` or `totalCount` is missing, or when a required CSV header is absent
 */
processCsv = async (mode, trackHandler = null, options = {}) => {
  const { batchSize = 500, totalCount = null } = options;
  const { options: syncOptions, paths } = this;
  const isProcessing = mode === 'process';

  if (isProcessing) {
    if (!trackHandler) {
      throw new Error('trackHandler is required for processing mode');
    }
    // Only a missing value is an error: 0 is a legitimate total when the
    // counting pass found no eligible track.
    if (totalCount == null) {
      throw new Error('totalCount is required for processing mode');
    }
  }

  // Tracks are forwarded whenever a handler is usable; the same flag drives
  // the final flush so the last partial batch is never dropped.
  const forwardTracks = mode !== 'count' && Boolean(trackHandler);

  // Parse each date cell once; cells that do not match the configured format become null.
  const parseDate = item => {
    const parsed = dayjs(item, syncOptions.datetimeFormat);
    return parsed.isValid() ? parsed.utc() : null;
  };

  // Counting only needs the columns that decide eligibility.
  const colParser = {
    playCount: 'number',
    rating: item => {
      const rating = Number.parseInt(item, 10);
      if (!rating) {
        return 0;
      }
      // handling Additional Tagging & Reporting Tools new rating system:
      // values in ]5, 100] are scaled down by 20.
      return rating > 5 && rating <= 100 ? Math.round(rating / 20) : rating;
    },
    lastPlayed: parseDate,
    love: item => (item?.trim() ? 1 : 0)
  };

  if (isProcessing) {
    // Extra columns that are only parsed when tracks are actually handled.
    Object.assign(colParser, {
      albumRating: 'number',
      skipCount: 'number',
      dateAdded: parseDate,
      dateModified: parseDate
    });
  }

  // No bar in verbose mode (per-track logs are printed instead) or when there is nothing to process.
  const progressBar =
    isProcessing && !syncOptions.verbose && totalCount > 0
      ? new cliProgress.SingleBar(
          { etaBuffer: Math.max(100, Math.floor(totalCount * 0.1)) },
          cliProgress.Presets.shades_classic
        )
      : null;
  const incrementProgress = () => progressBar?.increment();

  let headerProcessed = false;
  let processedCount = 0;
  let currentBatch = [];

  // Hands the pending batch to the handler; the buffer is swapped first so the
  // handler owns its array.
  const flushBatch = async () => {
    if (currentBatch.length === 0) {
      return;
    }
    const batch = currentBatch;
    currentBatch = [];
    await trackHandler(batch, incrementProgress);
    processedCount += batch.length;
  };

  progressBar?.start(totalCount, 0);
  try {
    await csv2json({
      delimiter: 'auto',
      colParser
    })
      .preFileLine((fileLineString, lineIdx) => {
        if (lineIdx !== 0 || headerProcessed) {
          return fileLineString;
        }
        // camelCase: handling Additional Tagging & Reporting Tools new headers
        const normalizedHeaderLine = camelCase(fileLineString);
        this.REQUIRED_HEADERS.forEach(header => {
          if (!normalizedHeaderLine.includes(camelCase(header))) {
            throw new Error(`${header} missing in your CSV headers`);
          }
        });
        headerProcessed = true;
        return camelCase(fileLineString.replace(/<|>/g, ''));
      })
      // NOTE(review): batching assumes csvtojson waits for the promise returned
      // by this callback before delivering the next row - confirm against the
      // csvtojson version in use.
      .subscribe(async track => {
        const trackEligible = !!track.playCount || !!track.rating || !!track.lastPlayed || !!track.love;
        if (!trackEligible) {
          return;
        }

        if (!forwardTracks) {
          processedCount++;
          return;
        }

        currentBatch.push(track);
        if (currentBatch.length >= batchSize) {
          await flushBatch();
        }
      })
      .fromFile(paths.csvFilePath);

    if (forwardTracks) {
      // Remaining tracks that did not fill a complete batch.
      await flushBatch();
    }
  } finally {
    // Always stop the bar, even when parsing or the handler throws.
    progressBar?.stop();
  }

  return processedCount;
};
212255

213-
const sampleDbTrack = await Track.findOne();
214-
const dbFolderSeparator = sampleDbTrack.path.substring(1).startsWith(':\\') ? '\\' : '/';
256+
fullSync = async () => {
257+
const { options, user, sequelize, paths, limit } = this;
258+
const { Track, Annotation } = sequelize.models;
215259

216260
const trackIncludes = [
217261
{
@@ -226,89 +270,104 @@ class MBNDSynchronizer {
226270
];
227271

228272
let trackUpdatedCount = 0;
229-
await Promise.all(
230-
musicBeeCollection.map(track =>
231-
limit(async () => {
232-
progressBar?.increment();
233-
const foundTracks = await Track.findAll({
234-
where: {
235-
[Op.and]: [
236-
{ title: track.title },
237-
{
238-
path: {
239-
[Op.endsWith]: `${track.filename}`
240-
}
273+
let notFoundTracksCount = 0;
274+
275+
const totalEligibleTracks = await this.processCsv('count');
276+
console.log(`${paths.csvFilePath} parsed successfully, ${totalEligibleTracks} potential tracks to be updated`);
277+
278+
// await new Promise(resolve => setTimeout(resolve, 200));
279+
280+
await this.processCsv('process',
281+
/**
282+
* @param {object[]} trackBatch
283+
* @param {function} incrementProgress
284+
*/
285+
async (trackBatch, incrementProgress) => {
286+
await Promise.all(
287+
trackBatch.map(track =>
288+
limit(async () => {
289+
incrementProgress?.();
290+
const foundTracks = await Track.findAll({
291+
where: {
292+
[Op.and]: [
293+
{ title: track.title },
294+
{
295+
path: {
296+
[Op.endsWith]: `${track.filename}`
297+
}
298+
}
299+
]
300+
},
301+
include: trackIncludes
302+
});
303+
const foundTrack = findBestMatch(track, foundTracks);
304+
305+
if (!foundTrack) {
306+
notFoundTracksCount++;
307+
if (options.verbose) {
308+
console.error(`track not found: ${track.filePath}`);
241309
}
242-
]
243-
},
244-
include: trackIncludes
245-
});
246-
const foundTrack = findBestMatch(track, foundTracks);
310+
return;
311+
}
247312

248-
if (!foundTrack) {
249-
notFoundTracksCount++;
250-
if (options.verbose) {
251-
console.error(`track not found: ${track.filePath}`);
252-
}
253-
return;
254-
}
313+
if (options.verbose) {
314+
console.log(`processing track: ${track.filePath}`);
315+
}
255316

256-
if (options.verbose) {
257-
console.log(`processing track: ${track.filePath}`);
258-
}
317+
let annotation = foundTrack?.trackAnnotation;
318+
if (!annotation) {
319+
const record = {
320+
item_type: 'media_file',
321+
user_id: user.id,
322+
item_id: foundTrack.id,
323+
play_count: 0,
324+
starred: 0
325+
};
326+
if (Annotation.getAttributes().ann_id) {
327+
record.ann_id = randomUUID();
328+
}
329+
annotation = Annotation.build(record);
330+
}
259331

260-
let annotation = foundTrack?.trackAnnotation;
261-
if (!annotation) {
262-
const record = {
263-
item_type: 'media_file',
264-
user_id: user.id,
265-
item_id: foundTrack.id,
266-
play_count: 0,
267-
starred: 0
268-
};
269-
if (Annotation.getAttributes().ann_id) {
270-
record.ann_id = randomUUID();
271-
}
272-
annotation = Annotation.build(record);
273-
}
332+
const update = {};
333+
if (track.rating > annotation.rating) {
334+
update.rating = track.rating;
335+
}
336+
if (track.love > annotation.starred) {
337+
update.starred = track.love;
338+
update.starred_at = track.lastPlayed || null;
339+
}
340+
if (track.playCount !== annotation.play_count) {
341+
if (track.playCount > annotation.play_count) {
342+
update.play_count = track.playCount;
343+
}
344+
if (options.first && annotation.play_count + track.playCount > annotation.play_count) {
345+
update.play_count = annotation.play_count + track.playCount;
346+
}
347+
}
274348

275-
const update = {};
276-
if (track.rating > annotation.rating) {
277-
update.rating = track.rating;
278-
}
279-
if (track.love > annotation.starred) {
280-
update.starred = track.love;
281-
update.starred_at = track.lastPlayed || null; // this data is not available with MB
282-
}
283-
if (track.playCount !== annotation.play_count) {
284-
if (track.playCount > annotation.play_count) {
285-
update.play_count = track.playCount;
286-
}
287-
if (options.first && annotation.play_count + track.playCount > annotation.play_count) {
288-
update.play_count = annotation.play_count + track.playCount;
289-
}
290-
}
349+
if (track.lastPlayed > annotation.play_date) {
350+
update.play_date = track.lastPlayed;
351+
if (!annotation.play_count && !update.play_count && !track.skipCount && !track.playCount) {
352+
update.play_count = 1;
353+
}
354+
}
291355

292-
if (track.lastPlayed > annotation.play_date) {
293-
update.play_date = track.lastPlayed;
294-
if (!annotation.play_count && !update.play_count && !track.skipCount && !track.playCount) {
295-
update.play_count = 1;
296-
}
297-
}
298-
if (!Object.keys(update).length) {
299-
return;
300-
}
356+
if (!Object.keys(update).length) {
357+
return;
358+
}
301359

302-
if (!foundTrack.trackAnnotation) {
303-
await annotation.save();
304-
}
305-
await annotation.update(update);
306-
trackUpdatedCount++;
307-
})
308-
)
309-
);
310-
progressBar?.stop();
360+
if (!foundTrack.trackAnnotation) {
361+
await annotation.save();
362+
}
363+
await annotation.update(update);
364+
trackUpdatedCount++;
365+
})
366+
)
367+
);
368+
}, { totalCount: totalEligibleTracks });
311369
console.log(`${trackUpdatedCount} tracks updated`);
370+
312371
if (notFoundTracksCount > 0) {
313372
console.warn(`${notFoundTracksCount} tracks not found`);
314373
}

0 commit comments

Comments
 (0)