diff --git a/Cargo.lock b/Cargo.lock index 0b119ef..4407420 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,21 +179,17 @@ dependencies = [ ] [[package]] -name = "default" -version = "0.1.0" +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ - "clap", - "futures", - "indicatif", + "cfg-if", + "hashbrown", + "lock_api", "once_cell", - "rand", - "rand_distr", + "parking_lot_core", "serde", - "serde_json", - "tokio", - "tokio-stream", - "tracing", - "tracing-subscriber", ] [[package]] @@ -291,23 +287,18 @@ dependencies = [ "slab", ] -[[package]] -name = "getrandom" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - [[package]] name = "gimli" version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" + [[package]] name = "heck" version = "0.4.1" @@ -360,12 +351,6 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" -[[package]] -name = "libm" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" - [[package]] name = "lock_api" version = "0.4.11" @@ -427,16 +412,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num-traits" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" -dependencies = [ - "autocfg", - "libm", -] - [[package]] name = "num_cpus" version = "1.16.0" @@ -515,12 +490,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" -[[package]] -name = "ppv-lite86" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - [[package]] name = "proc-macro2" version = "1.0.78" @@ -539,46 +508,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - -[[package]] -name = "rand_distr" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" -dependencies = [ - "num-traits", - "rand", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -781,31 +710,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-stream" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", - "tokio-util", -] - -[[package]] -name = "tokio-util" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", -] - [[package]] name = "tracing" version = "0.1.40" @@ -1050,3 +954,17 @@ name = "windows_x86_64_msvc" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" + +[[package]] +name = "word_freq_analyzer" +version = "0.1.0" +dependencies = [ + "clap", + "dashmap", + "futures", + "indicatif", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] diff --git a/Cargo.toml b/Cargo.toml index 7f2a757..db76b6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "default" +name = "word_freq_analyzer" version = "0.1.0" edition = "2021" @@ -7,15 +7,11 @@ edition = "2021" [dependencies] clap = { version = "4.5.1", features = ["env", "derive"] } +dashmap = { version = "5.5.3", features = ["serde"] } futures = "0.3.30" indicatif = "0.17.8" -once_cell = "1.19.0" -rand = "0.8.5" -rand_distr = "0.4.3" -serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" tokio = { version = "1.36.0", features = ["full"] } -tokio-stream = { version = "0.1.14", features = ["full"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/src/main.rs b/src/main.rs index d476ff3..9f2121b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,14 +6,16 @@ use std::os::unix::prelude::*; use std::path::PathBuf; use std::rc::{Rc, Weak}; use std::sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }; use std::time::Duration; use clap::Parser; +use dashmap::DashMap; use indicatif::{ProgressBar, ProgressState, ProgressStyle}; -use tokio::{fs, io::AsyncReadExt, sync::Semaphore, time::sleep}; +use tokio::{fs, sync::Semaphore, time::sleep}; +use tracing::Instrument; static KEYWORDS: &[&str] = &[ "数字经济", @@ -44,9 +46,8 @@ static KEYWORDS: &[&str] = &[ "物联网", "机器学习", ]; -thread_local! { - static ANALYZER: AhoCorasick = AhoCorasick::new(KEYWORDS); -} + +thread_local! { static ANALYZER: AhoCorasick = AhoCorasick::new(KEYWORDS); } #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -62,60 +63,80 @@ async fn main() -> io::Result<()> { tracing_subscriber::fmt::init(); let args = Args::parse(); - println!("{}", args.directory.display()); + tracing::debug!( + "start to analyze {} with {} jobs", + args.directory.display(), + args.jobs + ); let all_tasks = { let mut res = Vec::new(); let mut files = fs::read_dir(&args.directory).await?; while let Some(entry) = files.next_entry().await? { - if entry.path().extension().map_or(false, |ext| ext == "txt") { - tracing::debug!( - "Text file {} has been added to task list", - entry.path().display() - ); - let metadata = entry.metadata().await?; - let task = Task { - file: entry.path(), - size: metadata.size() as usize, - }; - res.push(task); + if entry.path().extension().map_or(true, |ext| ext != "txt") { + continue; } + tracing::debug!( + "Text file {} has been added to task list", + entry.path().display() + ); + let metadata = entry.metadata().await?; + let task = Task { + file: entry.path(), + size: metadata.size() as usize, + }; + res.push(task); } res }; + let analysis = Arc::new(DashMap::new()); let permits = Arc::new(Semaphore::new(args.jobs)); let total_size: usize = all_tasks.iter().map(|t| t.size).sum(); let analyzed_size = Arc::new(AtomicUsize::new(0)); tracing::info!("A total of {} bytes of text to analyze", total_size); let mut tasks = Vec::new(); for task in all_tasks.into_iter() { - let analyzed_size = analyzed_size.clone(); - let permits = permits.clone(); - let task = tokio::spawn(async move { - let _ = permits.acquire().await.unwrap(); - let mut buf = String::new(); - let len = fs::File::open(&task.file) - .await? - .read_to_string(&mut buf) - .await?; - let result = ANALYZER.with(|analyzer| analyzer.analyze(&buf)); - tracing::debug_span!("after analyzing").in_scope(|| { - tracing::debug!("Analysis of file {}", &task.file.display()); + let analysis = Arc::clone(&analysis); + let analyzed_size = Arc::clone(&analyzed_size); + let permits = Arc::clone(&permits); + let task_span = tracing::info_span!("Analysis of", file = task.file.display().to_string()); + let task = tokio::spawn( + async move { + tracing::debug!("Prepare to acquire a permit to start"); + let _ = permits.acquire().await.unwrap(); + + tracing::info!("Start to read file"); + let buf = fs::read(&task.file).await?; + let len = buf.len(); + let content = String::from_utf8_lossy(&buf); + + tracing::debug!("Start to analyze"); + let result = ANALYZER.with(|analyzer| analyzer.analyze(&content)); for (word, count) in result.iter() { - tracing::debug!("{}: {}", word, count); + tracing::trace!(word = %word, count = %count, "Analyzed"); + analysis + .entry(word.to_string()) + .and_modify(|e| *e += count) + .or_insert(*count); } - }); - let mut target_file = task.file; - target_file.set_extension("json"); - let json_result = serde_json::to_vec(&result).unwrap(); - fs::write(target_file, json_result).await?; - analyzed_size.fetch_add(len, Ordering::Release); - io::Result::Ok(()) - }); + tracing::debug!("Finished analysis"); + + let json_result = serde_json::to_vec(&result).unwrap(); + fs::write(task.file.with_extension("json"), json_result).await?; + tracing::info!("Write result to file"); + + analyzed_size.fetch_add(len, Ordering::Release); + + io::Result::Ok(()) + } + .instrument(task_span), + ); tasks.push(task); } + let finished = Arc::new(AtomicBool::new(false)); + let finished_flag = Arc::clone(&finished); let progress = tokio::spawn(async move { let process = ProgressBar::new(total_size as u64); const TEMPLATE : &str = "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})"; @@ -126,21 +147,26 @@ async fn main() -> io::Result<()> { }) .progress_chars("#>-"); process.set_style(style); - while analyzed_size.load(Ordering::Acquire) < total_size { + while analyzed_size.load(Ordering::Acquire) < total_size + && !finished_flag.load(Ordering::Acquire) + { process.set_position(analyzed_size.load(Ordering::Acquire) as u64); sleep(Duration::from_millis(100)).await; } - process.finish_with_message("All files have been processed"); }); let results = futures::future::join_all(tasks).await; - if let Err(e) = progress.await { - tracing::error!("failed to process all tasks: {}", e); - } for res in results.into_iter().map(|r| r.unwrap()) { if let Err(e) = res { tracing::error!("failed to process a task: {}", e); } } + finished.store(true, Ordering::Release); + if let Err(e) = progress.await { + tracing::error!("failed to process all tasks: {}", e); + } + + let total_result = serde_json::to_vec(&*analysis).unwrap(); + fs::write("analysis.json", &total_result).await?; Ok(()) } @@ -191,9 +217,8 @@ impl AhoCorasick { child.lengths.extend(node.borrow().lengths.clone()); child.suffix = Rc::downgrade(node); break; - } else { - suffix = suffix.unwrap().borrow().suffix.upgrade(); } + suffix = suffix.unwrap().borrow().suffix.upgrade(); } } }