mirror of
https://github.com/stalwartlabs/smtp-server.git
synced 2024-10-23 06:57:29 +00:00
Aggregate reporting part 3.
This commit is contained in:
parent
a5f8bce0e2
commit
2bab4dcf48
11 changed files with 482 additions and 64 deletions
55
Cargo.lock
generated
55
Cargo.lock
generated
|
@ -138,6 +138,12 @@ version = "0.20.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
|
||||
|
||||
[[package]]
|
||||
name = "base64ct"
|
||||
version = "1.5.3"
|
||||
|
@ -990,7 +996,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "mail-builder"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/stalwartlabs/mail-builder#c226ae550f15fbc86e0b7b4f2d8b93df2e2390ef"
|
||||
source = "git+https://github.com/stalwartlabs/mail-builder#057939b7b0a9aa952f059250d5342c7766710b39"
|
||||
dependencies = [
|
||||
"gethostname",
|
||||
]
|
||||
|
@ -1223,9 +1229,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.5"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba"
|
||||
checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
|
@ -1594,9 +1600,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.7"
|
||||
version = "0.20.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
|
||||
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
|
@ -1606,11 +1612,11 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "1.0.1"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
|
||||
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2379,45 +2385,45 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e"
|
||||
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4"
|
||||
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7"
|
||||
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246"
|
||||
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed"
|
||||
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028"
|
||||
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.42.0"
|
||||
version = "0.42.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
|
||||
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
|
||||
|
||||
[[package]]
|
||||
name = "winreg"
|
||||
|
@ -2508,10 +2514,11 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "zstd-sys"
|
||||
version = "2.0.4+zstd.1.5.2"
|
||||
version = "2.0.5+zstd.1.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4fa202f2ef00074143e219d15b62ffc317d17cc33909feac471c044087cad7b0"
|
||||
checksum = "edc50ffce891ad571e9f9afe5039c4837bede781ac4bb13052ed7ae695518596"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
]
|
||||
|
|
|
@ -2,10 +2,8 @@
|
|||
Stalwart SMTP Server
|
||||
|
||||
# TODO
|
||||
- DKIM, SPF, DMARC integration
|
||||
- RBL
|
||||
- Logging
|
||||
- Reporting
|
||||
- Sieve
|
||||
- Spam filter
|
||||
- Antivirus
|
||||
|
|
|
@ -9,7 +9,7 @@ use smtp_server::{
|
|||
Core, QueueCore, ReportCore, SessionCore, TlsConnectors,
|
||||
},
|
||||
queue::{self, manager::SpawnQueue},
|
||||
reporting::scheduler::SpawnReport,
|
||||
reporting::{self, scheduler::SpawnReport},
|
||||
};
|
||||
use tokio::sync::{mpsc, watch};
|
||||
|
||||
|
@ -127,7 +127,7 @@ async fn main() -> std::io::Result<()> {
|
|||
queue_rx.spawn(core.clone(), core.queue.read_queue().await);
|
||||
|
||||
// Spawn report manager
|
||||
//report_rx.spawn(core.clone());
|
||||
report_rx.spawn(core.clone(), core.report.read_reports().await);
|
||||
|
||||
// Spawn listeners
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
@ -170,6 +170,7 @@ async fn main() -> std::io::Result<()> {
|
|||
// Stop services
|
||||
shutdown_tx.send(true).ok();
|
||||
core.queue.tx.send(queue::Event::Stop).await.ok();
|
||||
core.report.tx.send(reporting::Event::Stop).await.ok();
|
||||
|
||||
// Wait for services to finish
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
|
|
@ -77,7 +77,7 @@ impl Core {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn policy_add<'x>(
|
||||
pub fn policy_add<'x>(
|
||||
&self,
|
||||
key: impl mail_auth::common::resolver::IntoFqdn<'x>,
|
||||
value: Policy,
|
||||
|
|
|
@ -409,6 +409,24 @@ pub fn instant_to_timestamp(now: Instant, time: Instant) -> u64 {
|
|||
+ time.checked_duration_since(now).map_or(0, |d| d.as_secs())
|
||||
}
|
||||
|
||||
pub trait InstantFromTimestamp {
|
||||
fn to_instant(&self) -> Instant;
|
||||
}
|
||||
|
||||
impl InstantFromTimestamp for u64 {
|
||||
fn to_instant(&self) -> Instant {
|
||||
let timestamp = *self;
|
||||
let current_timestamp = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map_or(0, |d| d.as_secs());
|
||||
if timestamp > current_timestamp {
|
||||
Instant::now() + Duration::from_secs(timestamp - current_timestamp)
|
||||
} else {
|
||||
Instant::now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait DomainPart {
|
||||
fn domain_part(&self) -> &str;
|
||||
}
|
||||
|
|
|
@ -13,8 +13,8 @@ use crate::config::QueueConfig;
|
|||
use crate::core::QueueCore;
|
||||
|
||||
use super::{
|
||||
instant_to_timestamp, Domain, Error, ErrorDetails, Event, HostResponse, Message, Recipient,
|
||||
Schedule, SimpleEnvelope, Status, RCPT_STATUS_CHANGED,
|
||||
instant_to_timestamp, Domain, Error, ErrorDetails, Event, HostResponse, InstantFromTimestamp,
|
||||
Message, Recipient, Schedule, SimpleEnvelope, Status, RCPT_STATUS_CHANGED,
|
||||
};
|
||||
|
||||
impl QueueCore {
|
||||
|
@ -612,16 +612,7 @@ impl QueueSerializer for Instant {
|
|||
}
|
||||
|
||||
fn deserialize(bytes: &mut Iter<'_, u8>) -> Option<Self> {
|
||||
let timestamp = usize::deserialize(bytes)? as u64;
|
||||
let current_timestamp = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map_or(0, |d| d.as_secs());
|
||||
if timestamp > current_timestamp {
|
||||
Instant::now() + Duration::from_secs(timestamp - current_timestamp)
|
||||
} else {
|
||||
Instant::now()
|
||||
}
|
||||
.into()
|
||||
(usize::deserialize(bytes)? as u64).to_instant().into()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,8 +39,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
|
|||
.with_dkim_identity(signature.identity())
|
||||
.with_headers(message.raw_headers())
|
||||
.write_rfc5322(
|
||||
config.name.eval(self).await,
|
||||
from_addr,
|
||||
(config.name.eval(self).await.as_str(), from_addr.as_str()),
|
||||
rcpt,
|
||||
config.subject.eval(self).await,
|
||||
&mut report,
|
||||
|
|
|
@ -173,8 +173,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
|
|||
IdentityAlignment::Spf
|
||||
})
|
||||
.write_rfc5322(
|
||||
config.name.eval(self).await,
|
||||
from_addr,
|
||||
(config.name.eval(self).await.as_str(), from_addr.as_str()),
|
||||
&rcpts.join(", "),
|
||||
config.subject.eval(self).await,
|
||||
&mut report,
|
||||
|
@ -245,6 +244,7 @@ impl GenerateDmarcReport for Arc<Core> {
|
|||
fn generate_dmarc_report(&self, domain: ReportPolicy<String>, path: ReportPath<PathBuf>) {
|
||||
let core = self.clone();
|
||||
tokio::spawn(async move {
|
||||
// Deserialize report
|
||||
let dmarc = if let Some(dmarc) = json_read::<DmarcFormat>(&path.path).await {
|
||||
dmarc
|
||||
} else {
|
||||
|
@ -317,8 +317,10 @@ impl GenerateDmarcReport for Arc<Core> {
|
|||
.submitter
|
||||
.eval(&domain.inner.as_str())
|
||||
.await,
|
||||
config.name.eval(&domain.inner.as_str()).await,
|
||||
from_addr,
|
||||
(
|
||||
config.name.eval(&domain.inner.as_str()).await.as_str(),
|
||||
from_addr.as_str(),
|
||||
),
|
||||
rua.iter().map(|a| a.as_str()),
|
||||
&mut message,
|
||||
);
|
||||
|
@ -368,7 +370,12 @@ impl Scheduler {
|
|||
.map_or(0, |d| d.as_secs());
|
||||
let deliver_at = created + event.interval.as_secs();
|
||||
let path = core
|
||||
.build_report_path(&domain, policy, deliver_at, "dmarc")
|
||||
.build_report_path(
|
||||
ReportType::Dmarc(&domain),
|
||||
policy,
|
||||
deliver_at,
|
||||
event.interval.as_secs(),
|
||||
)
|
||||
.await;
|
||||
let v = e.insert(ReportType::Dmarc(ReportPath {
|
||||
path,
|
||||
|
@ -383,7 +390,7 @@ impl Scheduler {
|
|||
if let Some(domain) = create {
|
||||
// Serialize report
|
||||
let entry = DmarcFormat {
|
||||
rua: event.dmarc_record.rua().iter().cloned().collect(),
|
||||
rua: event.dmarc_record.rua().to_vec(),
|
||||
policy: PolicyPublished::from_record(domain, &event.dmarc_record),
|
||||
records: vec![event.report_record],
|
||||
};
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
use ahash::{AHashMap, AHasher};
|
||||
use mail_auth::{
|
||||
common::{base32::Base32Writer, headers::Writer},
|
||||
common::{
|
||||
base32::{Base32Reader, Base32Writer},
|
||||
headers::Writer,
|
||||
},
|
||||
dmarc::Dmarc,
|
||||
};
|
||||
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::{
|
||||
collections::BinaryHeap,
|
||||
collections::{hash_map::Entry, BinaryHeap},
|
||||
hash::{Hash, Hasher},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
|
@ -18,7 +21,10 @@ use tokio::{
|
|||
sync::mpsc,
|
||||
};
|
||||
|
||||
use crate::{core::Core, queue::Schedule};
|
||||
use crate::{
|
||||
core::{Core, ReportCore},
|
||||
queue::{InstantFromTimestamp, Schedule},
|
||||
};
|
||||
|
||||
use super::{dmarc::GenerateDmarcReport, tls::GenerateTlsReport, Event};
|
||||
|
||||
|
@ -88,27 +94,113 @@ impl SpawnReport for mpsc::Receiver<Event> {
|
|||
impl Core {
|
||||
pub async fn build_report_path(
|
||||
&self,
|
||||
domain: &str,
|
||||
domain: ReportType<&str, &str>,
|
||||
policy: u64,
|
||||
deliver_at: u64,
|
||||
rtype: &str,
|
||||
interval: u64,
|
||||
) -> PathBuf {
|
||||
let (ext, domain) = match domain {
|
||||
ReportType::Dmarc(domain) => ("t", domain),
|
||||
ReportType::Tls(domain) => ("d", domain),
|
||||
};
|
||||
|
||||
// Build base path
|
||||
let mut path = self.report.config.path.eval(&domain).await.clone();
|
||||
path.push((policy % *self.report.config.hash.eval(&domain).await).to_string());
|
||||
let _ = fs::create_dir(&path).await;
|
||||
|
||||
// Build filename
|
||||
use std::fmt::Write;
|
||||
let mut w = Base32Writer::with_capacity(domain.len() + 16);
|
||||
w.write(&policy.to_le_bytes()[..]);
|
||||
w.write(&(deliver_at.saturating_sub(946684800) as u32).to_le_bytes()[..]);
|
||||
w.write(&(interval as u32).to_le_bytes()[..]);
|
||||
w.write(domain.as_bytes());
|
||||
let mut file = w.finalize();
|
||||
let _ = write!(file, "_{}_{}_{}.rpt", rtype, policy, deliver_at);
|
||||
file.push('.');
|
||||
file.push_str(ext);
|
||||
path.push(file);
|
||||
path
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportCore {
|
||||
pub async fn read_reports(&self) -> Scheduler {
|
||||
let mut scheduler = Scheduler::default();
|
||||
|
||||
for path in self
|
||||
.config
|
||||
.path
|
||||
.if_then
|
||||
.iter()
|
||||
.map(|t| &t.then)
|
||||
.chain([&self.config.path.default])
|
||||
{
|
||||
let mut dir = match tokio::fs::read_dir(path).await {
|
||||
Ok(dir) => dir,
|
||||
Err(_) => continue,
|
||||
};
|
||||
loop {
|
||||
match dir.next_entry().await {
|
||||
Ok(Some(file)) => {
|
||||
let file = file.path();
|
||||
if file.is_dir() {
|
||||
match tokio::fs::read_dir(path).await {
|
||||
Ok(mut dir) => {
|
||||
let file_ = file;
|
||||
loop {
|
||||
match dir.next_entry().await {
|
||||
Ok(Some(file)) => {
|
||||
let file = file.path();
|
||||
if file
|
||||
.extension()
|
||||
.map_or(false, |e| e == "t" || e == "d")
|
||||
{
|
||||
scheduler.add_path(file);
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to read report directory {}: {}",
|
||||
file_.display(),
|
||||
err
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to read report directory {}: {}",
|
||||
file.display(),
|
||||
err
|
||||
)
|
||||
}
|
||||
};
|
||||
} else if file.extension().map_or(false, |e| e == "t" || e == "d") {
|
||||
scheduler.add_path(file);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to read report directory {}: {}",
|
||||
path.display(),
|
||||
err
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
scheduler
|
||||
}
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn next_due(&mut self) -> Option<(ReportKey, ReportValue)> {
|
||||
let item = self.main.peek()?;
|
||||
|
@ -132,6 +224,122 @@ impl Scheduler {
|
|||
})
|
||||
.unwrap_or(self.long_wait)
|
||||
}
|
||||
|
||||
pub async fn add_path(&mut self, path: PathBuf) -> Result<(), String> {
|
||||
let (file, ext) = path
|
||||
.file_name()
|
||||
.and_then(|f| f.to_str())
|
||||
.and_then(|f| f.rsplit_once('.'))
|
||||
.ok_or_else(|| format!("Invalid queue file name {}", path.display()))?;
|
||||
let file_size = fs::metadata(&path)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"Failed to obtain file metadata for {}: {}",
|
||||
path.display(),
|
||||
err
|
||||
)
|
||||
})?
|
||||
.len();
|
||||
if file_size == 0 {
|
||||
let _ = fs::remove_file(&path).await;
|
||||
return Err(format!(
|
||||
"Removed zero length report file {}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
// Decode domain name
|
||||
let mut policy = [0u8; std::mem::size_of::<u64>()];
|
||||
let mut deliver_at = [0u8; std::mem::size_of::<u32>()];
|
||||
let mut interval = [0u8; std::mem::size_of::<u32>()];
|
||||
let mut domain = Vec::new();
|
||||
for (pos, byte) in Base32Reader::new(file.as_bytes()).enumerate() {
|
||||
match pos {
|
||||
0..=7 => {
|
||||
policy[pos] = byte;
|
||||
}
|
||||
8..=11 => {
|
||||
deliver_at[pos - 8] = byte;
|
||||
}
|
||||
12..=15 => {
|
||||
interval[pos - 12] = byte;
|
||||
}
|
||||
_ => {
|
||||
domain.push(byte);
|
||||
}
|
||||
}
|
||||
}
|
||||
if domain.is_empty() {
|
||||
return Err(format!(
|
||||
"Failed to base32 decode report file {}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
let domain = String::from_utf8(domain).map_err(|err| {
|
||||
format!(
|
||||
"Failed to base32 decode report file {}: {}",
|
||||
path.display(),
|
||||
err
|
||||
)
|
||||
})?;
|
||||
|
||||
// Rebuild parts
|
||||
let policy = u64::from_le_bytes(policy);
|
||||
let deliver_at = u32::from_le_bytes(deliver_at) as u64 + 946684800;
|
||||
let created = deliver_at - (u32::from_le_bytes(interval) as u64);
|
||||
|
||||
match ext {
|
||||
"d" => {
|
||||
let key = ReportType::Dmarc(ReportPolicy {
|
||||
inner: domain,
|
||||
policy,
|
||||
});
|
||||
self.reports.insert(
|
||||
key.clone(),
|
||||
ReportType::Dmarc(ReportPath {
|
||||
path,
|
||||
size: file_size as usize,
|
||||
created,
|
||||
deliver_at,
|
||||
}),
|
||||
);
|
||||
self.main.push(Schedule {
|
||||
due: deliver_at.to_instant(),
|
||||
inner: key,
|
||||
});
|
||||
}
|
||||
"t" => match self.reports.entry(ReportType::Tls(domain)) {
|
||||
Entry::Occupied(mut e) => {
|
||||
if let ReportType::Tls(tls) = e.get_mut() {
|
||||
tls.size += file_size as usize;
|
||||
tls.path.push(ReportPolicy {
|
||||
inner: path,
|
||||
policy,
|
||||
});
|
||||
}
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
self.main.push(Schedule {
|
||||
due: deliver_at.to_instant(),
|
||||
inner: e.key().clone(),
|
||||
});
|
||||
e.insert(ReportType::Tls(ReportPath {
|
||||
path: vec![ReportPolicy {
|
||||
inner: path,
|
||||
policy,
|
||||
}],
|
||||
size: file_size as usize,
|
||||
created,
|
||||
deliver_at,
|
||||
}));
|
||||
}
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn json_write(path: &PathBuf, entry: &impl Serialize) -> usize {
|
||||
|
@ -221,6 +429,17 @@ pub async fn json_read<T: DeserializeOwned>(path: &PathBuf) -> Option<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl Default for Scheduler {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
short_wait: Duration::from_millis(1),
|
||||
long_wait: Duration::from_secs(86400 * 365),
|
||||
main: BinaryHeap::with_capacity(128),
|
||||
reports: AHashMap::with_capacity(128),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportKey {
|
||||
pub fn domain_name(&self) -> &str {
|
||||
match self {
|
||||
|
|
|
@ -40,8 +40,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
|
|||
)
|
||||
.with_spf_dns(format!("txt : {} : v=SPF1", output.domain())) // TODO use DNS record
|
||||
.write_rfc5322(
|
||||
config.name.eval(self).await,
|
||||
from_addr,
|
||||
(config.name.eval(self).await.as_str(), from_addr.as_str()),
|
||||
rcpt,
|
||||
config.subject.eval(self).await,
|
||||
&mut report,
|
||||
|
|
|
@ -5,21 +5,32 @@ use std::{
|
|||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
|
||||
use ahash::AHashMap;
|
||||
use mail_auth::{
|
||||
flate2::{write::GzEncoder, Compression},
|
||||
mta_sts::{ReportUri, TlsRpt},
|
||||
report::tlsrpt::{FailureDetails, PolicyDetails, PolicyType},
|
||||
report::tlsrpt::{
|
||||
DateRange, FailureDetails, Policy, PolicyDetails, PolicyType, Summary, TlsReport,
|
||||
},
|
||||
};
|
||||
|
||||
use mail_parser::DateTime;
|
||||
use reqwest::header::CONTENT_TYPE;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Write;
|
||||
use tokio::fs;
|
||||
|
||||
use crate::{
|
||||
core::Core,
|
||||
outbound::mta_sts::{Mode, MxPattern},
|
||||
queue::Schedule,
|
||||
USER_AGENT,
|
||||
};
|
||||
|
||||
use super::{
|
||||
scheduler::{json_append, json_write, ReportPath, ReportPolicy, ReportType, Scheduler, ToHash},
|
||||
scheduler::{
|
||||
json_append, json_read, json_write, ReportPath, ReportPolicy, ReportType, Scheduler, ToHash,
|
||||
},
|
||||
TlsEvent,
|
||||
};
|
||||
|
||||
|
@ -43,8 +54,150 @@ pub trait GenerateTlsReport {
|
|||
impl GenerateTlsReport for Arc<Core> {
|
||||
fn generate_tls_report(&self, domain: String, path: ReportPath<Vec<ReportPolicy<PathBuf>>>) {
|
||||
let core = self.clone();
|
||||
tokio::spawn(async {
|
||||
//TODO
|
||||
tokio::spawn(async move {
|
||||
// Deserialize report
|
||||
let config = &core.report.config.tls;
|
||||
let mut report = TlsReport {
|
||||
organization_name: config.org_name.eval(&domain.as_str()).await.clone(),
|
||||
date_range: DateRange {
|
||||
start_datetime: DateTime::from_timestamp(path.created as i64),
|
||||
end_datetime: DateTime::from_timestamp(path.deliver_at as i64),
|
||||
},
|
||||
contact_info: config.contact_info.eval(&domain.as_str()).await.clone(),
|
||||
report_id: format!(
|
||||
"{}_{}",
|
||||
path.created,
|
||||
path.path.first().map_or(0, |p| p.policy)
|
||||
),
|
||||
policies: Vec::with_capacity(path.path.len()),
|
||||
};
|
||||
let mut rua = Vec::new();
|
||||
for path in &path.path {
|
||||
if let Some(tls) = json_read::<TlsFormat>(&path.inner).await {
|
||||
// Group duplicates
|
||||
let mut total_success = 0;
|
||||
let mut total_failure = 0;
|
||||
let mut record_map = AHashMap::with_capacity(tls.records.len());
|
||||
for record in tls.records {
|
||||
if let Some(record) = record {
|
||||
match record_map.entry(record) {
|
||||
Entry::Occupied(mut e) => {
|
||||
*e.get_mut() += 1;
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
e.insert(1u32);
|
||||
}
|
||||
}
|
||||
total_failure += 1;
|
||||
} else {
|
||||
total_success += 1;
|
||||
}
|
||||
}
|
||||
report.policies.push(Policy {
|
||||
policy: tls.policy,
|
||||
summary: Summary {
|
||||
total_success,
|
||||
total_failure,
|
||||
},
|
||||
failure_details: record_map
|
||||
.into_iter()
|
||||
.map(|(mut r, count)| {
|
||||
r.failed_session_count = count;
|
||||
r
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
|
||||
rua = tls.rua;
|
||||
}
|
||||
}
|
||||
|
||||
if report.policies.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Compress and serialize report
|
||||
let json = report.to_json();
|
||||
let mut e = GzEncoder::new(Vec::with_capacity(json.len()), Compression::default());
|
||||
let json =
|
||||
match std::io::Write::write_all(&mut e, json.as_bytes()).and_then(|_| e.finish()) {
|
||||
Ok(report) => report,
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
module = "report",
|
||||
event = "error",
|
||||
"Failed to compress report: {}",
|
||||
err
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Try delivering report over HTTP
|
||||
let mut rcpts = Vec::with_capacity(rua.len());
|
||||
for uri in &rua {
|
||||
match uri {
|
||||
ReportUri::Http(uri) => {
|
||||
if let Ok(client) = reqwest::Client::builder()
|
||||
.user_agent(USER_AGENT)
|
||||
.timeout(Duration::from_secs(2 * 60))
|
||||
.build()
|
||||
{
|
||||
match client
|
||||
.post(uri)
|
||||
.header(CONTENT_TYPE, "application/tlsrpt+gzip")
|
||||
.body(json.to_vec())
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
if response.status().is_success() {
|
||||
let log = "fd";
|
||||
path.cleanup().await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::debug!(
|
||||
module = "report",
|
||||
event = "error",
|
||||
"Failed to submit TLS report to {}: {}",
|
||||
uri,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ReportUri::Mail(mailto) => {
|
||||
rcpts.push(mailto.as_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver report over SMTP
|
||||
if !rcpts.is_empty() {
|
||||
let from_addr = config.address.eval(&domain.as_str()).await;
|
||||
let mut message = Vec::with_capacity(path.size);
|
||||
let _ = report.write_rfc5322_from_bytes(
|
||||
&domain,
|
||||
core.report.config.submitter.eval(&domain.as_str()).await,
|
||||
(
|
||||
config.name.eval(&domain.as_str()).await.as_str(),
|
||||
from_addr.as_str(),
|
||||
),
|
||||
rcpts.iter().copied(),
|
||||
&json,
|
||||
&mut message,
|
||||
);
|
||||
|
||||
// Send report
|
||||
core.send_report(from_addr, rcpts.iter(), message, &config.sign)
|
||||
.await;
|
||||
} else {
|
||||
let log = "fd";
|
||||
}
|
||||
path.cleanup().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +224,12 @@ impl Scheduler {
|
|||
let path = e.into_mut().tls_path();
|
||||
path.path.push(ReportPolicy {
|
||||
inner: core
|
||||
.build_report_path(&domain, policy_hash, path.deliver_at, "tls")
|
||||
.build_report_path(
|
||||
ReportType::Tls(&domain),
|
||||
policy_hash,
|
||||
path.deliver_at,
|
||||
path.deliver_at - path.created,
|
||||
)
|
||||
.await,
|
||||
policy: policy_hash,
|
||||
});
|
||||
|
@ -92,7 +250,12 @@ impl Scheduler {
|
|||
.map_or(0, |d| d.as_secs());
|
||||
let deliver_at = created + event.interval.as_secs();
|
||||
let path = core
|
||||
.build_report_path(&domain, policy_hash, deliver_at, "tls")
|
||||
.build_report_path(
|
||||
ReportType::Tls(&domain),
|
||||
policy_hash,
|
||||
deliver_at,
|
||||
event.interval.as_secs(),
|
||||
)
|
||||
.await;
|
||||
let v = e.insert(ReportType::Tls(ReportPath {
|
||||
path: vec![ReportPolicy {
|
||||
|
@ -194,3 +357,19 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportPath<Vec<ReportPolicy<PathBuf>>> {
|
||||
async fn cleanup(&self) {
|
||||
for path in &self.path {
|
||||
if let Err(err) = fs::remove_file(&path.inner).await {
|
||||
tracing::error!(
|
||||
module = "report",
|
||||
event = "error",
|
||||
"Failed to delete file {}: {}",
|
||||
path.inner.display(),
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue