import os
import threading
from pathlib import Path
+ from queue import Queue
from typing import Any

from llama_index.core.data_structs import IndexDict
from llama_index.core.embeddings.utils import EmbedType
from llama_index.core.indices import VectorStoreIndex, load_index_from_storage
from llama_index.core.indices.base import BaseIndex
from llama_index.core.ingestion import run_transformations
- from llama_index.core.schema import Document, TransformComponent
+ from llama_index.core.schema import BaseNode, Document, TransformComponent
from llama_index.core.storage import StorageContext

from private_gpt.components.ingest.ingest_helper import IngestionHelper
from private_gpt.paths import local_data_path
from private_gpt.settings.settings import Settings
+ from private_gpt.utils.eta import eta


logger = logging.getLogger(__name__)
@@ -314,6 +316,170 @@ def __del__(self) -> None:
        self._file_to_documents_work_pool.terminate()


+ class PipelineIngestComponent(BaseIngestComponentWithIndex):
+     """Pipeline ingestion - keeping the embedding worker pool as busy as possible.
+ 
+     This class implements a threaded ingestion pipeline, which comprises two threads
+     and two queues. The primary thread is responsible for reading and parsing files
+     into documents. These documents are then placed into a queue, which is
+     distributed to a pool of embedding workers for embedding computation. After
+     embedding, the documents are transferred to another queue where they are
+     accumulated until a threshold is reached. Upon reaching this threshold, the
+     accumulated documents are flushed to the document store, index, and vector
+     store.
+ 
+     Exception handling ensures robustness against erroneous files. However, in the
+     pipelined design, one error can lead to the discarding of multiple files. Any
+     discarded files will be reported.
+     """
+ 
+     NODE_FLUSH_COUNT = 5000  # Flush to the index/docstore every NODE_FLUSH_COUNT nodes.
+ 
+     def __init__(
+         self,
+         storage_context: StorageContext,
+         embed_model: EmbedType,
+         transformations: list[TransformComponent],
+         count_workers: int,
+         *args: Any,
+         **kwargs: Any,
+     ) -> None:
+         super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
+         assert (
+             len(self.transformations) >= 2
+         ), "Embeddings must be in the transformations"
+         assert count_workers > 0, "count_workers must be > 0"
+         self.count_workers = count_workers
+         # We do our own multiprocessing, so disable Hugging Face's tokenizer
+         # parallelism to avoid the two colliding.
+         os.environ["TOKENIZERS_PARALLELISM"] = "false"
+ 
+         # doc_q stores parsed files as Document chunks.
+         # Using a shallow queue causes the filesystem parser to block
+         # when it reaches capacity. This ensures it doesn't outpace the
+         # computationally intensive embeddings phase, avoiding unnecessary
+         # memory consumption. The semaphore is used to bound the async worker
+         # embedding computations to cause the doc Q to fill and block.
+         self.doc_semaphore = multiprocessing.Semaphore(
+             self.count_workers
+         )  # limit the number of concurrent embedding tasks
+         self.doc_q: Queue[tuple[str, str | None, list[Document] | None]] = Queue(20)
+         # node_q stores documents parsed into nodes (embeddings).
+         # Larger queue size so we don't block the embedding workers during a slow
+         # index update.
+         self.node_q: Queue[
+             tuple[str, str | None, list[Document] | None, list[BaseNode] | None]
+         ] = Queue(40)
+         threading.Thread(target=self._doc_to_node, daemon=True).start()
+         threading.Thread(target=self._write_nodes, daemon=True).start()
+ 
+     def _doc_to_node(self) -> None:
+         # Parse documents into nodes
+         with multiprocessing.pool.ThreadPool(processes=self.count_workers) as pool:
+             while True:
+                 try:
+                     cmd, file_name, documents = self.doc_q.get(
+                         block=True
+                     )  # Documents for a file
+                     if cmd == "process":
+                         # Push CPU/GPU embedding work to the worker pool
+                         # Acquire semaphore to control access to worker pool
+                         self.doc_semaphore.acquire()
+                         pool.apply_async(
+                             self._doc_to_node_worker, (file_name, documents)
+                         )
+                     elif cmd == "quit":
+                         break
+                 finally:
+                     if cmd != "process":
+                         self.doc_q.task_done()  # unblock Q joins
+ 
+     def _doc_to_node_worker(self, file_name: str, documents: list[Document]) -> None:
+         # CPU/GPU intensive work runs in a worker thread
+         try:
+             nodes = run_transformations(
+                 documents,  # type: ignore[arg-type]
+                 self.transformations,
+                 show_progress=self.show_progress,
+             )
+             self.node_q.put(("process", file_name, documents, nodes))
+         finally:
+             self.doc_semaphore.release()
+             self.doc_q.task_done()  # unblock Q joins
+ 
+     def _save_docs(
+         self, files: list[str], documents: list[Document], nodes: list[BaseNode]
+     ) -> None:
+         try:
+             logger.info(
+                 f"Saving {len(files)} files ({len(documents)} documents / {len(nodes)} nodes)"
+             )
+             self._index.insert_nodes(nodes)
+             for document in documents:
+                 self._index.docstore.set_document_hash(
+                     document.get_doc_id(), document.hash
+                 )
+             self._save_index()
+         except Exception:
+             # Tell the user so they can investigate these files
+             logger.exception(f"Processing files {files}")
+         finally:
+             # Clearing work, even on exception, maintains a clean state.
+             nodes.clear()
+             documents.clear()
+             files.clear()
+ 
+     def _write_nodes(self) -> None:
+         # Save nodes to index. I/O intensive.
+         node_stack: list[BaseNode] = []
+         doc_stack: list[Document] = []
+         file_stack: list[str] = []
+         while True:
+             try:
+                 cmd, file_name, documents, nodes = self.node_q.get(block=True)
+                 if cmd in ("flush", "quit"):
+                     if file_stack:
+                         self._save_docs(file_stack, doc_stack, node_stack)
+                     if cmd == "quit":
+                         break
+                 elif cmd == "process":
+                     node_stack.extend(nodes)  # type: ignore[arg-type]
+                     doc_stack.extend(documents)  # type: ignore[arg-type]
+                     file_stack.append(file_name)  # type: ignore[arg-type]
+                     # Constant saving is heavy on I/O - accumulate to a threshold
+                     if len(node_stack) >= self.NODE_FLUSH_COUNT:
+                         self._save_docs(file_stack, doc_stack, node_stack)
+             finally:
+                 self.node_q.task_done()
+ 
+     def _flush(self) -> None:
+         self.doc_q.put(("flush", None, None))
+         self.doc_q.join()
+         self.node_q.put(("flush", None, None, None))
+         self.node_q.join()
+ 
+     def ingest(self, file_name: str, file_data: Path) -> list[Document]:
+         documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
+         self.doc_q.put(("process", file_name, documents))
+         self._flush()
+         return documents
+ 
+     def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
+         docs = []
+         for file_name, file_data in eta(files):
+             try:
+                 documents = IngestionHelper.transform_file_into_documents(
+                     file_name, file_data
+                 )
+                 self.doc_q.put(("process", file_name, documents))
+                 docs.extend(documents)
+             except Exception:
+                 logger.exception(f"Skipping {file_data.name}")
+         self._flush()
+         return docs
+ 
+ 
def get_ingestion_component(
    storage_context: StorageContext,
    embed_model: EmbedType,
@@ -336,6 +502,13 @@ def get_ingestion_component(
            transformations=transformations,
            count_workers=settings.embedding.count_workers,
        )
+     elif ingest_mode == "pipeline":
+         return PipelineIngestComponent(
+             storage_context=storage_context,
+             embed_model=embed_model,
+             transformations=transformations,
+             count_workers=settings.embedding.count_workers,
+         )
    else:
        return SimpleIngestComponent(
            storage_context=storage_context,
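For context, here is a minimal caller-side sketch of the new mode. It is an illustration only: it assumes the caller already has a StorageContext, an embedding model, a transformations list, and a Settings object whose embedding.ingest_mode is "pipeline" with embedding.count_workers > 0; the file names and paths are made up.

# Hypothetical usage sketch; storage_context, embed_model, transformations
# and settings are assumed to already exist in the calling code.
from pathlib import Path

ingest_component = get_ingestion_component(
    storage_context=storage_context,
    embed_model=embed_model,
    transformations=transformations,
    settings=settings,  # settings.embedding.ingest_mode == "pipeline"
)

# Files are parsed on the calling thread, embedded by the worker pool, and
# written to the docstore/index/vector store in NODE_FLUSH_COUNT-sized batches.
docs = ingest_component.bulk_ingest(
    [
        ("report.pdf", Path("data/report.pdf")),
        ("notes.md", Path("data/notes.md")),
    ]
)
print(f"Ingested {len(docs)} documents")

Because both ingest() and bulk_ingest() end with _flush(), which joins doc_q and node_q, the returned documents have already been persisted to the document store, index, and vector store by the time the call returns.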