Verified Commit b19a88ee authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Closed handling

When the connection/flow closes:
• Update the internal state
• Report it right away
• Lower the TTL, so it's dropped sooner
parent 9bfd01b0
Pipeline #1825 passed with stage
in 1 minute and 16 seconds
......@@ -78,12 +78,17 @@ Flows Dissector::process(vector<Packet> &packets) {
tie(flow, added, cleanup) = activeFlows.access(match, p);
(added ? result : delayed).insert(flow);
if (added)
activeFlows.ttl(flow, p.field<PKT_timestamp_monotonic>() + inactivityTime);
activeFlows.ttl(flow, flow->ttl(p.field<PKT_timestamp_monotonic>()));
cleanupPerformed = cleanupPerformed || cleanup;
localCache.emplace(match, flow);
} else
flow = localLookup->second;
flow->update(p);
if (flow->update(p))
/*
* The update has been somehow interesting ‒ something important has changed.
* Report it right away.
*/
result.insert(flow);
p.store<PKT_flow>(flow);
/*
* TODO: Pass through NDPI, if applicable. How do dups & reorders work with that?
......@@ -107,13 +112,12 @@ Flows Dissector::timeouts() {
std::lock_guard<std::mutex> guard(mutex);
result.swap(delayed);
}
uint64_t now = timeMsec(CLOCK_MONOTONIC_COARSE);
uint64_t ttlActive = now + inactivityTime;
const uint64_t now = timeMsec(CLOCK_MONOTONIC_COARSE);
// Move them to the back of the LRU and reset their TTL
for (const auto &f: result) {
activeFlows.touch(f);
// TODO: Check if the flow is actually active and not closed ‒ the TTL would differ
activeFlows.ttl(f, ttlActive);
activeFlows.ttl(f, f->ttl(now));
}
activeFlows.cleanupTTL(now);
// Drop long inactive or extra flows, but incorporate them in the result
......
......@@ -40,10 +40,6 @@ private:
* the flows here and empty it.
*/
Flows delayed;
// If no data go there for this long, drop it as dead
static constexpr uint32_t inactivityTime = 900 * 1000;
// If no data go there after the flow has been closed, drop it as dead
static constexpr uint32_t closedTime = 15 * 1000;
// TODO: Make this configurable
static constexpr size_t maxActiveFlowCount = 65536;
public:
......
......@@ -171,13 +171,21 @@ std::string Flow::toJSON() const {
return Pakon::toJSON(fields);
}
void Flow::update(const Packet &packet) {
bool Flow::update(const Packet &packet) {
MuxGuard lock(mutex);
Direction dir = packet.field<PKT_direction>();
bool interesting = false;
if (dir == Direction::IN)
fields.field<FL_in>().update(packet);
else
fields.field<FL_out>().update(packet);
// Did the second half of the flow just close?
if (fields.field<FL_in>().field<FL_closed>()
&& fields.field<FL_out>().field<FL_closed>()
&& fields.field<FL_status>() != FlowStatus::Closed) {
fields.store<FL_status>(FlowStatus::Closed);
interesting = true;
}
// Take max ‒ the packets may be processed out of order
fields.field<FL_lasttime>() = std::max(fields.field<FL_lasttime>(), packet.field<PKT_timestamp>());
fields.field<FL_lasttime_monotonic>() = std::max(fields.field<FL_lasttime_monotonic>(), packet.field<PKT_timestamp_monotonic>());
......@@ -189,6 +197,7 @@ void Flow::update(const Packet &packet) {
fields.field<FL_remote>().store<FL_mac>(macLevel.src);
else if (dir == Direction::OUT && !fields.field<FL_local>().present<FL_mac>() && macLevel.src.size())
fields.field<FL_local>().store<FL_mac>(macLevel.src);
return interesting;
}
void Flow::setStatus(FlowStatus status) {
......@@ -196,4 +205,14 @@ void Flow::setStatus(FlowStatus status) {
fields.store<FL_status>(status);
}
uint64_t Flow::ttl(uint64_t now) const {
MuxGuard lock(mutex);
switch (fields.field<FL_status>()) {
case FlowStatus::Closed:
return now + closedTime;
default:
return now + inactivityTime;
}
}
}
......@@ -166,6 +166,10 @@ private:
Field<std::string, InlineMap, FL_flags>
> fields;
friend class Dissector;
// If no data go there for this long, drop it as dead
static constexpr uint32_t inactivityTime = 900 * 1000;
// If no data go there after the flow has been closed, drop it as dead
static constexpr uint32_t closedTime = 15 * 1000;
public:
const std::string match;
// Get the verdict currently assigned to the flow
......@@ -177,12 +181,17 @@ public:
/*
* Add another packet to the flow. It updates the counters, last
* activity time and possibly extracts some more missing fields.
*
* Returns if something interesting happened (eg. changed state,
* added some fields).
*/
void update(const Packet &packet);
bool update(const Packet &packet);
// Create the flow from a first packet
explicit Flow(const Packet &packet);
// Change the status of the flow
void setStatus(FlowStatus status);
// Compute the TTL based on the flow status and now
uint64_t ttl(uint64_t now) const;
};
typedef Cache<std::string, Flow, 13, true, true, false> FlowCache;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment