Verified Commit 35b54b4b authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner Committed by Michal 'vorner' Vaner
Browse files

Experiment: Some cleanups

Splitting into functions, scoped threads
parent 4b38c5ce
......@@ -6,3 +6,4 @@ authors = ["Michal 'vorner' Vaner <michal.vaner@nic.cz>"]
[dependencies]
csv = "0.14"
regex = "0.1"
scoped-pool = "1"
......@@ -12,15 +12,19 @@
* in case of single-digit octet). The goal is a kind of
* load-balancing the data into files, but making sure the same
* IP addresses are together in the same file.
*
* As it is a small utility for rare manual run, most errors
* simply panic through unwrap() or expect(). We would terminate
* the program anyway.
*/
extern crate csv;
extern crate regex;
extern crate scoped_pool;
use std::process::*;
use std::thread;
use std::sync::*;
use std::collections::HashMap;
use std::collections::{HashMap,HashSet};
use std::io::Write;
use regex::Regex;
......@@ -33,11 +37,15 @@ struct SplitOutput {
}
impl SplitOutput {
/**
* Create a new SplitOutput. It openes the compressor and stores its output. The name is the
* prefix of the file.
*/
fn new(name: &str) -> SplitOutput {
SplitOutput { compressor: Command::new("/bin/sh").arg("-c").arg(format!("gzip >{}.csv.gz", name)).stdin(Stdio::piped()).spawn().expect("Failed to start gzip") }
}
/// Write some data into the file.
fn process(&mut self, data: &Vec<String>) {
fn process(&mut self, data: &[String]) {
write!(self.compressor.stdin.as_mut().unwrap(), "{},{},{},{}\n", data[0], data[1], data[2], data[3]).expect("Write error");
}
}
......@@ -49,56 +57,56 @@ impl Drop for SplitOutput {
}
}
fn main() {
// For threads.
let mut running = Vec::new();
// Currently output files.
/*
* TODO: Could I get rid of that Arc thing? I know I join the
* threads before this goes out of scope.
*/
let outputs: Arc<RwLock<HashMap<String, Mutex<SplitOutput>>>> = Arc::new(RwLock::new(HashMap::new()));
for arg in std::env::args().skip(1) {
let a_cp = arg.clone();
let outputs = outputs.clone();
// Run the input files in parallel
running.push(thread::spawn(move || {
let prefix = Regex::new(r"^(.[^.]?)").unwrap();
let mut unzip = Command::new("/usr/bin/pbzip2").arg("-dc").arg(a_cp).stdout(Stdio::piped()).spawn().expect("Failed to start unzip");
{
let mut output = unzip.stdout.as_mut().unwrap();
let mut reader = csv::Reader::from_reader(&mut output).has_headers(false);
/// Bunch of outputs to store into.
type Splitter = RwLock<HashMap<String, Mutex<SplitOutput>>>;
for row in reader.records() {
let row = row.unwrap();
let iprefix = prefix.captures(&row[0]).expect("Doesn't match").at(1).unwrap();
/*
* First take read lock on the whole map and look up the opened output
* file for the IP prefix. On the very rare occasion it is not yet opened,
* get rid of the read lock (that's the reason for the block in the if
* condition), get a write lock instead. Make sure noone else
* created the opened file in the meantime when we didn't have it locked
* and if not, create a new one.
*/
if {
if let Some(output) = outputs.read().unwrap().get(iprefix) {
output.lock().unwrap().process(&row);
false
} else {
true
}
} {
let mut wlock = outputs.write().unwrap();
wlock.entry(String::from(iprefix)).or_insert_with(|| Mutex::new(SplitOutput::new(iprefix))).lock().unwrap().process(&row);
}
}
/// Split one input file (possibly in parallel with others)
fn split_one(outputs: &Splitter, prefix: &Regex, unzip: &mut Child) {
let mut output = unzip.stdout.as_mut().unwrap();
let mut reader = csv::Reader::from_reader(&mut output).has_headers(false);
for row in reader.records() {
let row = row.unwrap();
let iprefix = prefix.captures(&row[0]).expect("Doesn't match").at(1).unwrap();
/*
* First try to get an already existing opened file. If it is not there (rare),
* drop the read lock, acquire a new write one and check again (someone might have
* created it at the time we didn't hold the lock) and possibly create it.
*/
let created: bool;
{
if let Some(output) = outputs.read().unwrap().get(iprefix) {
created = true;
output.lock().unwrap().process(&row);
} else {
created = false;
}
unzip.wait_with_output().expect("Failed to wait for unzip");
}));
}
// Wait for background threads.
for t in running {
t.join().expect("Failure in a thread");
}
if !created {
let mut wlock = outputs.write().unwrap();
wlock.entry(String::from(iprefix)).or_insert_with(|| Mutex::new(SplitOutput::new(iprefix))).lock().unwrap().process(&row);
}
}
}
/// Perform the splitting phase, returning set of the file prefixes it has been sorted into.
fn split(pool: &scoped_pool::Pool) -> HashSet<String> {
let outputs: Splitter = RwLock::new(HashMap::new());
let prefix = Regex::new(r"^(.[^.:]?)").unwrap();
pool.scoped(|scope|{
for arg in std::env::args().skip(1) {
let outputs = &outputs;
let prefix = &prefix;
scope.execute(move || {
let mut unzip = Command::new("/usr/bin/pbzip2").arg("-dc").arg(arg).stdout(Stdio::piped()).spawn().expect("Failed to start unzip");
split_one(outputs, prefix, &mut unzip);
unzip.wait().unwrap();
});
}
});
outputs.into_inner().unwrap().into_iter().map(|(k, _)| k).collect()
}
fn main() {
let pool = scoped_pool::Pool::new(6);
split(&pool);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment