Verified Commit 96a3b5fa authored by Pavel Doležal's avatar Pavel Doležal
Browse files

Export: Add optional filling of ASN and Country Code fields in Parquet and C-DNS exports

parent bc7dff55
......@@ -85,8 +85,15 @@ namespace DDP {
DDP::Probe::Probe() : m_cfg_loaded(false), m_initialized(false), m_running(false), m_poll(), m_cfg(),
m_aggregated_timer(nullptr), m_output_timer(nullptr), m_comm_links(),
m_log_link(), m_dns_record_mempool(), m_export_rings(), m_factory_rings(), m_stats(),
m_stopped_workers(0), m_ret_value(ReturnValue::STOP) {}
m_log_link(), m_dns_record_mempool(), m_export_rings(), m_factory_rings(),
m_country(), m_asn(), m_stats(), m_stopped_workers(0),
m_ret_value(ReturnValue::STOP) {}
/**
 * Destructor. Hands both Maxmind database handles to MMDB_close(),
 * the Country database first and the ASN database second.
 */
DDP::Probe::~Probe()
{
    MMDB_s* const databases[] = {&m_country, &m_asn};
    for (MMDB_s* database : databases)
        MMDB_close(database);
}
DDP::ParsedArgs DDP::Probe::process_args(int argc, char** argv)
{
......@@ -290,6 +297,28 @@ void DDP::Probe::init(const Arguments& args)
#endif
}
if (!m_cfg.country_db.value().empty()) {
int status = MMDB_open(m_cfg.country_db.value().c_str(), MMDB_MODE_MMAP, &m_country);
if (status != MMDB_SUCCESS) {
Logger("Probe").warning() << "Couldn't open Maxmind Country database!";
m_country.filename = nullptr;
}
}
else {
m_country.filename = nullptr;
}
if (!m_cfg.asn_db.value().empty()) {
int status = MMDB_open(m_cfg.asn_db.value().c_str(), MMDB_MODE_MMAP, &m_asn);
if (status != MMDB_SUCCESS) {
Logger("Probe").warning() << "Couldn't open Maxmind ASN database!";
m_asn.filename = nullptr;
}
}
else {
m_asn.filename = nullptr;
}
if (m_cfg.export_location.value() == ExportLocation::REMOTE)
TlsCtx::getInstance().init(m_cfg.export_ca_cert.value());
......@@ -313,7 +342,7 @@ DDP::Probe::ReturnValue DDP::Probe::run(std::vector<std::shared_ptr<DDP::Port>>&
try {
Worker w(m_cfg, stats, m_factory_rings.at(worker).get_poll_able_ring(), m_comm_links[worker].worker_endpoint(),
*m_dns_record_mempool, *m_tcp_connection_mempool, queue, ports, w_sockets,
m_cfg.match_qname, worker);
m_cfg.match_qname, worker, m_country, m_asn);
Logger logger("Worker");
logger.info() << "Starting worker on lcore " << ThreadManager::current_lcore() << ".";
w.run();
......
......@@ -27,6 +27,7 @@
#include <functional>
#include <vector>
#include <set>
#include <maxminddb.h>
#include "utils/Poll.h"
#include "utils/RingFwdDecl.h"
......@@ -174,6 +175,11 @@ namespace DDP {
*/
Probe();
/**
* Destructor
*/
~Probe();
/**
* Read all messages from log commlink and writes them to the output.
*/
......@@ -195,6 +201,8 @@ namespace DDP {
std::unique_ptr<Mempool<DnsTcpConnection>> m_tcp_connection_mempool; //!< Mempool for TCP connections.
std::unordered_map<unsigned, std::unique_ptr<Ring<boost::any>>> m_export_rings; //!< Rings for sending data from workers to exporter.
std::unordered_map<unsigned, PollAbleRingFactory<boost::any>> m_factory_rings;
MMDB_s m_country; //!< Maxmind Country database
MMDB_s m_asn; //!< Maxmind ASN database
std::vector<Statistics> m_stats; //!< Statistics structure for workers. One item in vector per worker.
AggregatedStatistics m_aggregated_stats; //!< Aggregated statistics from workers.
......
......@@ -24,6 +24,7 @@
#pragma once
#include <set>
#include <maxminddb.h>
#include "export/BaseExport.h"
#include "export/BaseWriter.h"
#include "core/DnsRecord.h"
......@@ -153,7 +154,8 @@ namespace DDP {
Worker(Config& cfg, Statistics& stats, PollAbleRing<boost::any> ring,
CommLink::CommLinkEP& comm_link, Mempool<DnsRecord>& record_mempool,
Mempool<DnsTcpConnection>& tcp_mempool, unsigned lcore_queue, std::vector<std::shared_ptr<DDP::Port>> ports,
std::vector<std::shared_ptr<DDP::Port>> sockets, bool match_qname, unsigned process_id) :
std::vector<std::shared_ptr<DDP::Port>> sockets, bool match_qname, unsigned process_id, MMDB_s& country_db,
MMDB_s& asn_db) :
Process(cfg, stats, comm_link),
m_record_mempool(record_mempool),
m_tcp_mempool(tcp_mempool),
......@@ -174,14 +176,14 @@ namespace DDP {
{
if (cfg.export_format.value() == ExportFormat::PARQUET) {
#ifdef PROBE_PARQUET
m_exporter = new ParquetExport(cfg);
m_exporter = new ParquetExport(cfg, country_db, asn_db);
#else
throw std::runtime_error("DNS Probe was built without Parquet support!");
#endif
}
else {
#ifdef PROBE_CDNS
m_exporter = new CdnsExport(cfg);
m_exporter = new CdnsExport(cfg, country_db, asn_db);
#else
throw std::runtime_error("DNS Probe was built without C-DNS support!");
#endif
......
/*
* Copyright (C) 2021 CZ.NIC, z. s. p. o.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations including
* the two.
*/
#include "BaseExport.h"

#include <cstring>
/**
 * @brief Fill given ASN and Country Code strings from Maxmind databases based on given IP address
 *
 * A database whose handle has a null `filename` is skipped. An output string is written only
 * when the lookup in the corresponding database succeeds, an entry is found and the value has
 * the expected type; otherwise the string keeps the value supplied by the caller.
 *
 * @param addr IP address to look up. For AF_INET only the leading sizeof(in_addr) bytes are read.
 *             A null pointer turns the call into a no-op.
 * @param ipv Address family of addr: AF_INET selects IPv4, any other value is handled as IPv6
 * @param asn Receives the decimal text of the "autonomous_system_number" value
 * @param country Receives the "country" / "iso_code" value
 */
void DDP::BaseExport::fill_asn_country(const in6_addr* addr, int ipv, std::string& asn, std::string& country)
{
    const bool use_country = m_country.filename != nullptr;
    const bool use_asn = m_asn.filename != nullptr;

    // Nothing to do without an address or without at least one usable database.
    if (addr == nullptr || (!use_country && !use_asn))
        return;

    // Value-initialize so that port, flow info and scope id are never indeterminate.
    sockaddr_in sa4{};
    sockaddr_in6 sa6{};
    sockaddr* sa = nullptr;

    if (ipv == AF_INET) {
        sa4.sin_family = AF_INET;
        // Copy the bytes instead of dereferencing a reinterpret_cast'ed in_addr*:
        // reading an in6_addr object through an in_addr lvalue breaks strict aliasing.
        std::memcpy(&sa4.sin_addr, addr, sizeof(sa4.sin_addr));
        sa = reinterpret_cast<sockaddr*>(&sa4);
    }
    else {
        sa6.sin6_family = AF_INET6;
        sa6.sin6_addr = *addr;
        sa = reinterpret_cast<sockaddr*>(&sa6);
    }

    if (use_country) {
        int err = MMDB_SUCCESS;
        auto result = MMDB_lookup_sockaddr(&m_country, sa, &err);
        if (err == MMDB_SUCCESS && result.found_entry) {
            MMDB_entry_data_s data;
            // The lookup path is a variadic list of keys; nullptr is the end-of-path sentinel.
            int status = MMDB_get_value(&result.entry, &data, "country", "iso_code", nullptr);
            if (status == MMDB_SUCCESS && data.has_data && data.type == MMDB_DATA_TYPE_UTF8_STRING)
                country.assign(data.utf8_string, data.data_size); // explicit length is passed, no terminator is relied on
        }
    }

    if (use_asn) {
        int err = MMDB_SUCCESS;
        auto result = MMDB_lookup_sockaddr(&m_asn, sa, &err);
        if (err == MMDB_SUCCESS && result.found_entry) {
            MMDB_entry_data_s data;
            int status = MMDB_get_value(&result.entry, &data, "autonomous_system_number", nullptr);
            if (status == MMDB_SUCCESS && data.has_data && data.type == MMDB_DATA_TYPE_UINT32)
                asn = std::to_string(data.uint32);
        }
    }
}
......@@ -27,6 +27,7 @@
#include <cstring>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <maxminddb.h>
#ifdef PROBE_CRYPTOPANT
#include <cryptopANT.h>
......@@ -48,7 +49,8 @@ namespace DDP {
class BaseExport
{
public:
explicit BaseExport(bool anonymize_ip) : m_anonymize_ip(anonymize_ip) {}
explicit BaseExport(bool anonymize_ip, MMDB_s& country_db, MMDB_s& asn_db)
: m_anonymize_ip(anonymize_ip), m_country(country_db), m_asn(asn_db) {}
virtual ~BaseExport() {};
......@@ -79,6 +81,18 @@ namespace DDP {
virtual void update_configuration(Config& cfg) = 0;
protected:
/**
* @brief Fill given ASN and Country Code strings from Maxmind databases based on given IP address
* @param addr IP address to lookup in Maxmind databases
* @param ipv Version of given IP address
* @param asn ASN string to fill with IP's ASN
* @param country Country Code string to fill with IP's ISO 3166-1 country code
*/
void fill_asn_country(const in6_addr* addr, int ipv, std::string& asn, std::string& country);
bool m_anonymize_ip;
MMDB_s& m_country;
MMDB_s& m_asn;
};
}
......@@ -23,8 +23,9 @@
#include "CdnsExport.h"
DDP::CdnsExport::CdnsExport(Config& cfg)
: BaseExport(cfg.anonymize_ip.value()), m_fields(cfg.cdns_fields.value()), m_parameters()
DDP::CdnsExport::CdnsExport(Config& cfg, MMDB_s& country_db, MMDB_s& asn_db)
: BaseExport(cfg.anonymize_ip.value(), country_db, asn_db), m_fields(cfg.cdns_fields.value()),
m_parameters()
{
m_parameters.storage_parameters.max_block_items = cfg.cdns_records_per_block.value();
set_cdns_hints(m_parameters.storage_parameters.storage_hints.query_response_hints,
......@@ -73,8 +74,11 @@ boost::any DDP::CdnsExport::buffer_record(DnsRecord& record)
qrs_filled = true;
}
std::string country;
std::string asn;
if (m_fields[static_cast<uint32_t>(CDNSField::CLIENT_ADDRESS)]) {
in6_addr* addr = record.client_address();
fill_asn_country(addr, record.m_addr_family == DnsRecord::AddrFamily::IP4 ? AF_INET : AF_INET6, asn, country);
if (record.m_addr_family == DnsRecord::AddrFamily::IP4) {
#ifdef PROBE_CRYPTOPANT
if (m_anonymize_ip)
......@@ -217,6 +221,12 @@ boost::any DDP::CdnsExport::buffer_record(DnsRecord& record)
if (m_fields[static_cast<uint32_t>(CDNSField::RESPONSE_DELAY)])
qr.response_delay = record.m_tcp_rtt;
if (!asn.empty())
qr.asn = asn;
if (!country.empty())
qr.country_code = country;
// Add QueryResponseSignature to the QueryResponse
if (qrs_filled)
qr.qr_signature_index = m_block->add_qr_signature(qrs);
......
......@@ -39,7 +39,7 @@ namespace DDP {
* @brief Constructor creates new C-DNS block configured for given C-DNS fields
* @param cfg Object with configuration options
*/
CdnsExport(Config& cfg);
CdnsExport(Config& cfg, MMDB_s& country_db, MMDB_s& asn_db);
/**
* @brief Store DNS record into C-DNS block
......
......@@ -35,8 +35,9 @@
constexpr char DDP::ParquetExport::DIGITS[];
DDP::ParquetExport::ParquetExport(Config& cfg)
: BaseExport(cfg.anonymize_ip.value()), m_records_limit(cfg.parquet_records.value())
DDP::ParquetExport::ParquetExport(Config& cfg, MMDB_s& country_db, MMDB_s& asn_db)
: BaseExport(cfg.anonymize_ip.value(), country_db, asn_db),
m_records_limit(cfg.parquet_records.value())
{
m_DnsSchema = arrow::schema({arrow::field("id", arrow::int32()),
arrow::field("unixtime", arrow::int64()),
......@@ -161,6 +162,10 @@ boost::any DDP::ParquetExport::buffer_record(DDP::DnsRecord& record)
in6_addr* addr = record.client_address();
int ipv = record.m_addr_family == DDP::DnsRecord::AddrFamily::IP4 ? AF_INET : AF_INET6;
std::string country;
std::string asn;
fill_asn_country(addr, ipv, asn, country);
#ifdef PROBE_CRYPTOPANT
if (m_anonymize_ip) {
if (ipv == AF_INET)
......@@ -236,10 +241,10 @@ boost::any DDP::ParquetExport::buffer_record(DDP::DnsRecord& record)
PARQUET_THROW_NOT_OK(QClass.Append(record.m_qclass));
// Country
PARQUET_THROW_NOT_OK(Country.Append(""));
PARQUET_THROW_NOT_OK(Country.Append(country));
// ASN
PARQUET_THROW_NOT_OK(ASN.Append(""));
PARQUET_THROW_NOT_OK(ASN.Append(asn));
// EDNS UDP payload
PARQUET_THROW_NOT_OK(EdnsUDP.Append(record.m_ednsUDP));
......
......@@ -77,7 +77,7 @@ namespace DDP {
* @brief Constructor creates Parquet file schema
* @param cfg Object with configuration options
*/
explicit ParquetExport(Config& cfg);
explicit ParquetExport(Config& cfg, MMDB_s& country_db, MMDB_s& asn_db);
/**
* @brief Store DNS record into arrow columns.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment