Verified Commit e3bee05b authored by Michal 'vorner' Vaner

Testing & fixes around the Rust AmIHacked processing

Test and fix the AmIHacked Rust processing. Also include a few
improvements:
• Use a different (FNV) hash in the hash tables, to speed things up a bit.
• Output a tab instead of a space, as required by to_db.pl.
• Accept both compressed and uncompressed input files, as needed by
  repu_add.
parent b375d3b9
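
A minimal, self-contained sketch of the three changes (the names here are made up for illustration; the diff below is the authoritative code):

```rust
// Illustrative sketch of the three improvements; only the techniques
// match the diff below, the names are invented.
use fnv::FnvHashMap;

fn main() {
    // 1. FNV-backed maps hash short string keys faster than the default
    //    SipHash map; note they are constructed with default(), not new().
    let mut per_kind: FnvHashMap<String, u32> = FnvHashMap::default();
    *per_kind.entry("telnet".to_owned()).or_insert(0) += 1;

    // 2. Separate the IP and the JSON with a tab, as to_db.pl expects.
    println!("{}\t{}", "1.2.3.4", "{\"telnet\":1}");

    // 3. Pick the decompressor by file suffix, so both compressed and
    //    plain input files are accepted.
    for arg in ["input.bz2", "input.csv"] {
        let cmd = if arg.ends_with(".bz2") { "bzip2 -dc" } else { "cat" };
        println!("{}: would run `{} <\"{}\"`", arg, cmd, arg);
    }
}
```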
@@ -9,3 +9,4 @@ regex = "0.1"
 scoped-pool = "1"
 serde_json = "0.8"
 rustc-serialize = "0.3"
+fnv = "1"
@@ -13,7 +13,7 @@
  * The lines look like this:
  *
  * ```text
- * 1.2.3.4 {"telnet":{"2016:12:01":1},"ssh":{"2016-02-02":3}}
+ * 1.2.3.4	{"telnet":{"2016:12:01":1},"ssh":{"2016-02-02":3}}
  * ```
  *
  * It does so by first splitting it into multiple files (by the string prefix of the IP address, to
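
The prefix-bucketing mentioned in that doc comment can be illustrated in isolation; the regex below is the one the code actually uses, but this standalone demo uses the current regex crate API rather than the 0.1 API pinned in Cargo.toml:

```rust
// Demo of bucketing addresses by their 1-2 character string prefix,
// using the same pattern as the splitter: ^(.[^.:]?)
use regex::Regex;

fn main() {
    let prefix = Regex::new(r"^(.[^.:]?)").unwrap();
    for ip in ["1.2.3.4", "10.0.0.1", "fe80::1"] {
        let p = prefix.captures(ip).unwrap().get(1).unwrap().as_str();
        // Lines sharing a prefix land in the same temporary file,
        // so each bucket can later be sorted and aggregated independently.
        println!("{} -> bucket {}", ip, p); // "1", "10", "fe"
    }
}
```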
@@ -25,13 +25,16 @@ extern crate regex;
 extern crate scoped_pool;
 extern crate serde_json;
 extern crate rustc_serialize;
+extern crate fnv;

 use std::process::*;
 use std::sync::*;
 use std::collections::{HashMap,HashSet};
 use std::io::{Write,BufWriter,BufReader};
 use std::net::IpAddr;
+use std::fs::remove_file;
 use regex::Regex;
+use fnv::{FnvHashMap, FnvHashSet};

 /**
  * This is the inner part of SplitOutput.
@@ -91,7 +94,7 @@ impl SplitOutput {
 }

 /// Bunch of outputs to store into.
-type Splitter = RwLock<HashMap<String, Mutex<SplitOutput>>>;
+type Splitter = RwLock<FnvHashMap<String, Mutex<SplitOutput>>>;

 /// Split one input file (possibly in parallel with others)
 fn split_one(outputs: &Splitter, prefix: &Regex, unzip: &mut Child) {
@@ -122,15 +125,20 @@ fn split_one(outputs: &Splitter, prefix: &Regex, unzip: &mut Child) {
 }

 /// Perform the splitting phase, returning set of the file prefixes it has been sorted into.
-fn split(pool: &scoped_pool::Pool) -> HashSet<String> {
-    let outputs: Splitter = RwLock::new(HashMap::new());
+fn split(pool: &scoped_pool::Pool) -> FnvHashSet<String> {
+    let outputs: Splitter = RwLock::new(FnvHashMap::default());
     let prefix = Regex::new(r"^(.[^.:]?)").unwrap();
     pool.scoped(|scope| {
         for arg in std::env::args().skip(1) {
             let outputs = &outputs;
             let prefix = &prefix;
             scope.execute(move || {
-                let mut unzip = Command::new("/bin/sh").arg("-c").arg(format!("bzip2 -dc <\"{}\" | (bigbuffer 128 || cat)", arg)).stdout(Stdio::piped()).spawn().expect("Failed to start unzip");
+                let cmd = if arg.ends_with(".bz2") {
+                    "bzip2 -dc"
+                } else {
+                    "cat"
+                };
+                let mut unzip = Command::new("/bin/sh").arg("-c").arg(format!("{} <\"{}\" | (bigbuffer 128 || cat)", cmd, arg)).stdout(Stdio::piped()).spawn().expect("Failed to start unzip");
                 split_one(outputs, prefix, &mut unzip);
                 unzip.wait().expect("Failed to wait for unzip");
             });
@@ -149,7 +157,7 @@ struct Record {
 }

 /// The summed up incidents per IP address.
-type ResultSum = HashMap<String, HashMap<String, u32>>;
+type ResultSum = FnvHashMap<String, FnvHashMap<String, u32>>;

 /**
  * A buffer that'll lock when writing the output to stdout (since there'll be many in multiple
@@ -188,8 +196,8 @@ impl Drop for MultiBuf {

 /// If there's something for the previous IP, output it as IP JSON pair and reset the result
 fn json_output(buf: &mut MultiBuf, sum: &mut ResultSum, last: &mut Option<IpAddr>) {
     if let Some(ip) = *last {
-        buf.write(format!("{} {}", ip, serde_json::to_string(&sum).unwrap()));
-        *sum = HashMap::new();
+        buf.write(format!("{}\t{}", ip, serde_json::to_string(&sum).unwrap()));
+        *sum = FnvHashMap::default();
     }
 }
@@ -210,7 +218,7 @@ fn aggregate(sort: &mut Child) {
     let mut last: Option<IpAddr> = None;
     let output = sort.stdout.as_mut().unwrap();
     let mut reader = csv::Reader::from_reader(BufReader::new(output)).has_headers(false);
-    let mut sum: ResultSum = HashMap::new();
+    let mut sum: ResultSum = FnvHashMap::default();
     let mut buf = MultiBuf::new();
     for row in reader.decode() {
         let row: Record = row.unwrap();
@@ -222,7 +230,7 @@ fn aggregate(sort: &mut Child) {
             json_output(&mut buf, &mut sum, &mut last);
         }
         last = Some(ip);
-        *sum.entry(row.kind).or_insert_with(HashMap::new).entry(row.date).or_insert(0) += row.cnt;
+        *sum.entry(row.kind).or_insert_with(FnvHashMap::default).entry(row.date).or_insert(0) += row.cnt;
     }
     json_output(&mut buf, &mut sum, &mut last);
 }
@@ -231,13 +239,15 @@ fn aggregate(sort: &mut Child) {
  * Go through the content of all the files with given prefixes, process them
  * and produce aggregated JSONs.
  */
-fn jsonize(pool: &scoped_pool::Pool, prefixes: &HashSet<String>) {
+fn jsonize(pool: &scoped_pool::Pool, prefixes: FnvHashSet<String>) {
     pool.scoped(|scope| {
-        for prefix in prefixes {
+        for mut prefix in prefixes.into_iter() {
             scope.execute(move || {
-                let mut sort = Command::new("/bin/sh").arg("-c").arg(format!("gunzip -cd {}.csv.gz | sort -S 1G -T .", prefix)).env("LC_ALL", "C").stdout(Stdio::piped()).spawn().expect("Failed to run sort");
+                prefix.push_str(".csv.gz");
+                let mut sort = Command::new("/bin/sh").arg("-c").arg(format!("gunzip -cd {} | sort -S 2G -T .", prefix)).env("LC_ALL", "C").stdout(Stdio::piped()).spawn().expect("Failed to run sort");
                 aggregate(&mut sort);
                 sort.wait().expect("Failed to wait for sort");
+                remove_file(prefix).expect("Failed to remove gzip temporary");
             });
         }
     });
@@ -246,5 +256,5 @@ fn jsonize(pool: &scoped_pool::Pool, prefixes: &HashSet<String>) {
 fn main() {
     let pool = scoped_pool::Pool::new(6);
     let prefixes = split(&pool);
-    jsonize(&pool, &prefixes);
+    jsonize(&pool, prefixes);
 }
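
As a self-contained illustration of the aggregation step above, the nested kind → date → count accumulation that aggregate() performs per IP looks like this in isolation (the record values are made up; the entry/or_insert_with technique matches the diff):

```rust
// Standalone demo of the nested FnvHashMap accumulation used in aggregate().
use fnv::FnvHashMap;

fn main() {
    // kind -> date -> count, matching the ResultSum alias in the diff.
    let mut sum: FnvHashMap<String, FnvHashMap<String, u32>> = FnvHashMap::default();
    let rows = [("ssh", "2016-02-02", 3u32), ("ssh", "2016-02-02", 2)];
    for &(kind, date, cnt) in rows.iter() {
        *sum.entry(kind.to_owned())
            .or_insert_with(FnvHashMap::default)
            .entry(date.to_owned())
            .or_insert(0) += cnt;
    }
    // Rows for the same kind and date are summed together.
    assert_eq!(sum["ssh"]["2016-02-02"], 5);
}
```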