Initial commit

Signed-off-by: hr567 <hr567@hr567.me>
2024-03-07 00:37:25 +08:00
parent a71087840e
commit eff24bb8e5
3 changed files with 100 additions and 161 deletions

@@ -6,14 +6,16 @@ use std::os::unix::prelude::*;
 use std::path::PathBuf;
 use std::rc::{Rc, Weak};
 use std::sync::{
-    atomic::{AtomicUsize, Ordering},
+    atomic::{AtomicBool, AtomicUsize, Ordering},
     Arc,
 };
 use std::time::Duration;
 use clap::Parser;
 use dashmap::DashMap;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
-use tokio::{fs, io::AsyncReadExt, sync::Semaphore, time::sleep};
+use tokio::{fs, sync::Semaphore, time::sleep};
+use tracing::Instrument;
 static KEYWORDS: &[&str] = &[
     "数字经济", // "digital economy"
@@ -44,9 +46,8 @@ static KEYWORDS: &[&str] = &[
     "物联网",   // "Internet of Things"
     "机器学习", // "machine learning"
 ];
-thread_local! {
-    static ANALYZER: AhoCorasick = AhoCorasick::new(KEYWORDS);
-}
+thread_local! { static ANALYZER: AhoCorasick = AhoCorasick::new(KEYWORDS); }
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
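
The `thread_local!` here is load-bearing, not just style: the `use std::rc::{Rc, Weak}` import above shows the hand-rolled `AhoCorasick` is built on `Rc`, so it is not `Send` and cannot be moved into or shared across Tokio's worker threads; giving each thread its own automaton and reaching it through `ANALYZER.with(...)` sidesteps that. A minimal sketch of the pattern, with a hypothetical `Matcher` standing in for the automaton:

// Sketch only: `Matcher` is an illustrative stand-in for the repo's
// Rc-based AhoCorasick; it is not part of this codebase.
struct Matcher {
    patterns: Vec<String>,
}

impl Matcher {
    fn new(patterns: &[&str]) -> Self {
        Matcher {
            patterns: patterns.iter().map(|s| s.to_string()).collect(),
        }
    }

    // Count total occurrences of all patterns in `haystack`.
    fn count(&self, haystack: &str) -> usize {
        self.patterns
            .iter()
            .map(|p| haystack.matches(p.as_str()).count())
            .sum()
    }
}

thread_local! {
    // One instance per OS thread; nothing here has to be Send or Sync,
    // which is what lets an Rc-based type live in a multi-threaded runtime.
    static MATCHER: Matcher = Matcher::new(&["数字经济", "机器学习"]);
}

fn keyword_hits(text: &str) -> usize {
    MATCHER.with(|m| m.count(text))
}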
@@ -62,60 +63,80 @@ async fn main() -> io::Result<()> {
     tracing_subscriber::fmt::init();
     let args = Args::parse();
-    println!("{}", args.directory.display());
+    tracing::debug!(
+        "start to analyze {} with {} jobs",
+        args.directory.display(),
+        args.jobs
+    );
     let all_tasks = {
         let mut res = Vec::new();
         let mut files = fs::read_dir(&args.directory).await?;
         while let Some(entry) = files.next_entry().await? {
-            if entry.path().extension().map_or(false, |ext| ext == "txt") {
-                tracing::debug!(
-                    "Text file {} has been added to task list",
-                    entry.path().display()
-                );
-                let metadata = entry.metadata().await?;
-                let task = Task {
-                    file: entry.path(),
-                    size: metadata.size() as usize,
-                };
-                res.push(task);
+            if entry.path().extension().map_or(true, |ext| ext != "txt") {
+                continue;
             }
+            tracing::debug!(
+                "Text file {} has been added to task list",
+                entry.path().display()
+            );
+            let metadata = entry.metadata().await?;
+            let task = Task {
+                file: entry.path(),
+                size: metadata.size() as usize,
+            };
+            res.push(task);
         }
         res
     };
     let analysis = Arc::new(DashMap::new());
     let permits = Arc::new(Semaphore::new(args.jobs));
     let total_size: usize = all_tasks.iter().map(|t| t.size).sum();
     let analyzed_size = Arc::new(AtomicUsize::new(0));
     tracing::info!("A total of {} bytes of text to analyze", total_size);
     let mut tasks = Vec::new();
     for task in all_tasks.into_iter() {
-        let analyzed_size = analyzed_size.clone();
-        let permits = permits.clone();
-        let task = tokio::spawn(async move {
-            let _ = permits.acquire().await.unwrap();
-            let mut buf = String::new();
-            let len = fs::File::open(&task.file)
-                .await?
-                .read_to_string(&mut buf)
-                .await?;
-            let result = ANALYZER.with(|analyzer| analyzer.analyze(&buf));
-            tracing::debug_span!("after analyzing").in_scope(|| {
-                tracing::debug!("Analysis of file {}", &task.file.display());
+        let analysis = Arc::clone(&analysis);
+        let analyzed_size = Arc::clone(&analyzed_size);
+        let permits = Arc::clone(&permits);
+        let task_span = tracing::info_span!("Analysis of", file = task.file.display().to_string());
+        let task = tokio::spawn(
+            async move {
+                tracing::debug!("Prepare to acquire a permit to start");
+                let _ = permits.acquire().await.unwrap();
+                tracing::info!("Start to read file");
+                let buf = fs::read(&task.file).await?;
+                let len = buf.len();
+                let content = String::from_utf8_lossy(&buf);
+                tracing::debug!("Start to analyze");
+                let result = ANALYZER.with(|analyzer| analyzer.analyze(&content));
                 for (word, count) in result.iter() {
-                    tracing::debug!("{}: {}", word, count);
+                    tracing::trace!(word = %word, count = %count, "Analyzed");
                     analysis
                         .entry(word.to_string())
                         .and_modify(|e| *e += count)
                         .or_insert(*count);
                 }
-            });
-            let mut target_file = task.file;
-            target_file.set_extension("json");
-            let json_result = serde_json::to_vec(&result).unwrap();
-            fs::write(target_file, json_result).await?;
-            analyzed_size.fetch_add(len, Ordering::Release);
-            io::Result::Ok(())
-        });
+                tracing::debug!("Finished analysis");
+                let json_result = serde_json::to_vec(&result).unwrap();
+                fs::write(task.file.with_extension("json"), json_result).await?;
+                tracing::info!("Write result to file");
+                analyzed_size.fetch_add(len, Ordering::Release);
+                io::Result::Ok(())
+            }
+            .instrument(task_span),
+        );
         tasks.push(task);
     }
+    let finished = Arc::new(AtomicBool::new(false));
+    let finished_flag = Arc::clone(&finished);
     let progress = tokio::spawn(async move {
         let process = ProgressBar::new(total_size as u64);
         const TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})";
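
One caveat in the spawn body survives the rewrite: `let _ = permits.acquire().await.unwrap();` drops the `SemaphorePermit` at the end of that statement, because `_` binds nothing in Rust, so the semaphore does not actually hold the number of in-flight tasks to `args.jobs`. A hedged sketch of the intended pattern, with illustrative names (`run_bounded`, `_permit`) but tokio's real `Semaphore` API:

use std::sync::Arc;

use tokio::sync::Semaphore;

async fn run_bounded(jobs: usize) {
    let permits = Arc::new(Semaphore::new(jobs));
    let mut handles = Vec::new();
    for i in 0..16 {
        let permits = Arc::clone(&permits);
        handles.push(tokio::spawn(async move {
            // A named binding keeps the permit alive to the end of the task,
            // so at most `jobs` tasks run this section concurrently;
            // `let _ = ...` would release it on the same line.
            let _permit = permits.acquire().await.expect("semaphore closed");
            println!("task {i} doing bounded work");
        }));
    }
    for handle in handles {
        handle.await.expect("task panicked");
    }
}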
@@ -126,21 +147,26 @@ async fn main() -> io::Result<()> {
             })
             .progress_chars("#>-");
         process.set_style(style);
-        while analyzed_size.load(Ordering::Acquire) < total_size {
+        while analyzed_size.load(Ordering::Acquire) < total_size
+            && !finished_flag.load(Ordering::Acquire)
+        {
             process.set_position(analyzed_size.load(Ordering::Acquire) as u64);
             sleep(Duration::from_millis(100)).await;
         }
         process.finish_with_message("All files have been processed");
     });
     let results = futures::future::join_all(tasks).await;
-    if let Err(e) = progress.await {
-        tracing::error!("failed to process all tasks: {}", e);
-    }
     for res in results.into_iter().map(|r| r.unwrap()) {
         if let Err(e) = res {
             tracing::error!("failed to process a task: {}", e);
         }
     }
+    finished.store(true, Ordering::Release);
+    if let Err(e) = progress.await {
+        tracing::error!("failed to process all tasks: {}", e);
+    }
     let total_result = serde_json::to_vec(&*analysis).unwrap();
     fs::write("analysis.json", &total_result).await?;
     Ok(())
 }
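
The reordering in this hunk fixes a potential hang rather than just shuffling lines: the progress loop used to exit only once `analyzed_size` reached `total_size`, so a single failed read (whose bytes were never added) left `progress.await` waiting forever. Storing `finished` with `Release` after `join_all` and checking it with `Acquire` in the loop gives the progress task a second exit path. A minimal sketch of that handshake in isolation (names are illustrative):

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;

use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let finished = Arc::new(AtomicBool::new(false));
    let finished_flag = Arc::clone(&finished);

    let poller = tokio::spawn(async move {
        // Exits even if a work counter never reaches its target,
        // e.g. because a worker failed early.
        while !finished_flag.load(Ordering::Acquire) {
            sleep(Duration::from_millis(100)).await;
        }
    });

    // ... await the worker tasks here ...
    finished.store(true, Ordering::Release); // pairs with the Acquire load
    poller.await.expect("progress task panicked");
}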
@@ -191,9 +217,8 @@ impl AhoCorasick {
                     child.lengths.extend(node.borrow().lengths.clone());
                     child.suffix = Rc::downgrade(node);
                     break;
-                } else {
-                    suffix = suffix.unwrap().borrow().suffix.upgrade();
                 }
+                suffix = suffix.unwrap().borrow().suffix.upgrade();
             }
         }
     }
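
This last hunk flattens the failure-link construction: once the walk up the suffix chain finds a node with a matching child it `break`s, so the fallback step `suffix = suffix.unwrap().borrow().suffix.upgrade();` no longer needs its own `else` arm. For readers who want the same matching behavior without maintaining the Rc/Weak automaton by hand, here is roughly the equivalent with the community `aho-corasick` crate (API as of aho-corasick 1.x); this is an alternative sketch, not what this repo does:

use std::collections::HashMap;

use aho_corasick::AhoCorasick;

fn count_keywords<'a>(keywords: &[&'a str], text: &str) -> HashMap<&'a str, usize> {
    // Build the automaton once; `new` returns an error on invalid inputs.
    let ac = AhoCorasick::new(keywords).expect("failed to build automaton");
    let mut counts = HashMap::new();
    for mat in ac.find_iter(text) {
        // `pattern()` identifies which keyword matched.
        *counts.entry(keywords[mat.pattern().as_usize()]).or_insert(0) += 1;
    }
    counts
}

fn main() {
    let counts = count_keywords(&["数字经济", "机器学习"], "数字经济与机器学习,数字经济");
    println!("{counts:?}"); // {"数字经济": 2, "机器学习": 1}
}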