Upload files to "src"
parent
8f0ab759d0
commit
f6a0ebb2b9
|
@ -0,0 +1,135 @@
|
||||||
|
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
|
use tokio::sync::{mpsc, Mutex};
|
||||||
|
use crate::database::Database;
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::parser;
|
||||||
|
use crate::progress;
|
||||||
|
use crate::progress::ProgressTracker;
|
||||||
|
|
||||||
|
pub async fn process_file(
|
||||||
|
filename: &str,
|
||||||
|
db: &Database,
|
||||||
|
config: &Config,
|
||||||
|
total_lines: usize,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
// Open input file
|
||||||
|
let file = File::open(filename).await?;
|
||||||
|
let reader = BufReader::new(file);
|
||||||
|
let mut lines = reader.lines();
|
||||||
|
|
||||||
|
let mut full_batch = Vec::new();
|
||||||
|
let mut partial_batch = Vec::new();
|
||||||
|
let mut malformed_lines = Vec::new();
|
||||||
|
|
||||||
|
// Wrap progress tracker in Arc<Mutex>
|
||||||
|
let progress = Arc::new(Mutex::new(ProgressTracker::new(Some(total_lines))));
|
||||||
|
progress::start_progress_tracker(Arc::clone(&progress));
|
||||||
|
//let progress_clone = Arc::clone(&progress);
|
||||||
|
|
||||||
|
|
||||||
|
// Async channel with batch counter
|
||||||
|
let (db_tx, mut db_rx) = mpsc::channel::<(Vec<mongodb::bson::Document>, bool)>(100);
|
||||||
|
let batch_counter = Arc::new(AtomicUsize::new(0));
|
||||||
|
let counter_clone = Arc::clone(&batch_counter);
|
||||||
|
|
||||||
|
// Spawn DB worker task
|
||||||
|
let db_arc = Arc::new(db.clone());
|
||||||
|
let progress_db = Arc::clone(&progress);
|
||||||
|
let db_worker = {
|
||||||
|
let db_worker = Arc::clone(&db_arc);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some((batch, is_full)) = db_rx.recv().await {
|
||||||
|
if let Err(e) = db_worker.insert_many(batch, is_full).await {
|
||||||
|
eprintln!("Error inserting batch: {}", e);
|
||||||
|
} else {
|
||||||
|
let p = progress_db.lock().await;
|
||||||
|
p.increment_batches_written();
|
||||||
|
counter_clone.fetch_sub(1, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read lines
|
||||||
|
while let Some(line) = lines.next_line().await? {
|
||||||
|
match parser::parse_line(&line) {
|
||||||
|
parser::ParsedRecord::Full { url, username, password } => {
|
||||||
|
full_batch.push(mongodb::bson::doc! { "url": url, "username": username, "password": password });
|
||||||
|
}
|
||||||
|
parser::ParsedRecord::Partial { username, password } => {
|
||||||
|
partial_batch.push(mongodb::bson::doc! { "username": username, "password": password });
|
||||||
|
}
|
||||||
|
parser::ParsedRecord::Malformed(line) => {
|
||||||
|
malformed_lines.push(line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Access progress safely
|
||||||
|
let p = progress.lock().await;
|
||||||
|
p.increment_lines();
|
||||||
|
|
||||||
|
|
||||||
|
// Send full batch
|
||||||
|
if full_batch.len() >= config.mongodb.batch_size {
|
||||||
|
let batch = full_batch.drain(..).collect::<Vec<_>>();
|
||||||
|
batch_counter.fetch_add(1, Ordering::SeqCst);
|
||||||
|
let p = progress.lock().await;
|
||||||
|
p.set_batches_queued(batch_counter.load(Ordering::SeqCst));
|
||||||
|
|
||||||
|
|
||||||
|
if let Err(e) = db_tx.send((batch, true)).await {
|
||||||
|
eprintln!("Failed to send full batch to DB worker: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send partial batch
|
||||||
|
if partial_batch.len() >= config.mongodb.batch_size {
|
||||||
|
let batch = partial_batch.drain(..).collect::<Vec<_>>();
|
||||||
|
batch_counter.fetch_add(1, Ordering::SeqCst);
|
||||||
|
let p = progress.lock().await;
|
||||||
|
p.set_batches_queued(batch_counter.load(Ordering::SeqCst));
|
||||||
|
|
||||||
|
|
||||||
|
if let Err(e) = db_tx.send((batch, false)).await {
|
||||||
|
eprintln!("Failed to send partial batch to DB worker: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send remaining batches
|
||||||
|
if !full_batch.is_empty() {
|
||||||
|
let batch = full_batch.drain(..).collect::<Vec<_>>();
|
||||||
|
batch_counter.fetch_add(1, Ordering::SeqCst);
|
||||||
|
if let Err(e) = db_tx.send((batch, true)).await {
|
||||||
|
eprintln!("Failed to send final full batch to DB worker: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !partial_batch.is_empty() {
|
||||||
|
let batch = partial_batch.drain(..).collect::<Vec<_>>();
|
||||||
|
batch_counter.fetch_add(1, Ordering::SeqCst);
|
||||||
|
if let Err(e) = db_tx.send((batch, false)).await {
|
||||||
|
eprintln!("Failed to send final partial batch to DB worker: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write malformed records
|
||||||
|
if !malformed_lines.is_empty() {
|
||||||
|
let garbage_filename = format!("{}.garbage.txt", filename);
|
||||||
|
let mut garbage_file = File::create(&garbage_filename).await?;
|
||||||
|
for line in malformed_lines {
|
||||||
|
garbage_file.write_all(line.as_bytes()).await?;
|
||||||
|
garbage_file.write_all(b"\n").await?;
|
||||||
|
}
|
||||||
|
println!("Malformed records saved to: {}", garbage_filename);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close channel and wait for DB worker
|
||||||
|
drop(db_tx);
|
||||||
|
if let Err(e) = db_worker.await {
|
||||||
|
eprintln!("DB worker encountered an error: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
Reference in New Issue