use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt::Write;
use std::io;
use std::os::unix::prelude::*;
use std::path::PathBuf;
use std::rc::{Rc, Weak};
use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

use clap::Parser;
use dashmap::DashMap;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use once_cell::{sync::Lazy, unsync::OnceCell};
use tokio::{fs, sync::Semaphore, time::sleep};
use tracing::Instrument;

/// Command-line arguments: the directory to scan, the concurrency limit,
/// and the keywords to count.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    #[arg(short, long)]
    directory: PathBuf,
    #[arg(short, long, default_value = "16")]
    jobs: usize,
    #[arg(short = 'f', long)]
    keywords_file: Option<PathBuf>,
    #[arg(last = true)]
    keywords: Vec<String>,
}

static ARGS: Lazy<Args> = Lazy::new(Args::parse);

// Each worker thread lazily builds its own automaton: the Rc-based
// `AhoCorasick` is neither `Send` nor `Sync`, so it cannot live in a shared static.
thread_local! {
    static ANALYZER: OnceCell<AhoCorasick> = const { OnceCell::new() };
}

#[tokio::main]
async fn main() -> io::Result<()> {
    tracing_subscriber::fmt::init();
    tracing::debug!(
        "start to analyze {} with {} jobs",
        ARGS.directory.display(),
        ARGS.jobs
    );
    tracing::debug!("current keywords are {:?}", &ARGS.keywords);

    // Collect every .txt file in the target directory as a task.
    let all_tasks = {
        let mut res = Vec::new();
        let mut files = fs::read_dir(&ARGS.directory).await?;
        while let Some(entry) = files.next_entry().await? {
            if entry.path().extension().map_or(true, |ext| ext != "txt") {
                continue;
            }
            tracing::debug!(
                "Text file {} has been added to task list",
                entry.path().display()
            );
            let metadata = entry.metadata().await?;
            let task = Task {
                file: entry.path(),
                size: metadata.size() as usize,
            };
            res.push(task);
        }
        res
    };

    let analysis = Arc::new(DashMap::new());
    let permits = Arc::new(Semaphore::new(ARGS.jobs));
    let total_size: usize = all_tasks.iter().map(|t| t.size).sum();
    let analyzed_size = Arc::new(AtomicUsize::new(0));
    tracing::info!("A total of {} bytes of text to analyze", total_size);

    // Spawn one task per file; the semaphore caps how many run concurrently.
    let mut tasks = Vec::new();
    for task in all_tasks.into_iter() {
        let analysis = Arc::clone(&analysis);
        let analyzed_size = Arc::clone(&analyzed_size);
        let permits = Arc::clone(&permits);
        let task_span =
            tracing::info_span!("Analysis of", file = task.file.display().to_string());
        let task = tokio::spawn(
            async move {
                tracing::debug!("Prepare to acquire a permit to start");
                let _permit = permits.acquire().await.unwrap();
                tracing::info!("Start to read file");
                let buf = fs::read(&task.file).await?;
                let len = buf.len();
                let content = String::from_utf8_lossy(&buf);
                tracing::debug!("Start to analyze");
                let result = ANALYZER.with(|analyzer| {
                    analyzer
                        .get_or_init(|| AhoCorasick::new(&ARGS.keywords))
                        .analyze(&content)
                });
                // Merge the per-file counts into the shared map.
                for (word, count) in result.iter() {
                    tracing::trace!(word = %word, count = %count, "Analyzed");
                    analysis
                        .entry(word.to_string())
                        .and_modify(|e| *e += count)
                        .or_insert(*count);
                }
                tracing::debug!("Finished analysis");
                let json_result = serde_json::to_vec(&result).unwrap();
                fs::write(task.file.with_extension("json"), json_result).await?;
                tracing::info!("Write result to file");
                analyzed_size.fetch_add(len, Ordering::Release);
                io::Result::Ok(())
            }
            .instrument(task_span),
        );
        tasks.push(task);
    }

    // Progress-bar task: poll the analyzed byte count until everything is done.
    let finished = Arc::new(AtomicBool::new(false));
    let finished_flag = Arc::clone(&finished);
    let progress = tokio::spawn(async move {
        let process = ProgressBar::new(total_size as u64);
        const TEMPLATE: &str =
            "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})";
        let style = ProgressStyle::with_template(TEMPLATE)
            .unwrap()
            .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
                write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
            })
            .progress_chars("#>-");
        process.set_style(style);
        while analyzed_size.load(Ordering::Acquire) < total_size {
            if finished_flag.load(Ordering::Acquire) {
                tracing::warn!("Some tasks failed to complete");
                break;
            }
            process.set_position(analyzed_size.load(Ordering::Acquire) as u64);
            sleep(Duration::from_millis(100)).await;
        }
    });

    let results = futures::future::join_all(tasks).await;
    for res in results.into_iter().map(|r| r.unwrap()) {
        if let Err(e) = res {
            tracing::error!("failed to process a task: {}", e);
        }
    }
    finished.store(true, Ordering::Release);
    if let Err(e) = progress.await {
        tracing::error!("failed to process all tasks: {}", e);
    }

    let total_result = serde_json::to_vec(&*analysis).unwrap();
    fs::write("analysis.json", &total_result).await?;
    Ok(())
}

/// A single file to analyze, with its size in bytes (used for progress reporting).
struct Task {
    file: PathBuf,
    size: usize,
}

/// A simple Aho-Corasick automaton built over `Rc`/`RefCell` trie nodes.
#[derive(Default)]
pub struct AhoCorasick {
    root: Rc<RefCell<ACNode>>,
}

impl AhoCorasick {
    pub fn new<S: AsRef<str>>(words: &[S]) -> Self {
        let root = Rc::new(RefCell::new(ACNode::default()));
        for word in words.iter().map(|s| s.as_ref()) {
            let mut cur = Rc::clone(&root);
            for c in word.chars() {
                cur = Rc::clone(Rc::clone(&cur).borrow_mut().trans.entry(c).or_default());
            }
            cur.borrow_mut().lengths.push(word.len());
        }
        Self::build_suffix(Rc::clone(&root));
        Self { root }
    }

    /// Build the suffix (failure) links with a breadth-first traversal of the trie.
    fn build_suffix(root: Rc<RefCell<ACNode>>) {
        let mut q = VecDeque::new();
        q.push_back(Rc::clone(&root));
        while let Some(parent) = q.pop_front() {
            let parent = parent.borrow();
            for (c, child) in &parent.trans {
                q.push_back(Rc::clone(child));
                let mut child = child.borrow_mut();
                let mut suffix = parent.suffix.upgrade();
                loop {
                    match &suffix {
                        None => {
                            child.lengths.extend(root.borrow().lengths.clone());
                            child.suffix = Rc::downgrade(&root);
                            break;
                        }
                        Some(node) => {
                            if node.borrow().trans.contains_key(c) {
                                let node = &node.borrow().trans[c];
                                child.lengths.extend(node.borrow().lengths.clone());
                                child.suffix = Rc::downgrade(node);
                                break;
                            }
                            suffix = suffix.unwrap().borrow().suffix.upgrade();
                        }
                    }
                }
            }
        }
    }

    /// Count how often every keyword occurs in `s`, following suffix links on mismatches.
    pub fn analyze<'a>(&self, s: &'a str) -> HashMap<&'a str, usize> {
        let mut ans = HashMap::new();
        let mut cur = Rc::clone(&self.root);
        let mut position: usize = 0;
        for c in s.chars() {
            loop {
                if let Some(child) = Rc::clone(&cur).borrow().trans.get(&c) {
                    cur = Rc::clone(child);
                    break;
                }
                let suffix = cur.borrow().suffix.clone();
                match suffix.upgrade() {
                    Some(node) => cur = node,
                    None => break,
                }
            }
            position += c.len_utf8();
            for &len in &cur.borrow().lengths {
                ans.entry(&s[position - len..position])
                    .and_modify(|e| *e += 1)
                    .or_insert(1);
            }
        }
        ans
    }
}

/// Trie node: outgoing transitions, a weak suffix link, and the lengths of
/// all patterns that end at this node.
#[derive(Default)]
struct ACNode {
    trans: BTreeMap<char, Rc<RefCell<ACNode>>>,
    suffix: Weak<RefCell<ACNode>>,
    lengths: Vec<usize>,
}
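// --- Illustrative test (not in the original source) ---------------------------
// A minimal sketch of how `AhoCorasick::analyze` behaves, assuming the keyword
// set and sample sentence chosen here. Overlapping matches are reported
// independently because every node stores the lengths of all patterns that end
// at it. Run with `cargo test`.
#[cfg(test)]
mod tests {
    use super::AhoCorasick;

    #[test]
    fn counts_overlapping_keywords() {
        let ac = AhoCorasick::new(&["he", "she", "hers"]);
        let counts = ac.analyze("she sells hers");
        assert_eq!(counts.get("she"), Some(&1));
        // "he" occurs inside "she" and inside "hers".
        assert_eq!(counts.get("he"), Some(&2));
        assert_eq!(counts.get("hers"), Some(&1));
    }
}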