From f6a0ebb2b9104900dcfec90aebbd263cb6e37841 Mon Sep 17 00:00:00 2001
From: CatWithAHat
Date: Sat, 29 Mar 2025 08:57:12 +0000
Subject: [PATCH] Upload files to "src"

---
 src/file_handler.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 135 insertions(+)
 create mode 100644 src/file_handler.rs

diff --git a/src/file_handler.rs b/src/file_handler.rs
new file mode 100644
index 0000000..d7f89fb
--- /dev/null
+++ b/src/file_handler.rs
@@ -0,0 +1,135 @@
+use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
+use tokio::fs::File;
+use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::sync::{mpsc, Mutex};
+use crate::database::Database;
+use crate::config::Config;
+use crate::parser;
+use crate::progress;
+use crate::progress::ProgressTracker;
+
+pub async fn process_file(
+    filename: &str,
+    db: &Database,
+    config: &Config,
+    total_lines: usize,
+) -> io::Result<()> {
+    // Open input file
+    let file = File::open(filename).await?;
+    let reader = BufReader::new(file);
+    let mut lines = reader.lines();
+
+    let mut full_batch = Vec::new();
+    let mut partial_batch = Vec::new();
+    let mut malformed_lines = Vec::new();
+
+    // Wrap progress tracker in Arc so the reader loop and the DB worker can share it
+    let progress = Arc::new(Mutex::new(ProgressTracker::new(Some(total_lines))));
+    progress::start_progress_tracker(Arc::clone(&progress));
+    //let progress_clone = Arc::clone(&progress);
+
+    // Async channel (bounded to 100 in-flight batches) with a counter of queued batches
+    let (db_tx, mut db_rx) = mpsc::channel::<(Vec<mongodb::bson::Document>, bool)>(100);
+    let batch_counter = Arc::new(AtomicUsize::new(0));
+    let counter_clone = Arc::clone(&batch_counter);
+
+    // Spawn DB worker task that drains the channel and writes batches to the database
+    let db_arc = Arc::new(db.clone());
+    let progress_db = Arc::clone(&progress);
+    let db_worker = {
+        let db_worker = Arc::clone(&db_arc);
+        tokio::spawn(async move {
+            while let Some((batch, is_full)) = db_rx.recv().await {
+                if let Err(e) = db_worker.insert_many(batch, is_full).await {
+                    eprintln!("Error inserting batch: {}", e);
+                } else {
+                    let p = progress_db.lock().await;
+                    p.increment_batches_written();
+                    counter_clone.fetch_sub(1, Ordering::SeqCst);
+                }
+            }
+        })
+    };
+
+    // Read lines
+    while let Some(line) = lines.next_line().await? {
+        match parser::parse_line(&line) {
+            parser::ParsedRecord::Full { url, username, password } => {
+                full_batch.push(mongodb::bson::doc! { "url": url, "username": username, "password": password });
+            }
+            parser::ParsedRecord::Partial { username, password } => {
+                partial_batch.push(mongodb::bson::doc! { "username": username, "password": password });
+            }
+            parser::ParsedRecord::Malformed(line) => {
+                malformed_lines.push(line);
+            }
+        }
+
+        // Update progress; drop the guard right away so the lock is free
+        // when the batch checks below (and the DB worker) take it again
+        let p = progress.lock().await;
+        p.increment_lines();
+        drop(p);
+
+        // Send full batch
+        if full_batch.len() >= config.mongodb.batch_size {
+            let batch = full_batch.drain(..).collect::<Vec<_>>();
+            batch_counter.fetch_add(1, Ordering::SeqCst);
+            let p = progress.lock().await;
+            p.set_batches_queued(batch_counter.load(Ordering::SeqCst));
+            drop(p);
+
+            if let Err(e) = db_tx.send((batch, true)).await {
+                eprintln!("Failed to send full batch to DB worker: {}", e);
+            }
+        }
+
+        // Send partial batch
+        if partial_batch.len() >= config.mongodb.batch_size {
+            let batch = partial_batch.drain(..).collect::<Vec<_>>();
+            batch_counter.fetch_add(1, Ordering::SeqCst);
+            let p = progress.lock().await;
+            p.set_batches_queued(batch_counter.load(Ordering::SeqCst));
+            drop(p);
+
+            if let Err(e) = db_tx.send((batch, false)).await {
+                eprintln!("Failed to send partial batch to DB worker: {}", e);
+            }
+        }
+    }
+
+    // Send remaining batches
+    if !full_batch.is_empty() {
+        let batch = full_batch.drain(..).collect::<Vec<_>>();
+        batch_counter.fetch_add(1, Ordering::SeqCst);
+        if let Err(e) = db_tx.send((batch, true)).await {
+            eprintln!("Failed to send final full batch to DB worker: {}", e);
+        }
+    }
+    if !partial_batch.is_empty() {
+        let batch = partial_batch.drain(..).collect::<Vec<_>>();
+        batch_counter.fetch_add(1, Ordering::SeqCst);
+        if let Err(e) = db_tx.send((batch, false)).await {
+            eprintln!("Failed to send final partial batch to DB worker: {}", e);
+        }
+    }
+
+    // Write malformed records to a sidecar file next to the input
+    if !malformed_lines.is_empty() {
+        let garbage_filename = format!("{}.garbage.txt", filename);
+        let mut garbage_file = File::create(&garbage_filename).await?;
+        for line in malformed_lines {
+            garbage_file.write_all(line.as_bytes()).await?;
+            garbage_file.write_all(b"\n").await?;
+        }
+        println!("Malformed records saved to: {}", garbage_filename);
+    }
+
+    // Close channel and wait for DB worker to finish draining
+    drop(db_tx);
+    if let Err(e) = db_worker.await {
+        eprintln!("DB worker encountered an error: {}", e);
+    }
+
+    Ok(())
+}