Outbound delivery, first part.

Mauro D 2022-12-30 18:29:48 +00:00
parent 61777a85ce
commit 9ec6fe1bb2
20 changed files with 658 additions and 408 deletions

Cargo.lock generated
View file

@@ -967,9 +967,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.16.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
[[package]]
name = "oorandom"

View file

@@ -41,10 +41,6 @@ log-level = "trace"
concurrency = 1024
throttle-map = {shard = 32, capacity = 10}
[global.spool]
path = "/var/spool/queue"
hash = 123
[session]
timeout = "5m"
transfer-limit = 5000000
@@ -188,21 +184,24 @@ implicit = true
allow-invalid-certs = true
[queue]
path = "/var/spool/queue"
hash = 123
[queue.schedule]
retry = ["0m", "2m", "5m", "10m", "15m", "30m", "1h", "2h"]
notify = ["1d", "3d"]
expire = "5d"
[queue.outbound]
source-ips = ["192.168.0.2", "192.168.0.1"]
#tls = optional, require, dane, dane-fallback-require, dane-require
next-hop = "lmtp"
[queue.limits]
attempts = 100
lifetime = "5d"
[queue.timeouts]
[queue.outbound.timeout]
connect = "10ms"
ehlo = "1m"
[[queue.capacity]]
[[queue.quota]]
match = {if = "remote-ip", eq = "127.0.0.1"}
key = [""]
messages = 10000

View file

@@ -242,6 +242,12 @@ impl IfBlock<Vec<String>> {
}
}
impl<T> IfBlock<Vec<T>> {
pub fn has_empty_list(&self) -> bool {
self.default.is_empty() || self.if_then.iter().any(|v| v.then.is_empty())
}
}
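
The new has_empty_list helper exists because an IfBlock pairs a default value with conditional overrides, and the retry and notify schedules parsed later are consumed with calls like .first().unwrap(), so no branch may ever evaluate to an empty list. A minimal sketch of the check, assuming a stripped-down IfBlock without the real Conditions type:

    // Editor's sketch: a simplified IfBlock, with conditions elided.
    struct IfThen<T> { then: T }
    struct IfBlock<T> { if_then: Vec<IfThen<T>>, default: T }

    impl<T> IfBlock<Vec<T>> {
        // True when the default or any conditional branch yields an empty list.
        fn has_empty_list(&self) -> bool {
            self.default.is_empty() || self.if_then.iter().any(|v| v.then.is_empty())
        }
    }

    fn main() {
        let block = IfBlock {
            if_then: vec![IfThen { then: Vec::new() }],
            default: vec![60u64],
        };
        assert!(block.has_empty_list()); // the conditional branch is empty
    }
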
#[cfg(test)]
mod tests {
use std::{fs, path::PathBuf, time::Duration};

View file

@@ -324,34 +324,37 @@ pub struct RelayHost {
}
pub struct QueueConfig {
pub path: PathBuf,
pub hash: u64,
pub path: IfBlock<PathBuf>,
pub hash: IfBlock<u64>,
// Schedule
pub retry: IfBlock<Vec<Duration>>,
pub notify: IfBlock<Vec<Duration>>,
pub expire: IfBlock<Duration>,
// Outbound
pub source_ips: IfBlock<Vec<IpAddr>>,
pub next_hop: IfBlock<Option<RelayHost>>,
pub tls: IfBlock<bool>,
// Limits, Throttle and Capacity
pub attempts_max: IfBlock<usize>,
pub lifetime_max: IfBlock<Duration>,
// Throttle and Quotas
pub throttle: QueueThrottle,
pub capacity: QueueCapacities,
pub quota: QueueQuotas,
}
pub struct QueueThrottle {
pub sender: Vec<Throttle>,
pub recipient: Vec<Throttle>,
pub rcpt: Vec<Throttle>,
pub host: Vec<Throttle>,
}
pub struct QueueCapacities {
pub sender: Vec<QueueCapacity>,
pub rcpt: Vec<QueueCapacity>,
pub rcpt_domain: Vec<QueueCapacity>,
pub struct QueueQuotas {
pub sender: Vec<QueueQuota>,
pub rcpt: Vec<QueueQuota>,
pub rcpt_domain: Vec<QueueQuota>,
}
pub struct QueueCapacity {
pub struct QueueQuota {
pub conditions: Conditions,
pub keys: u16,
pub size: Option<usize>,

View file

@@ -10,74 +10,90 @@ use super::{
impl Config {
pub fn parse_queue(&self, ctx: &ConfigContext) -> super::Result<QueueConfig> {
let available_envelope_keys = [
let rcpt_envelope_keys = [
EnvelopeKey::RecipientDomain,
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::Priority,
];
let available_throttle_keys = THROTTLE_RCPT_DOMAIN
| THROTTLE_SENDER
| THROTTLE_SENDER_DOMAIN
| THROTTLE_MX
| THROTTLE_REMOTE_IP
| THROTTLE_LOCAL_IP;
let sender_envelope_keys = [
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::Priority,
];
let host_envelope_keys = [
EnvelopeKey::RecipientDomain,
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::Priority,
EnvelopeKey::LocalIp,
EnvelopeKey::RemoteIp,
EnvelopeKey::Mx,
];
let next_hop = self
.parse_if_block::<Option<String>>("queue.next-hop", ctx, &available_envelope_keys)?
.parse_if_block::<Option<String>>("queue.outbound.next-hop", ctx, &rcpt_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(None));
let path = self.property_require::<PathBuf>("global.spool.path")?;
if !path.exists() {
fs::create_dir(&path)
.map_err(|err| format!("Failed to create spool directory {:?}: {}", path, err))?;
}
// Parse throttle
let mut throttle = QueueThrottle {
sender: Vec::new(),
recipient: Vec::new(),
rcpt: Vec::new(),
host: Vec::new(),
};
let all_throttles = self.parse_throttle(
"queue.throttle",
ctx,
&available_envelope_keys,
available_throttle_keys,
&rcpt_envelope_keys,
THROTTLE_RCPT_DOMAIN
| THROTTLE_SENDER
| THROTTLE_SENDER_DOMAIN
| THROTTLE_MX
| THROTTLE_REMOTE_IP
| THROTTLE_LOCAL_IP,
)?;
for t in all_throttles {
if (t.keys
& (THROTTLE_RCPT_DOMAIN | THROTTLE_MX | THROTTLE_REMOTE_IP | THROTTLE_LOCAL_IP))
!= 0
if (t.keys & (THROTTLE_MX | THROTTLE_REMOTE_IP | THROTTLE_LOCAL_IP)) != 0
|| t.conditions.conditions.iter().any(|c| {
matches!(
c,
Condition::Match {
key: EnvelopeKey::RecipientDomain
| EnvelopeKey::Mx
| EnvelopeKey::RemoteIp
| EnvelopeKey::LocalIp,
key: EnvelopeKey::Mx | EnvelopeKey::RemoteIp | EnvelopeKey::LocalIp,
..
}
)
})
{
throttle.recipient.push(t);
throttle.host.push(t);
} else if (t.keys & (THROTTLE_RCPT_DOMAIN)) != 0
|| t.conditions.conditions.iter().any(|c| {
matches!(
c,
Condition::Match {
key: EnvelopeKey::RecipientDomain,
..
}
)
})
{
throttle.rcpt.push(t);
} else {
throttle.sender.push(t);
}
}
Ok(QueueConfig {
path,
let config = QueueConfig {
path: self
.parse_if_block("queue.path", ctx, &sender_envelope_keys)?
.ok_or("Missing \"queue.path\" property.")?,
hash: self
.property::<u64>("global.spool.hash")?
.unwrap_or(32)
.next_power_of_two(),
.parse_if_block("queue.hash", ctx, &sender_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(32)),
retry: self
.parse_if_block("queue.retry", ctx, &available_envelope_keys)?
.parse_if_block("queue.schedule.retry", ctx, &host_envelope_keys)?
.unwrap_or_else(|| {
IfBlock::new(vec![
Duration::from_secs(0),
Duration::from_secs(60),
Duration::from_secs(2 * 60),
Duration::from_secs(5 * 60),
Duration::from_secs(10 * 60),
@@ -88,15 +104,18 @@ impl Config {
])
}),
notify: self
.parse_if_block("queue.notify", ctx, &available_envelope_keys)?
.parse_if_block("queue.schedule.notify", ctx, &rcpt_envelope_keys)?
.unwrap_or_else(|| {
IfBlock::new(vec![
Duration::from_secs(86400),
Duration::from_secs(2 * 86400),
Duration::from_secs(3 * 86400),
])
}),
expire: self
.parse_if_block("queue.schedule.expire", ctx, &rcpt_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(Duration::from_secs(5 * 86400))),
source_ips: self
.parse_if_block("queue.source-ips", ctx, &available_envelope_keys)?
.parse_if_block("queue.outbound.source-ips", ctx, &host_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(Vec::new())),
next_hop: IfBlock {
if_then: {
@@ -111,7 +130,7 @@ impl Config {
.get(&then)
.ok_or_else(|| {
format!(
"Relay host {:?} not found for property \"queue.next-hop\".",
"Host {:?} not found for property \"queue.next-hop\".",
then
)
})?
@@ -142,31 +161,33 @@ impl Config {
},
},
tls: self
.parse_if_block("queue.tls", ctx, &available_envelope_keys)?
.parse_if_block("queue.outbound.tls", ctx, &host_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
attempts_max: self
.parse_if_block("queue.limits.attempts", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(100)),
lifetime_max: self
.parse_if_block("queue.limits.lifetime", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(Duration::from_secs(5 * 86400))),
throttle,
capacity: self.parse_queue_capacity(ctx)?,
})
quota: self.parse_queue_quota(ctx)?,
};
if config.retry.has_empty_list() {
Err("Property \"queue.schedule.retry\" cannot contain empty lists.".to_string())
} else if config.notify.has_empty_list() {
Err("Property \"queue.schedule.notify\" cannot contain empty lists.".to_string())
} else {
Ok(config)
}
}
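
For reference, the classification loop near the top of parse_queue sorts a single list of queue.throttle entries into three buckets by the point in delivery at which each key first becomes available. A condensed sketch of the bucket choice (bitflag values are illustrative, and the parallel Condition-based check is omitted):

    const THROTTLE_MX: u64 = 1 << 0;
    const THROTTLE_REMOTE_IP: u64 = 1 << 1;
    const THROTTLE_LOCAL_IP: u64 = 1 << 2;
    const THROTTLE_RCPT_DOMAIN: u64 = 1 << 3;

    enum Bucket { Host, Rcpt, Sender }

    fn bucket_for(keys: u64) -> Bucket {
        if keys & (THROTTLE_MX | THROTTLE_REMOTE_IP | THROTTLE_LOCAL_IP) != 0 {
            Bucket::Host // only evaluable once a delivery host is known
        } else if keys & THROTTLE_RCPT_DOMAIN != 0 {
            Bucket::Rcpt // evaluable per recipient domain
        } else {
            Bucket::Sender // evaluable as soon as the message is queued
        }
    }

    fn main() {
        assert!(matches!(bucket_for(THROTTLE_MX), Bucket::Host));
        assert!(matches!(bucket_for(THROTTLE_RCPT_DOMAIN), Bucket::Rcpt));
        assert!(matches!(bucket_for(0), Bucket::Sender));
    }
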
fn parse_queue_capacity(&self, ctx: &ConfigContext) -> super::Result<QueueCapacities> {
let mut capacities = QueueCapacities {
fn parse_queue_quota(&self, ctx: &ConfigContext) -> super::Result<QueueQuotas> {
let mut capacities = QueueQuotas {
sender: Vec::new(),
rcpt: Vec::new(),
rcpt_domain: Vec::new(),
};
for array_pos in self.sub_keys("queue.capacity") {
let capacity = self.parse_queue_capacity_item(("queue.capacity", array_pos), ctx)?;
for array_pos in self.sub_keys("queue.quota") {
let quota = self.parse_queue_quota_item(("queue.quota", array_pos), ctx)?;
if (capacity.keys & THROTTLE_RCPT) != 0
|| capacity.conditions.conditions.iter().any(|c| {
if (quota.keys & THROTTLE_RCPT) != 0
|| quota.conditions.conditions.iter().any(|c| {
matches!(
c,
Condition::Match {
@@ -176,9 +197,9 @@ impl Config {
)
})
{
capacities.rcpt.push(capacity);
} else if (capacity.keys & THROTTLE_RCPT_DOMAIN) != 0
|| capacity.conditions.conditions.iter().any(|c| {
capacities.rcpt.push(quota);
} else if (quota.keys & THROTTLE_RCPT_DOMAIN) != 0
|| quota.conditions.conditions.iter().any(|c| {
matches!(
c,
Condition::Match {
@@ -188,20 +209,20 @@ impl Config {
)
})
{
capacities.rcpt_domain.push(capacity);
capacities.rcpt_domain.push(quota);
} else {
capacities.sender.push(capacity);
capacities.sender.push(quota);
}
}
Ok(capacities)
}
fn parse_queue_capacity_item(
fn parse_queue_quota_item(
&self,
prefix: impl AsKey,
ctx: &ConfigContext,
) -> super::Result<QueueCapacity> {
) -> super::Result<QueueQuota> {
let prefix = prefix.as_key();
let mut keys = 0;
for (key_, value) in self.values((&prefix, "key")) {
@@ -219,7 +240,7 @@ impl Config {
}
}
let capacity = QueueCapacity {
let quota = QueueQuota {
conditions: if self.values((&prefix, "match")).next().is_some() {
self.parse_condition(
(&prefix, "match"),
@@ -247,16 +268,16 @@ impl Config {
};
// Validate
if capacity.size.is_none() && capacity.messages.is_none() {
if quota.size.is_none() && quota.messages.is_none() {
Err(format!(
concat!(
"Queue capacity {:?} needs to define a ",
"Queue quota {:?} needs to define a ",
"valid 'size' and/or 'messages' property."
),
prefix
))
} else {
Ok(capacity)
Ok(quota)
}
}
}
@@ -280,6 +301,13 @@ impl From<&Host> for RelayHost {
impl ParseValue for PathBuf {
fn parse_value(_key: impl utils::AsKey, value: &str) -> super::Result<Self> {
Ok(PathBuf::from(value))
let path = PathBuf::from(value);
if !path.exists() {
fs::create_dir(&path)
.map_err(|err| format!("Failed to create spool directory {:?}: {}", path, err))?;
}
Ok(path)
}
}
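
Moving directory creation into the ParseValue impl for PathBuf means any configured path is created as a side effect of parsing. Note that fs::create_dir fails when intermediate components are missing; a sketch of a more lenient variant using create_dir_all, in case nested spool paths were desired (an editor's assumption, not what the commit does):

    use std::{fs, path::PathBuf};

    fn parse_path(value: &str) -> Result<PathBuf, String> {
        let path = PathBuf::from(value);
        if !path.exists() {
            // create_dir_all also creates any missing parent components.
            fs::create_dir_all(&path)
                .map_err(|err| format!("Failed to create directory {:?}: {}", path, err))?;
        }
        Ok(path)
    }

    fn main() {
        let p = parse_path("/tmp/stalwart-spool/queue").expect("create");
        println!("{}", p.display());
    }
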

View file

@@ -7,6 +7,7 @@ use std::{
};
use dashmap::DashMap;
use mail_auth::Resolver;
use smtp_proto::{
request::receiver::{
BdatReceiver, DataReceiver, DummyDataReceiver, DummyLineReceiver, LineReceiver,
@@ -23,7 +24,7 @@ use tracing::Span;
use crate::{
config::{EnvelopeKey, List, QueueConfig, Script, ServerProtocol, SessionConfig},
listener::auth::SaslToken,
queue::{self, QueueLimiter},
queue::{self, QuotaLimiter},
};
use self::throttle::{
@@ -37,6 +38,7 @@ pub mod throttle;
pub struct Core {
pub session: SessionCore,
pub queue: QueueCore,
pub resolver: Resolver,
}
pub struct SessionCore {
@@ -48,7 +50,7 @@ pub struct SessionCore {
pub struct QueueCore {
pub config: QueueConfig,
pub throttle: DashMap<ThrottleKey, Limiter, ThrottleKeyHasherBuilder>,
pub capacity: DashMap<ThrottleKey, Arc<QueueLimiter>, ThrottleKeyHasherBuilder>,
pub quota: DashMap<ThrottleKey, Arc<QuotaLimiter>, ThrottleKeyHasherBuilder>,
pub tx: mpsc::Sender<queue::Event>,
pub id_seq: AtomicU32,
}
@@ -86,13 +88,18 @@ pub struct SessionData {
pub local_ip: IpAddr,
pub remote_ip: IpAddr,
pub helo_domain: String,
pub mail_from: Option<SessionAddress>,
pub rcpt_to: Vec<SessionAddress>,
pub rcpt_errors: usize,
pub message: Vec<u8>,
pub authenticated_as: String,
pub auth_errors: usize,
pub priority: i16,
pub delivery_by: u64,
pub future_release: u64,
pub valid_until: Instant,
pub bytes_left: usize,
@@ -103,6 +110,7 @@ pub struct SessionAddress {
pub address: String,
pub address_lcase: String,
pub domain: String,
pub flags: u64,
}
#[derive(Debug, Default)]
@@ -179,6 +187,8 @@ impl SessionData {
auth_errors: 0,
messages_sent: 0,
bytes_left: 0,
delivery_by: 0,
future_release: 0,
}
}
}

View file

@@ -149,7 +149,7 @@ impl BuildHasher for ThrottleKeyHasherBuilder {
}
}
impl QueueCapacity {
impl QueueQuota {
pub fn new_key(&self, e: &impl Envelope) -> ThrottleKey {
let mut hasher = blake3::Hasher::new();

View file

@@ -1,10 +1,13 @@
use std::time::SystemTime;
use std::{
path::PathBuf,
time::{Duration, Instant, SystemTime},
};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
core::Session,
queue::{self, Message},
queue::{self, Message, SimpleEnvelope},
};
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
@@ -13,6 +16,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
let mail_from = self.data.mail_from.take().unwrap();
let mut message = Box::new(Message {
id: 0,
path: PathBuf::new(),
created: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
@@ -22,13 +26,14 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
return_path_domain: mail_from.domain,
recipients: Vec::with_capacity(self.data.rcpt_to.len()),
domains: Vec::with_capacity(3),
notify: queue::Schedule::now(),
flags: 0,
priority: self.data.priority,
size: self.data.message.len(),
queue_refs: Vec::with_capacity(0),
});
let future_release = Duration::from_secs(self.data.future_release);
// Add recipients
self.data.rcpt_to.sort_unstable();
for rcpt in self.data.rcpt_to.drain(..) {
@@ -37,11 +42,34 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
.last()
.map_or(true, |d| d.domain != rcpt.domain)
{
let envelope = SimpleEnvelope::new(message.as_ref(), &rcpt.domain);
message.domains.push(queue::Domain {
domain: rcpt.domain,
retry: queue::Schedule::now(),
retry: if self.data.future_release == 0 {
queue::Schedule::now()
} else {
queue::Schedule::later(future_release)
},
notify: queue::Schedule::later(
future_release
+ *self
.core
.queue
.config
.notify
.eval(&envelope)
.await
.first()
.unwrap(),
),
expires: Instant::now()
+ if self.data.delivery_by == 0 {
*self.core.queue.config.expire.eval(&envelope).await
} else {
Duration::from_secs(self.data.delivery_by)
},
status: queue::Status::Scheduled,
queue_refs: Vec::new(),
domain: rcpt.domain,
});
}
message.recipients.push(queue::Recipient {
@@ -49,13 +77,12 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
address_lcase: rcpt.address_lcase,
status: queue::Status::Scheduled,
flags: 0,
queue_refs: Vec::new(),
domain_idx: message.domains.len() - 1,
});
}
// Verify queue capacity
if self.core.queue.queue_has_capacity(&mut message).await {
// Verify queue quota
if self.core.queue.has_quota(&mut message).await {
if self
.core
.queue
@@ -73,8 +100,8 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
tracing::warn!(
parent: &self.span,
event = "queue",
class = "capacity-exceeded",
"Queue capacity exceeded, rejecting message."
class = "quota-exceeded",
"Queue quota exceeded, rejecting message."
);
self.write(b"452 4.3.1 Mail system full, try again later.\r\n")
.await?;

View file

@@ -25,12 +25,14 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
.unwrap_or_default()
.to_string(),
address_lcase,
flags: from.flags,
}
} else {
SessionAddress {
address: String::new(),
address_lcase: String::new(),
domain: String::new(),
flags: from.flags,
}
}
.into();

View file

@@ -21,6 +21,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
.to_string(),
address_lcase,
address: to.address,
flags: to.flags,
};
// Verify address

View file

@@ -280,6 +280,8 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
self.data.rcpt_to.clear();
self.data.message = Vec::with_capacity(0);
self.data.priority = 0;
self.data.delivery_by = 0;
self.data.future_release = 0;
}
#[inline(always)]

View file

@@ -35,6 +35,7 @@ async fn main() -> std::io::Result<()> {
// Build core
let (queue_tx, queue_rx) = mpsc::channel(1024);
let core = Arc::new(Core {
resolver: config.build_resolver().failed("Failed to build resolver"),
session: SessionCore {
config: session_config,
concurrency: ConcurrencyLimiter::new(
@@ -69,7 +70,7 @@
.unwrap_or(32),
),
id_seq: 0.into(),
capacity: DashMap::with_capacity_and_hasher_and_shard_amount(
quota: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.throttle-map.capacity")
.failed("Failed to parse throttle map capacity")

View file

@@ -1,24 +1,40 @@
use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use crate::core::Core;
use crate::core::{throttle::ConcurrencyLimiter, Core};
use super::{manager::Queue, throttle, DeliveryAttempt, Message, OnHold, Schedule, Status};
use super::{
manager::Queue, throttle, DeliveryAttempt, Domain, Error, Event, Message, OnHold, Schedule,
SimpleEnvelope, Status, WorkerResult,
};
impl DeliveryAttempt {
pub async fn try_deliver(mut self, core: Arc<Core>, queue: &mut Queue) {
// Throttle sender
if !core.queue.config.throttle.sender.is_empty() {
if let Err(err) = core.queue.throttle_sender(&mut self).await {
for throttle in &core.queue.config.throttle.sender {
if let Err(err) = core
.queue
.is_allowed(
throttle,
self.message.as_ref(),
&mut self.in_flight,
&self.span,
)
.await
{
match err {
throttle::Error::Concurrency { limiter } => {
queue.on_hold.push(OnHold {
next_due: self.message.next_event_after(Instant::now()),
max_concurrent: limiter.max_concurrent,
concurrent: limiter.concurrent,
message: self.message,
});
}
throttle::Error::Rate { retry_at } => {
queue.rate_limit.push(Schedule {
queue.main.push(Schedule {
due: retry_at,
inner: self.message,
});
@@ -29,24 +45,112 @@ impl DeliveryAttempt {
}
tokio::spawn(async move {
let mut done = 0;
for domain in &self.message.domains {
match &domain.status {
Status::Scheduled | Status::TemporaryFailure(_)
if domain.retry.due <= Instant::now() => {}
Status::Delivered | Status::PermanentFailure(_) => {
done += 1;
let queue_config = &core.queue.config;
let mut on_hold: Option<ConcurrencyLimiter> = None;
let mut domains = std::mem::take(&mut self.message.domains);
'outer: for domain in &mut domains {
// Only process domains due for delivery
if !matches!(&domain.status, Status::Scheduled | Status::TemporaryFailure(_)
if domain.retry.due <= Instant::now())
{
continue;
}
// Throttle recipient domain
let mut in_flight = Vec::new();
let envelope = SimpleEnvelope::new(self.message.as_ref(), &domain.domain);
for throttle in &queue_config.throttle.rcpt {
if let Err(err) = core
.queue
.is_allowed(throttle, &envelope, &mut in_flight, &self.span)
.await
{
match err {
throttle::Error::Concurrency { limiter } => {
on_hold = limiter.into();
}
throttle::Error::Rate { retry_at } => {
domain.retry.due = retry_at;
}
}
continue;
}
_ => continue,
}
// Obtain next hop
if let Some(next_hop) = queue_config.next_hop.eval(&envelope).await {
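// Relay delivery through the next hop is not implemented yet in this
// first part of the outbound-delivery work.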
} else {
let mx_list = match core.resolver.mx_lookup(&domain.domain).await {
Ok(mx) => mx,
Err(err) => {
domain.set_dns_error(err, queue_config.retry.eval(&envelope).await);
continue;
}
};
for mx in mx_list.iter() {
let ips = match core.resolver.ip_lookup(&mx.exchange).await {
Ok(ips) => ips,
Err(err) => {
domain.set_dns_error(err, queue_config.retry.eval(&envelope).await);
continue 'outer;
}
};
}
}
}
self.message.domains = domains;
if done == self.message.domains.len() {}
// Notify queue manager
let span = self.span;
let result = if let Some(on_hold) = on_hold {
WorkerResult::OnHold(OnHold {
next_due: self.message.next_event_after(Instant::now()),
max_concurrent: on_hold.max_concurrent,
concurrent: on_hold.concurrent,
message: self.message,
})
} else if let Some(due) = self.message.next_event() {
WorkerResult::Retry(Schedule {
due,
inner: self.message,
})
} else {
WorkerResult::Delivered
};
if core.queue.tx.send(Event::Done(result)).await.is_err() {
tracing::warn!(
parent: &span,
"Channel closed while trying to notify queue manager."
);
}
});
}
}
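
The worker's final report to the queue manager reduces to three cases: a concurrency limiter was hit (OnHold), some domain still has a pending event (Retry at the earliest such instant), or nothing is left to do (Delivered). A condensed sketch of that decision, with the surrounding types replaced by stand-ins:

    use std::time::Instant;

    enum WorkerResult { Delivered, Retry(Instant), OnHold }

    fn report(hit_concurrency_limit: bool, next_event: Option<Instant>) -> WorkerResult {
        if hit_concurrency_limit {
            WorkerResult::OnHold // parked until a slot frees or an event is due
        } else if let Some(due) = next_event {
            WorkerResult::Retry(due) // reschedule at the earliest pending event
        } else {
            WorkerResult::Delivered // no domain has anything left to do
        }
    }

    fn main() {
        assert!(matches!(report(false, None), WorkerResult::Delivered));
    }
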
impl Domain {
pub fn set_dns_error(&mut self, err: mail_auth::Error, schedule: &[Duration]) {
match &err {
mail_auth::Error::DNSRecordNotFound(code) => {
self.status = Status::PermanentFailure(Error::DNSError(format!(
"Domain not found: {}",
code
)));
}
_ => {
self.status = Status::TemporaryFailure(Error::DNSError(err.to_string()));
self.retry(schedule);
}
}
}
pub fn retry(&mut self, schedule: &[Duration]) {
self.retry.due =
Instant::now() + schedule[std::cmp::min(self.retry.inner as usize, schedule.len() - 1)];
self.retry.inner += 1;
}
}
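
retry() clamps its index at the last schedule entry, so once the configured intervals are exhausted every further attempt reuses the final one. A worked sketch using the default retry schedule from the config hunk above:

    use std::time::Duration;

    fn next_delay(schedule: &[Duration], attempt: u32) -> Duration {
        // Same clamping as Domain::retry: past the end, reuse the last interval.
        schedule[std::cmp::min(attempt as usize, schedule.len() - 1)]
    }

    fn main() {
        // 0m, 2m, 5m, 10m, 15m, 30m, 1h, 2h
        let schedule: Vec<Duration> = [0u64, 120, 300, 600, 900, 1800, 3600, 7200]
            .iter().map(|s| Duration::from_secs(*s)).collect();
        assert_eq!(next_delay(&schedule, 2), Duration::from_secs(300));
        assert_eq!(next_delay(&schedule, 50), Duration::from_secs(7200)); // clamped
    }
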
impl From<Box<Message>> for DeliveryAttempt {
fn from(message: Box<Message>) -> Self {
DeliveryAttempt {

src/queue/dsn.rs Normal file
View file

@@ -0,0 +1,3 @@
use crate::core::QueueCore;
impl QueueCore {}

View file

@@ -1,151 +0,0 @@
use std::sync::{atomic::Ordering, Arc};
use dashmap::mapref::entry::Entry;
use crate::{config::QueueCapacity, core::QueueCore};
use super::{Message, QueueLimiter, QueueLimiterRef, SimpleEnvelope};
impl QueueCore {
pub async fn queue_has_capacity(&self, message: &mut Message) -> bool {
if !self.config.capacity.sender.is_empty() {
let envelope = SimpleEnvelope::new(
&message.return_path_lcase,
&message.return_path_domain,
"",
"",
message.priority,
);
for capacity in &self.config.capacity.sender {
if !self
.reserve_capacity(capacity, &envelope, message.size, &mut message.queue_refs)
.await
{
return false;
}
}
}
for capacity in &self.config.capacity.rcpt_domain {
for domain in &mut message.domains {
if !self
.reserve_capacity(
capacity,
&SimpleEnvelope::new(
&message.return_path_lcase,
&message.return_path_domain,
"",
&domain.domain,
message.priority,
),
message.size,
&mut domain.queue_refs,
)
.await
{
return false;
}
}
}
for capacity in &self.config.capacity.rcpt {
for rcpt in &mut message.recipients {
if !self
.reserve_capacity(
capacity,
&SimpleEnvelope::new(
&message.return_path_lcase,
&message.return_path_domain,
&rcpt.address_lcase,
&message.domains[rcpt.domain_idx].domain,
message.priority,
),
message.size,
&mut rcpt.queue_refs,
)
.await
{
return false;
}
}
}
true
}
async fn reserve_capacity(
&self,
capacity: &QueueCapacity,
envelope: &SimpleEnvelope<'_>,
size: usize,
refs: &mut Vec<QueueLimiterRef>,
) -> bool {
if capacity.conditions.conditions.is_empty() || capacity.conditions.eval(envelope).await {
match self.capacity.entry(capacity.new_key(envelope)) {
Entry::Occupied(e) => {
if let Some(qref) = e.get().is_allowed(size) {
refs.push(qref);
} else {
return false;
}
}
Entry::Vacant(e) => {
let limiter = Arc::new(QueueLimiter {
max_size: capacity.size.unwrap_or(0),
max_messages: capacity.messages.unwrap_or(0),
size: 0.into(),
messages: 0.into(),
});
if let Some(qref) = limiter.is_allowed(size) {
refs.push(qref);
e.insert(limiter);
} else {
return false;
}
}
}
}
true
}
}
trait QueueLimiterAllowed {
fn is_allowed(&self, size: usize) -> Option<QueueLimiterRef>;
}
impl QueueLimiterAllowed for Arc<QueueLimiter> {
fn is_allowed(&self, size: usize) -> Option<QueueLimiterRef> {
if self.max_messages > 0 {
if self.messages.load(Ordering::Relaxed) < self.max_messages {
self.messages.fetch_add(1, Ordering::Relaxed);
} else {
return None;
}
}
if self.max_size > 0 {
if self.size.load(Ordering::Relaxed) + size < self.max_size {
self.size.fetch_add(size, Ordering::Relaxed);
} else {
return None;
}
}
Some(QueueLimiterRef {
size,
limiter: self.clone(),
})
}
}
impl Drop for QueueLimiterRef {
fn drop(&mut self) {
if self.limiter.max_messages > 0 {
self.limiter.messages.fetch_sub(1, Ordering::Relaxed);
}
if self.limiter.max_size > 0 {
self.limiter.size.fetch_sub(self.size, Ordering::Relaxed);
}
}
}

View file

@@ -8,13 +8,12 @@ use tokio::sync::mpsc;
use crate::core::Core;
use super::{DeliveryAttempt, Event, Message, OnHold, Schedule, WorkerResult};
use super::{DeliveryAttempt, Event, Message, OnHold, Schedule, Status, WorkerResult};
pub struct Queue {
short_wait: Duration,
long_wait: Duration,
pub main: BinaryHeap<Schedule<Box<Message>>>,
pub rate_limit: BinaryHeap<Schedule<Box<Message>>>,
pub on_hold: Vec<OnHold>,
}
@@ -25,12 +24,20 @@ impl SpawnQueue for mpsc::Receiver<Event> {
short_wait: Duration::from_millis(1),
long_wait: Duration::from_secs(86400 * 365),
main: BinaryHeap::with_capacity(128),
rate_limit: BinaryHeap::with_capacity(128),
on_hold: Vec::with_capacity(128),
};
loop {
match tokio::time::timeout(queue.wake_up_time(), self.recv()).await {
let result = tokio::time::timeout(queue.wake_up_time(), self.recv()).await;
// Deliver scheduled messages
while let Some(message) = queue.next_due() {
DeliveryAttempt::from(message)
.try_deliver(core.clone(), &mut queue)
.await;
}
match result {
Ok(Some(event)) => match event {
Event::Queue(item) => {
// Deliver any concurrency limited messages
@@ -49,18 +56,18 @@ impl SpawnQueue for mpsc::Receiver<Event> {
}
}
Event::Done(result) => {
// Deliver concurrency limited messages
// A worker is done, try delivering concurrency limited messages
while let Some(message) = queue.next_on_hold() {
DeliveryAttempt::from(message)
.try_deliver(core.clone(), &mut queue)
.await;
}
match result {
WorkerResult::Success => (),
WorkerResult::RateLimited(schedule) => {
queue.rate_limit.push(schedule);
WorkerResult::Delivered => (),
WorkerResult::Retry(schedule) => {
queue.main.push(schedule);
}
WorkerResult::ConcurrencyExceeded(on_hold) => {
WorkerResult::OnHold(on_hold) => {
queue.on_hold.push(on_hold);
}
}
@@ -70,13 +77,6 @@ impl SpawnQueue for mpsc::Receiver<Event> {
Ok(None) => break,
Err(_) => (),
}
// Deliver scheduled messages
while let Some(message) = queue.next_due() {
DeliveryAttempt::from(message)
.try_deliver(core.clone(), &mut queue)
.await;
}
}
});
@@ -86,10 +86,8 @@ impl SpawnQueue for mpsc::Receiver<Event> {
impl Queue {
pub fn next_due(&mut self) -> Option<Box<Message>> {
let now = Instant::now();
if matches!(self.rate_limit.peek(), Some(item) if item.due <= now ) {
self.rate_limit.pop().map(|i| i.inner)
} else if matches!(self.main.peek(), Some(item) if item.due <= now ) {
let item = self.main.peek()?;
if item.due <= Instant::now() {
self.main.pop().map(|i| i.inner)
} else {
None
@@ -97,27 +95,87 @@ impl Queue {
}
pub fn next_on_hold(&mut self) -> Option<Box<Message>> {
let now = Instant::now();
self.on_hold
.iter()
.position(|o| o.concurrent.load(Ordering::Relaxed) < o.max_concurrent)
.position(|o| {
o.concurrent.load(Ordering::Relaxed) < o.max_concurrent
|| o.next_due.map_or(false, |due| due <= now)
})
.map(|pos| self.on_hold.remove(pos).message)
}
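
next_on_hold now releases a parked message either when a concurrency slot opens or when the message's own next event has come due, so a held message cannot sleep past its retry or expiry deadline. A minimal sketch of the predicate, with simplified types:

    use std::{sync::{atomic::{AtomicU64, Ordering}, Arc}, time::Instant};

    struct OnHold { next_due: Option<Instant>, max_concurrent: u64, concurrent: Arc<AtomicU64> }

    fn releasable(o: &OnHold, now: Instant) -> bool {
        o.concurrent.load(Ordering::Relaxed) < o.max_concurrent
            || o.next_due.map_or(false, |due| due <= now)
    }

    fn main() {
        let o = OnHold {
            next_due: Some(Instant::now()),
            max_concurrent: 1,
            concurrent: Arc::new(AtomicU64::new(1)), // host still saturated
        };
        assert!(releasable(&o, Instant::now())); // but its own event is due
    }
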
pub fn wake_up_time(&self) -> Duration {
match (self.main.peek(), self.rate_limit.peek()) {
(Some(main), Some(rate_limit)) => {
if main.due < rate_limit.due {
&main.due
} else {
&rate_limit.due
self.main
.peek()
.map(|item| {
item.due
.checked_duration_since(Instant::now())
.unwrap_or(self.short_wait)
})
.unwrap_or(self.long_wait)
}
}
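
With the rate_limit heap folded into main, wake_up_time reduces to the head of a single heap: sleep until the earliest due item, fall back to short_wait when that item is already overdue, and long_wait when the queue is empty. A standalone sketch of the same computation:

    use std::time::{Duration, Instant};

    fn wake_up_time(next_due: Option<Instant>, short_wait: Duration, long_wait: Duration) -> Duration {
        next_due
            .map(|due| due.checked_duration_since(Instant::now()).unwrap_or(short_wait))
            .unwrap_or(long_wait)
    }

    fn main() {
        let short = Duration::from_millis(1);
        let long = Duration::from_secs(86400 * 365);
        // Empty queue: sleep "forever" until an event arrives on the channel.
        assert_eq!(wake_up_time(None, short, long), long);
        // Overdue item: poll almost immediately.
        assert_eq!(wake_up_time(Some(Instant::now() - Duration::from_secs(1)), short, long), short);
    }
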
impl Message {
pub fn next_event(&self) -> Option<Instant> {
let mut next_event = Instant::now();
let mut has_events = false;
for domain in &self.domains {
if matches!(
domain.status,
Status::Scheduled | Status::TemporaryFailure(_)
) {
if !has_events || domain.retry.due < next_event {
next_event = domain.retry.due;
has_events = true;
}
if domain.notify.due < next_event {
next_event = domain.notify.due;
}
if domain.expires < next_event {
next_event = domain.expires;
}
}
(Some(main), None) => &main.due,
(None, Some(rate_limit)) => &rate_limit.due,
(None, None) => return self.long_wait,
}
.checked_duration_since(Instant::now())
.unwrap_or(self.short_wait)
if has_events {
next_event.into()
} else {
None
}
}
pub fn next_event_after(&self, instant: Instant) -> Option<Instant> {
let mut next_event = instant;
let mut has_events = false;
for domain in &self.domains {
if matches!(
domain.status,
Status::Scheduled | Status::TemporaryFailure(_)
) {
if domain.retry.due > instant && (!has_events || domain.retry.due < next_event) {
next_event = domain.retry.due;
has_events = true;
}
if domain.notify.due > instant && (!has_events || domain.notify.due < next_event) {
next_event = domain.notify.due;
has_events = true;
}
if domain.expires > instant && (!has_events || domain.expires < next_event) {
next_event = domain.expires;
has_events = true;
}
}
}
if has_events {
next_event.into()
} else {
None
}
}
}

View file

@@ -1,10 +1,11 @@
use std::{
net::IpAddr,
path::PathBuf,
sync::{
atomic::{AtomicU64, AtomicUsize},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use smtp_proto::Response;
@@ -12,8 +13,9 @@ use smtp_proto::Response;
use crate::core::{throttle::InFlight, Envelope};
pub mod delivery;
pub mod limiter;
pub mod dsn;
pub mod manager;
pub mod quota;
pub mod spool;
pub mod throttle;
@@ -24,12 +26,13 @@ pub enum Event {
}
pub enum WorkerResult {
Success,
RateLimited(Schedule<Box<Message>>),
ConcurrencyExceeded(OnHold),
Delivered,
Retry(Schedule<Box<Message>>),
OnHold(OnHold),
}
pub struct OnHold {
next_due: Option<Instant>,
max_concurrent: u64,
concurrent: Arc<AtomicU64>,
message: Box<Message>,
@@ -43,26 +46,27 @@ pub struct Schedule<T> {
pub struct Message {
pub id: u64,
pub created: u64,
pub path: PathBuf,
pub return_path: String,
pub return_path_lcase: String,
pub return_path_domain: String,
pub recipients: Vec<Recipient>,
pub domains: Vec<Domain>,
pub notify: Schedule<u32>,
pub flags: u64,
pub priority: i16,
pub size: usize,
pub queue_refs: Vec<QueueLimiterRef>,
pub queue_refs: Vec<UsedQuota>,
}
pub struct Domain {
pub domain: String,
pub retry: Schedule<u32>,
pub notify: Schedule<u32>,
pub expires: Instant,
pub status: Status,
pub queue_refs: Vec<QueueLimiterRef>,
}
pub struct Recipient {
pub domain_idx: usize,
@@ -70,7 +74,6 @@ pub struct Recipient {
pub address_lcase: String,
pub status: Status,
pub flags: u64,
pub queue_refs: Vec<QueueLimiterRef>,
}
pub enum Status {
@@ -81,6 +84,7 @@ pub enum Status {
}
pub enum Error {
DNSError(String),
UnexpectedResponse(Response<String>),
Timeout,
}
@@ -91,16 +95,17 @@ pub struct DeliveryAttempt {
pub message: Box<Message>,
}
pub struct QueueLimiter {
pub struct QuotaLimiter {
pub max_size: usize,
pub max_messages: usize,
pub size: AtomicUsize,
pub messages: AtomicUsize,
}
pub struct QueueLimiterRef {
pub struct UsedQuota {
id: u64,
size: usize,
limiter: Arc<QueueLimiter>,
limiter: Arc<QuotaLimiter>,
}
impl<T> Ord for Schedule<T> {
@@ -130,30 +135,35 @@ impl<T: Default> Schedule<T> {
inner: T::default(),
}
}
pub fn later(duration: Duration) -> Self {
Schedule {
due: Instant::now() + duration,
inner: T::default(),
}
}
}
pub struct SimpleEnvelope<'x> {
pub sender: &'x str,
pub sender_domain: &'x str,
pub rcpt: &'x str,
pub rcpt_domain: &'x str,
pub priority: i16,
pub message: &'x Message,
pub domain: &'x str,
pub recipient: &'x str,
}
impl<'x> SimpleEnvelope<'x> {
pub fn new(
sender: &'x str,
sender_domain: &'x str,
rcpt: &'x str,
rcpt_domain: &'x str,
priority: i16,
) -> Self {
pub fn new(message: &'x Message, domain: &'x str) -> Self {
Self {
sender,
sender_domain,
rcpt,
rcpt_domain,
priority,
message,
domain,
recipient: "",
}
}
pub fn new_rcpt(message: &'x Message, domain: &'x str, recipient: &'x str) -> Self {
Self {
message,
domain,
recipient,
}
}
}
@@ -168,19 +178,19 @@ impl<'x> Envelope for SimpleEnvelope<'x> {
}
fn sender_domain(&self) -> &str {
self.sender_domain
&self.message.return_path_domain
}
fn sender(&self) -> &str {
self.sender
&self.message.return_path_lcase
}
fn rcpt_domain(&self) -> &str {
self.rcpt_domain
self.domain
}
fn rcpt(&self) -> &str {
self.rcpt
self.recipient
}
fn helo_domain(&self) -> &str {
@@ -200,18 +210,16 @@ impl<'x> Envelope for SimpleEnvelope<'x> {
}
fn priority(&self) -> i16 {
self.priority
self.message.priority
}
}
pub struct QueueEnvelope<'x> {
pub sender: &'x str,
pub sender_domain: &'x str,
pub rcpt_domain: &'x str,
pub message: &'x Message,
pub domain: &'x str,
pub mx: &'x str,
pub remote_ip: IpAddr,
pub local_ip: IpAddr,
pub priority: i16,
}
impl<'x> Envelope for QueueEnvelope<'x> {
@@ -224,15 +232,15 @@ impl<'x> Envelope for QueueEnvelope<'x> {
}
fn sender_domain(&self) -> &str {
self.sender_domain
&self.message.return_path_domain
}
fn sender(&self) -> &str {
self.sender
&self.message.return_path_lcase
}
fn rcpt_domain(&self) -> &str {
self.rcpt_domain
self.domain
}
fn rcpt(&self) -> &str {
@@ -255,6 +263,52 @@ impl<'x> Envelope for QueueEnvelope<'x> {
0
}
fn priority(&self) -> i16 {
self.message.priority
}
}
impl Envelope for Message {
fn local_ip(&self) -> &IpAddr {
unreachable!()
}
fn remote_ip(&self) -> &IpAddr {
unreachable!()
}
fn sender_domain(&self) -> &str {
&self.return_path_domain
}
fn sender(&self) -> &str {
&self.return_path_lcase
}
fn rcpt_domain(&self) -> &str {
""
}
fn rcpt(&self) -> &str {
""
}
fn helo_domain(&self) -> &str {
""
}
fn authenticated_as(&self) -> &str {
""
}
fn mx(&self) -> &str {
""
}
fn listener_id(&self) -> u16 {
0
}
fn priority(&self) -> i16 {
self.priority
}

src/queue/quota.rs Normal file
View file

@@ -0,0 +1,147 @@
use std::sync::{atomic::Ordering, Arc};
use dashmap::mapref::entry::Entry;
use crate::{
config::QueueQuota,
core::{Envelope, QueueCore},
};
use super::{Message, QuotaLimiter, SimpleEnvelope, UsedQuota};
impl QueueCore {
pub async fn has_quota(&self, message: &mut Message) -> bool {
let mut queue_refs = Vec::new();
if !self.config.quota.sender.is_empty() {
for quota in &self.config.quota.sender {
if !self
.reserve_quota(quota, message, message.size, 0, &mut queue_refs)
.await
{
return false;
}
}
}
for quota in &self.config.quota.rcpt_domain {
for (pos, domain) in message.domains.iter().enumerate() {
if !self
.reserve_quota(
quota,
&SimpleEnvelope::new(message, &domain.domain),
message.size,
((pos + 1) << 32) as u64,
&mut queue_refs,
)
.await
{
return false;
}
}
}
for quota in &self.config.quota.rcpt {
for (pos, rcpt) in message.recipients.iter().enumerate() {
if !self
.reserve_quota(
quota,
&SimpleEnvelope::new_rcpt(
message,
&message.domains[rcpt.domain_idx].domain,
&rcpt.address_lcase,
),
message.size,
(pos + 1) as u64,
&mut queue_refs,
)
.await
{
return false;
}
}
}
message.queue_refs = queue_refs;
true
}
async fn reserve_quota(
&self,
quota: &QueueQuota,
envelope: &impl Envelope,
size: usize,
id: u64,
refs: &mut Vec<UsedQuota>,
) -> bool {
if quota.conditions.conditions.is_empty() || quota.conditions.eval(envelope).await {
match self.quota.entry(quota.new_key(envelope)) {
Entry::Occupied(e) => {
if let Some(qref) = e.get().is_allowed(id, size) {
refs.push(qref);
} else {
return false;
}
}
Entry::Vacant(e) => {
let limiter = Arc::new(QuotaLimiter {
max_size: quota.size.unwrap_or(0),
max_messages: quota.messages.unwrap_or(0),
size: 0.into(),
messages: 0.into(),
});
if let Some(qref) = limiter.is_allowed(id, size) {
refs.push(qref);
e.insert(limiter);
} else {
return false;
}
}
}
}
true
}
}
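
The id argument threaded through reserve_quota tags each reservation with its origin: 0 for the sender-level check, the domain position plus one shifted into the high 32 bits, and the recipient position plus one in the low bits, so sender, domain, and recipient reservations never collide. Illustrative helpers (names are the editor's, not from the source):

    fn domain_id(pos: usize) -> u64 {
        ((pos + 1) as u64) << 32 // high 32 bits: domain index + 1
    }
    fn rcpt_id(pos: usize) -> u64 {
        (pos + 1) as u64 // low 32 bits: recipient index + 1
    }

    fn main() {
        assert_eq!(domain_id(0), 1u64 << 32);
        assert_eq!(rcpt_id(0), 1);
        // 0 stays reserved for the sender-level reservation.
    }
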
trait QuotaLimiterAllowed {
fn is_allowed(&self, id: u64, size: usize) -> Option<UsedQuota>;
}
impl QuotaLimiterAllowed for Arc<QuotaLimiter> {
fn is_allowed(&self, id: u64, size: usize) -> Option<UsedQuota> {
if self.max_messages > 0 {
if self.messages.load(Ordering::Relaxed) < self.max_messages {
self.messages.fetch_add(1, Ordering::Relaxed);
} else {
return None;
}
}
if self.max_size > 0 {
if self.size.load(Ordering::Relaxed) + size < self.max_size {
self.size.fetch_add(size, Ordering::Relaxed);
} else {
return None;
}
}
Some(UsedQuota {
id,
size,
limiter: self.clone(),
})
}
}
impl Drop for UsedQuota {
fn drop(&mut self) {
if self.limiter.max_messages > 0 {
self.limiter.messages.fetch_sub(1, Ordering::Relaxed);
}
if self.limiter.max_size > 0 {
self.limiter.size.fetch_sub(self.size, Ordering::Relaxed);
}
}
}
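
UsedQuota acts as an RAII guard: is_allowed increments the shared counters and Drop gives the capacity back, so a message that fails a later quota check releases everything it already reserved when its queue_refs vector is dropped. A minimal standalone sketch of the pattern with a single size counter:

    use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};

    struct Limiter { used: AtomicUsize, max: usize }
    struct Reservation { size: usize, limiter: Arc<Limiter> }

    fn reserve(limiter: &Arc<Limiter>, size: usize) -> Option<Reservation> {
        if limiter.used.load(Ordering::Relaxed) + size < limiter.max {
            limiter.used.fetch_add(size, Ordering::Relaxed);
            Some(Reservation { size, limiter: limiter.clone() })
        } else {
            None
        }
    }

    impl Drop for Reservation {
        fn drop(&mut self) {
            // Return the reserved capacity when the guard goes away.
            self.limiter.used.fetch_sub(self.size, Ordering::Relaxed);
        }
    }

    fn main() {
        let limiter = Arc::new(Limiter { used: AtomicUsize::new(0), max: 100 });
        let r = reserve(&limiter, 60).unwrap();
        assert!(reserve(&limiter, 60).is_none()); // would exceed the cap
        drop(r);
        assert!(reserve(&limiter, 60).is_some()); // capacity returned on drop
    }
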

View file

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::atomic::Ordering, time::Instant};
use std::sync::atomic::Ordering;
use tokio::{fs, io::AsyncWriteExt};
@@ -9,30 +9,40 @@ use super::{Event, Message, Schedule};
impl QueueCore {
pub async fn queue_message(&self, mut message: Box<Message>, message_bytes: Vec<u8>) -> bool {
// Generate id
message.id = message.created.saturating_sub(946684800) << 32
| self.id_seq.fetch_add(1, Ordering::Relaxed) as u64;
message.id = (message.created.saturating_sub(946684800) & 0xFFFFFFFF)
| (self.id_seq.fetch_add(1, Ordering::Relaxed) as u64) << 32;
// Build path
let mut path = self.build_base_path(message.id);
let _ = fs::create_dir(&path).await;
message.path = self.config.path.eval(message.as_ref()).await.clone();
let hash = *self.config.hash.eval(message.as_ref()).await;
if hash > 0 {
message.path.push((message.id % hash).to_string());
}
let _ = fs::create_dir(&message.path).await;
message
.path
.push(format!("{}_{}.msg", message.id, message.size));
// Save message
path.push(format!("{}_{}.msg", message.id, message.size));
let mut file = match fs::File::create(&path).await {
let mut file = match fs::File::create(&message.path).await {
Ok(file) => file,
Err(err) => {
tracing::error!("Failed to create file {}: {}", path.display(), err);
tracing::error!("Failed to create file {}: {}", message.path.display(), err);
return false;
}
};
for bytes in [&message_bytes] {
if let Err(err) = file.write_all(bytes).await {
tracing::error!("Failed to write to file {}: {}", path.display(), err);
tracing::error!(
"Failed to write to file {}: {}",
message.path.display(),
err
);
return false;
}
}
if let Err(err) = file.flush().await {
tracing::error!("Failed to flush file {}: {}", path.display(), err);
tracing::error!("Failed to flush file {}: {}", message.path.display(), err);
return false;
}
@@ -40,7 +50,7 @@ impl QueueCore {
if self
.tx
.send(Event::Queue(Schedule {
due: Instant::now(),
due: message.next_event().unwrap(),
inner: message,
}))
.await
@@ -53,10 +63,4 @@ impl QueueCore {
true
}
pub fn build_base_path(&self, id: u64) -> PathBuf {
let mut path = self.config.path.clone();
path.push((id & self.config.hash).to_string());
path
}
}
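
queue_message now packs the creation time (seconds since 2000-01-01, masked to 32 bits) into the low half of the id and the per-process sequence counter into the high half, then shards the spool directory by id % hash. A worked sketch of both steps, with hypothetical inputs:

    use std::path::PathBuf;

    fn message_id(created_unix: u64, seq: u32) -> u64 {
        // Low 32 bits: seconds since 2000-01-01; high 32 bits: sequence number.
        (created_unix.saturating_sub(946684800) & 0xFFFFFFFF) | ((seq as u64) << 32)
    }

    fn spool_path(base: &str, hash: u64, id: u64, size: usize) -> PathBuf {
        let mut path = PathBuf::from(base);
        if hash > 0 {
            path.push((id % hash).to_string()); // shard directory, as in queue_message
        }
        path.push(format!("{}_{}.msg", id, size));
        path
    }

    fn main() {
        let id = message_id(1672425600, 7); // hypothetical timestamp and sequence
        let path = spool_path("/var/spool/queue", 32, id, 4096);
        println!("{} -> {}", id, path.display());
    }
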

View file

@@ -1,4 +1,4 @@
use std::{net::IpAddr, time::Instant};
use std::time::Instant;
use dashmap::mapref::entry::Entry;
@@ -10,61 +10,13 @@ use crate::{
},
};
use super::{DeliveryAttempt, QueueEnvelope, SimpleEnvelope};
pub enum Error {
Concurrency { limiter: ConcurrencyLimiter },
Rate { retry_at: Instant },
}
impl QueueCore {
pub async fn throttle_sender(&self, attempt: &mut DeliveryAttempt) -> Result<(), Error> {
if !self.config.throttle.sender.is_empty() {
let envelope = SimpleEnvelope::new(
&attempt.message.return_path_lcase,
&attempt.message.return_path_domain,
"",
"",
attempt.message.priority,
);
for throttle in &self.config.throttle.sender {
self.is_allowed(throttle, &envelope, &mut attempt.in_flight, &attempt.span)
.await?;
}
}
Ok(())
}
pub async fn throttle_recipient(
&self,
attempt: &DeliveryAttempt,
rcpt_domain: &str,
mx: &str,
local_ip: IpAddr,
remote_ip: IpAddr,
) -> Result<Vec<InFlight>, Error> {
let mut in_flight = Vec::new();
let envelope = QueueEnvelope {
sender: &attempt.message.return_path_lcase,
sender_domain: &attempt.message.return_path_domain,
rcpt_domain,
mx,
remote_ip,
local_ip,
priority: attempt.message.priority,
};
for throttle in &self.config.throttle.recipient {
self.is_allowed(throttle, &envelope, &mut in_flight, &attempt.span)
.await?;
}
Ok(in_flight)
}
async fn is_allowed(
pub async fn is_allowed(
&self,
throttle: &Throttle,
envelope: &impl Envelope,