From 76f83fa280699fc232234666a6fa4fc060568108 Mon Sep 17 00:00:00 2001 From: Mauro D Date: Tue, 27 Dec 2022 18:11:50 +0000 Subject: [PATCH] Queue draft impl. --- Cargo.lock | 8 +- resources/config/config.toml | 58 +++++----- resources/tests/config/rules.toml | 2 +- resources/tests/config/throttle.toml | 2 +- src/config/condition.rs | 4 +- src/config/if_block.rs | 1 - src/config/mod.rs | 49 +++++++-- src/config/queue.rs | 154 +++++++++++++++++++++++++++ src/config/session.rs | 79 ++++++++------ src/config/throttle.rs | 2 - src/core/mod.rs | 22 +++- src/core/params.rs | 32 +++++- src/core/throttle.rs | 32 +++--- src/lib.rs | 1 + src/listener/ehlo.rs | 126 +++++++++++----------- src/listener/mail.rs | 16 +-- src/listener/rcpt.rs | 13 +-- src/listener/session.rs | 46 ++++---- src/listener/spawn.rs | 13 ++- src/listener/vrfy.rs | 44 +++----- src/main.rs | 20 +++- src/queue/manager.rs | 34 ++++++ src/queue/message.rs | 21 ++++ src/queue/mod.rs | 48 +++++++++ src/remote/smtp.rs | 2 +- 25 files changed, 570 insertions(+), 259 deletions(-) create mode 100644 src/config/queue.rs create mode 100644 src/queue/manager.rs create mode 100644 src/queue/message.rs create mode 100644 src/queue/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e9c5b18..9c52086 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1382,18 +1382,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.151" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.151" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", diff --git a/resources/config/config.toml b/resources/config/config.toml index 0e498ad..ba4b0b6 100644 --- a/resources/config/config.toml +++ b/resources/config/config.toml @@ -41,6 +41,10 @@ log-level = "trace" concurrency = 1024 throttle-map = {shard = 32, capacity = 10} +[global.spool] +path = "/var/spool/queue" +hash = 123 + [session] timeout = "5m" transfer-limit = 5000000 @@ -57,9 +61,12 @@ rate = "3/1m" [session.ehlo] require = true #script = ehlo.sieve -multiple = true [session.ehlo.capabilities] +auth = [ + { if = "listener", eq = "submission", then = ["plain", "login"]}, + { else = false } +] pipelining = true chunking = true requiretls = true @@ -71,19 +78,12 @@ future-release = [ ] deliver-by = false mt-priority = false -size = 500000 -expn = false +vrfy = true +expn = true [session.auth] -enable = [ - { if = "listener", eq = "submission", then = ["plain", "login"]}, - { else = [] } -] -require = [ - { if = "listener", eq = "submission", then = true}, - { else = false } -] -lookup = "lmtp" +lookup = [ { if = "listener", eq = "submission", then = "local-addresses" }, + { else = false } ] [session.auth.errors] total = 3 @@ -100,15 +100,17 @@ rate = "10/1m" [session.rcpt] #script = rcpt-to.sieve -relay = [ { if = "authenticated-as", ne = "", then = true}, - { else = false} ] +relay = [ { if = "authenticated-as", ne = "", then = true }, + { else = false } ] max-recipients = 100 -vrfy = true -expn = true [session.rcpt.lookup] domains = "local-domains" addresses = "local-addresses" +vrfy = [ { if = "authenticated-as", ne = "", then = "local-addresses" }, + { else = false } ] +expn = [ { if = "authenticated-as", ne = "", then = "local-addresses" }, + { else = false } ] [session.rcpt.errors] total = 3 @@ -129,7 +131,7 @@ received-headers = 50 mime-parts = 50 nested-messages = 3 -[state.data.add-headers] +[session.data.add-headers] received = true received-spf = true return-path = true @@ -177,7 +179,6 @@ set-body-length = false address = 192.168.0.1 port = 25 protocol = "lmtp" -#tls = "optional, require, dane, dane-fallback-require, dane-require [remote."lmtp".auth] username = "hello" @@ -192,18 +193,21 @@ implicit = true allow-invalid-certs = true [queue] -retry = [0, 1, 15, 60, 90] -notify = [9, 10] -prefer = ipv6 +retry = ["0m", "2m", "5m", "10m", "15m", "30m", "1h", "2h"] +notify = ["1d", "3d"] source-ips = ["192.168.0.2", "162.168.0.1"] #tls = optional, require, dane, dane-fallback-require, dane-require -relay-host = "lmtp" +next-hop = "lmtp" [queue.limits] attempts = 100 -time = 3600 -queued-messages = 10000 -queue-size = 1000000 +lifetime = "5d" + +[[queue.size]] +match = {if = "remote-ip", eq = "127.0.0.1"} +key = [""] +messages = 10000 +bytes = 1000000 [[queue.throttle]] rate = "1/60s" @@ -224,9 +228,7 @@ a = 1000 mx = 9393 txt = 3233 -[general.spool] -path = "/var/spool/queue" -hash = 123 + [scripts] diff --git a/resources/tests/config/rules.toml b/resources/tests/config/rules.toml index 756e822..83f9fa6 100644 --- a/resources/tests/config/rules.toml +++ b/resources/tests/config/rules.toml @@ -25,5 +25,5 @@ if = "sender-domain" starts-with = "example" [[rule."expanded".all-of]] -if = "mx" +if = "sender" in-list = "test-list" diff --git a/resources/tests/config/throttle.toml b/resources/tests/config/throttle.toml index 6b83b3a..3e9dcba 100644 --- a/resources/tests/config/throttle.toml +++ b/resources/tests/config/throttle.toml @@ -1,5 +1,5 @@ [[throttle]] -if = {if = "remote-ip", eq = "127.0.0.1"} +match = {if = "remote-ip", eq = "127.0.0.1"} key = ["remote-ip", "authenticated-as"] concurrency = 100 rate = "50/30s" diff --git a/src/config/condition.rs b/src/config/condition.rs index edb3104..2cf0581 100644 --- a/src/config/condition.rs +++ b/src/config/condition.rs @@ -141,7 +141,6 @@ impl Config { | EnvelopeKey::Sender | EnvelopeKey::SenderDomain | EnvelopeKey::AuthenticatedAs - | EnvelopeKey::Mx | EnvelopeKey::LocalIp | EnvelopeKey::RemoteIp, _, @@ -239,7 +238,6 @@ impl Config { EnvelopeKey::SenderDomain, EnvelopeKey::AuthenticatedAs, EnvelopeKey::Listener, - EnvelopeKey::Mx, EnvelopeKey::RemoteIp, EnvelopeKey::LocalIp, EnvelopeKey::Priority, @@ -368,7 +366,7 @@ mod tests { }, Condition::JumpIfFalse { positions: 1 }, Condition::Match { - key: EnvelopeKey::Mx, + key: EnvelopeKey::Sender, op: ConditionOp::Equal, value: ConditionValue::List(list), not: false, diff --git a/src/config/if_block.rs b/src/config/if_block.rs index cf42932..31c1e69 100644 --- a/src/config/if_block.rs +++ b/src/config/if_block.rs @@ -270,7 +270,6 @@ mod tests { EnvelopeKey::SenderDomain, EnvelopeKey::AuthenticatedAs, EnvelopeKey::Listener, - EnvelopeKey::Mx, EnvelopeKey::RemoteIp, EnvelopeKey::LocalIp, EnvelopeKey::Priority, diff --git a/src/config/mod.rs b/src/config/mod.rs index a3803be..fc283b3 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -3,6 +3,7 @@ pub mod condition; pub mod if_block; pub mod list; pub mod parser; +pub mod queue; pub mod remote; pub mod resolver; pub mod server; @@ -12,12 +13,14 @@ pub mod utils; use std::{ collections::BTreeMap, - net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + path::PathBuf, sync::Arc, time::Duration, }; use ahash::{AHashMap, AHashSet}; +use mail_send::Credentials; use regex::Regex; use rustls::ServerConfig; use smtp_proto::MtPriority; @@ -79,9 +82,6 @@ impl Default for List { } } -#[derive(Debug, Default)] -pub struct Queue {} - #[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] pub enum ServerProtocol { #[default] @@ -168,7 +168,6 @@ pub enum EnvelopeKey { HeloDomain, AuthenticatedAs, Listener, - Mx, RemoteIp, LocalIp, Priority, @@ -234,9 +233,9 @@ pub struct Connect { pub struct Ehlo { pub script: IfBlock>>, pub require: IfBlock, - pub multiple: IfBlock, // Capabilities + pub auth: IfBlock, pub pipelining: IfBlock, pub chunking: IfBlock, pub requiretls: IfBlock, @@ -245,13 +244,13 @@ pub struct Ehlo { pub deliver_by: IfBlock>, pub mt_priority: IfBlock>, pub size: IfBlock>, + pub expn: IfBlock, + pub vrfy: IfBlock, } pub struct Auth { pub script: IfBlock>>, - pub require: IfBlock, pub lookup: IfBlock>>, - pub mechanisms: IfBlock, pub errors_max: IfBlock, pub errors_wait: IfBlock, } @@ -264,10 +263,10 @@ pub struct Mail { pub struct Rcpt { pub script: IfBlock>>, pub relay: IfBlock, - pub expn: IfBlock, - pub vrfy: IfBlock, pub lookup_domains: IfBlock>>, pub lookup_addresses: IfBlock>>, + pub lookup_expn: IfBlock>>, + pub lookup_vrfy: IfBlock>>, // Errors pub errors_max: IfBlock, @@ -312,6 +311,35 @@ pub struct SessionConfig { pub data: Data, } +pub struct RelayHost { + pub address: String, + pub port: u16, + pub protocol: ServerProtocol, + pub auth: Option>, + pub tls_implicit: bool, + pub tls_allow_invalid_certs: bool, +} + +pub struct Queue { + pub path: PathBuf, + pub hash: usize, + + pub retry: IfBlock>, + pub notify: IfBlock>, + pub source_ips: IfBlock>, + pub relay_host: IfBlock>, + pub tls: IfBlock, + + // Limits + pub attempts_max: IfBlock, + pub lifetime_max: IfBlock, + pub messages_max: IfBlock, + pub size_max: IfBlock, + + // Throttle + pub throttle: Vec, +} + pub enum AuthLevel { Enable, Disable, @@ -329,7 +357,6 @@ pub struct ConfigContext { pub hosts: AHashMap, pub scripts: AHashMap>, pub lists: AHashMap>, - pub queues: AHashMap>, } pub type Result = std::result::Result; diff --git a/src/config/queue.rs b/src/config/queue.rs new file mode 100644 index 0000000..be1cb09 --- /dev/null +++ b/src/config/queue.rs @@ -0,0 +1,154 @@ +use std::{fs, time::Duration}; + +use mail_send::Credentials; + +use super::{utils::ParseValue, *}; + +impl Config { + pub fn parse_queue(&self, ctx: &ConfigContext) -> super::Result { + let available_envelope_keys = [ + EnvelopeKey::Recipient, + EnvelopeKey::RecipientDomain, + EnvelopeKey::Sender, + EnvelopeKey::SenderDomain, + EnvelopeKey::Priority, + ]; + let available_throttle_keys = + THROTTLE_RCPT_DOMAIN | THROTTLE_MX | THROTTLE_REMOTE_IP | THROTTLE_LOCAL_IP; + + let relay_host = self + .parse_if_block::>("queue.relay-host", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(None)); + + let path = self.property_require::("global.spool.path")?; + if !path.exists() { + fs::create_dir(&path) + .map_err(|err| format!("Failed to create spool directory {:?}: {}", path, err))?; + } + + Ok(Queue { + path, + hash: self.property("global.spool.hash")?.unwrap_or(32), + retry: self + .parse_if_block("queue.retry", ctx, &available_envelope_keys)? + .unwrap_or_else(|| { + IfBlock::new(vec![ + Duration::from_secs(0), + Duration::from_secs(2 * 60), + Duration::from_secs(5 * 60), + Duration::from_secs(10 * 60), + Duration::from_secs(15 * 60), + Duration::from_secs(30 * 60), + Duration::from_secs(3600), + Duration::from_secs(2 * 3600), + ]) + }), + notify: self + .parse_if_block( + "queue.notify", + ctx, + &[ + EnvelopeKey::Sender, + EnvelopeKey::SenderDomain, + EnvelopeKey::Priority, + ], + )? + .unwrap_or_else(|| { + IfBlock::new(vec![ + Duration::from_secs(86400), + Duration::from_secs(2 * 86400), + ]) + }), + source_ips: self + .parse_if_block("queue.source-ips", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(Vec::new())), + relay_host: IfBlock { + if_then: { + let mut if_then = Vec::with_capacity(relay_host.if_then.len()); + + for i in relay_host.if_then { + if_then.push(IfThen { + conditions: i.conditions, + then: if let Some(then) = i.then { + Some( + ctx.hosts + .get(&then) + .ok_or_else(|| { + format!( + "Relay host {:?} not found for property \"queue.relay-host\".", + then + ) + })? + .into(), + ) + } else { + None + }, + }); + } + + if_then + }, + default: if let Some(default) = relay_host.default { + Some( + ctx.hosts + .get(&default) + .ok_or_else(|| { + format!( + "Relay host {:?} not found for property \"queue.relay-host\".", + default + ) + })? + .into(), + ) + } else { + None + }, + }, + tls: self + .parse_if_block("queue.tls", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(true)), + attempts_max: self + .parse_if_block("queue.limits.attempts", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(100)), + lifetime_max: self + .parse_if_block("queue.limits.lifetime", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(Duration::from_secs(5 * 86400))), + messages_max: self + .parse_if_block("queue.limits.messages", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(1024 * 1024)), + size_max: self + .parse_if_block("queue.limits.size", ctx, &available_envelope_keys)? + .unwrap_or_else(|| IfBlock::new(1024 * 1024 * 1024)), + throttle: self.parse_throttle( + "queue.throttle", + ctx, + &available_envelope_keys, + available_throttle_keys, + )?, + }) + } +} + +impl From<&Host> for RelayHost { + fn from(host: &Host) -> Self { + RelayHost { + address: host.address.to_string(), + port: host.port, + protocol: host.protocol, + auth: if let (Some(username), Some(secret)) = (&host.username, &host.secret) { + Credentials::new(username.to_string(), secret.to_string()).into() + } else { + None + }, + tls_implicit: host.tls_implicit, + tls_allow_invalid_certs: host.tls_allow_invalid_certs, + } + } +} + +impl ParseValue for PathBuf { + fn parse_value(_key: impl utils::AsKey, value: &str) -> super::Result { + Ok(PathBuf::from(value)) + } +} diff --git a/src/config/session.rs b/src/config/session.rs index db6111a..f73875e 100644 --- a/src/config/session.rs +++ b/src/config/session.rs @@ -62,6 +62,14 @@ impl Config { EnvelopeKey::RemoteIp, EnvelopeKey::LocalIp, ]; + let mechanisms = self + .parse_if_block::>( + "session.ehlo.capabilities.auth", + ctx, + &available_keys, + )? + .unwrap_or_default(); + Ok(Ehlo { script: self .parse_if_block::>("session.ehlo.script", ctx, &available_keys)? @@ -70,9 +78,6 @@ impl Config { require: self .parse_if_block("session.ehlo.require", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(true)), - multiple: self - .parse_if_block("session.ehlo.multiple", ctx, &available_keys)? - .unwrap_or_else(|| IfBlock::new(true)), pipelining: self .parse_if_block("session.ehlo.capabilities.pipelining", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(true)), @@ -109,32 +114,13 @@ impl Config { size: self .parse_if_block("session.ehlo.capabilities.size", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(Some(25 * 1024 * 1024))), - }) - } - - fn parse_session_auth(&self, ctx: &ConfigContext) -> super::Result { - let available_keys = [ - EnvelopeKey::Listener, - EnvelopeKey::RemoteIp, - EnvelopeKey::LocalIp, - EnvelopeKey::HeloDomain, - ]; - let mechanisms = self - .parse_if_block::>("session.auth.enable", ctx, &available_keys)? - .unwrap_or_default(); - Ok(Auth { - script: self - .parse_if_block::>("session.auth.script", ctx, &available_keys)? - .unwrap_or_default() - .map_if_block(&ctx.scripts, "session.auth.script", "script")?, - require: self - .parse_if_block("session.auth.require", ctx, &available_keys)? - .unwrap_or_default(), - lookup: self - .parse_if_block::>("session.auth.lookup", ctx, &available_keys)? - .unwrap_or_default() - .map_if_block(&ctx.lists, "session.auth.lookup", "lookup list")?, - mechanisms: IfBlock { + expn: self + .parse_if_block("session.ehlo.capabilities.expn", ctx, &available_keys)? + .unwrap_or_else(|| IfBlock::new(false)), + vrfy: self + .parse_if_block("session.ehlo.capabilities.vrfy", ctx, &available_keys)? + .unwrap_or_else(|| IfBlock::new(false)), + auth: IfBlock { if_then: mechanisms .if_then .into_iter() @@ -148,6 +134,26 @@ impl Config { .into_iter() .fold(0, |acc, m| acc | m.mechanism), }, + }) + } + + fn parse_session_auth(&self, ctx: &ConfigContext) -> super::Result { + let available_keys = [ + EnvelopeKey::Listener, + EnvelopeKey::RemoteIp, + EnvelopeKey::LocalIp, + EnvelopeKey::HeloDomain, + ]; + + Ok(Auth { + script: self + .parse_if_block::>("session.auth.script", ctx, &available_keys)? + .unwrap_or_default() + .map_if_block(&ctx.scripts, "session.auth.script", "script")?, + lookup: self + .parse_if_block::>("session.auth.lookup", ctx, &available_keys)? + .unwrap_or_default() + .map_if_block(&ctx.lists, "session.auth.lookup", "lookup list")?, errors_max: self .parse_if_block("session.auth.errors.max", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(3)), @@ -203,12 +209,7 @@ impl Config { relay: self .parse_if_block("session.rcpt.relay", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(false)), - expn: self - .parse_if_block("session.rcpt.expn", ctx, &available_keys)? - .unwrap_or_else(|| IfBlock::new(false)), - vrfy: self - .parse_if_block("session.rcpt.vrfy", ctx, &available_keys)? - .unwrap_or_else(|| IfBlock::new(false)), + lookup_domains: self .parse_if_block::>( "session.rcpt.lookup.domains", @@ -225,6 +226,14 @@ impl Config { )? .unwrap_or_default() .map_if_block(&ctx.lists, "session.rcpt.lookup.addresses", "lookup list")?, + lookup_expn: self + .parse_if_block::>("session.rcpt.lookup.expn", ctx, &available_keys)? + .unwrap_or_default() + .map_if_block(&ctx.lists, "session.rcpt.lookup.expn", "lookup list")?, + lookup_vrfy: self + .parse_if_block::>("session.rcpt.lookup.vrfy", ctx, &available_keys)? + .unwrap_or_default() + .map_if_block(&ctx.lists, "session.rcpt.lookup.vrfy", "lookup list")?, errors_max: self .parse_if_block("session.rcpt.errors.max", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(10)), diff --git a/src/config/throttle.rs b/src/config/throttle.rs index 45d498e..3d8eaca 100644 --- a/src/config/throttle.rs +++ b/src/config/throttle.rs @@ -137,7 +137,6 @@ impl ParseValue for EnvelopeKey { "sender" => EnvelopeKey::Sender, "sender-domain" => EnvelopeKey::SenderDomain, "listener" => EnvelopeKey::Listener, - "mx" => EnvelopeKey::Mx, "remote-ip" => EnvelopeKey::RemoteIp, "local-ip" => EnvelopeKey::LocalIp, "priority" => EnvelopeKey::Priority, @@ -178,7 +177,6 @@ mod tests { EnvelopeKey::SenderDomain, EnvelopeKey::AuthenticatedAs, EnvelopeKey::Listener, - EnvelopeKey::Mx, EnvelopeKey::RemoteIp, EnvelopeKey::LocalIp, EnvelopeKey::Priority, diff --git a/src/core/mod.rs b/src/core/mod.rs index 03089d8..e8b6f43 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -13,12 +13,16 @@ use smtp_proto::{ }, MtPriority, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::mpsc, +}; use tracing::Span; use crate::{ config::{EnvelopeKey, List, Script, ServerProtocol, SessionConfig}, listener::auth::SaslToken, + queue, }; use self::throttle::{ @@ -33,6 +37,7 @@ pub struct Core { pub config: SessionConfig, pub concurrency: ConcurrencyLimiter, pub throttle: DashMap, + pub queue_tx: mpsc::Sender, } pub enum State { @@ -75,7 +80,9 @@ pub struct SessionData { pub authenticated_as: String, pub auth_errors: usize, pub priority: i16, + pub valid_until: Instant, + pub bytes_left: usize, pub messages_sent: usize, } @@ -87,27 +94,30 @@ pub struct SessionAddress { #[derive(Debug, Default)] pub struct SessionParameters { + // Global parameters + pub timeout: Duration, + // Ehlo parameters pub ehlo_script: Option>, pub ehlo_require: bool, - pub ehlo_multiple: bool, // Supported capabilities pub pipelining: bool, pub chunking: bool, pub requiretls: bool, pub starttls: bool, + pub expn: bool, + pub vrfy: bool, pub no_soliciting: Option, pub future_release: Option, pub deliver_by: Option, pub mt_priority: Option, pub size: Option, + pub auth: u64, // Auth parameters pub auth_script: Option>, - pub auth_require: bool, pub auth_lookup: Option>, - pub auth_mechanisms: u64, pub auth_errors_max: usize, pub auth_errors_wait: Duration, @@ -122,6 +132,8 @@ pub struct SessionParameters { pub rcpt_max: usize, pub rcpt_lookup_domain: Option>, pub rcpt_lookup_addresses: Option>, + pub rcpt_lookup_expn: Option>, + pub rcpt_lookup_vrfy: Option>, // Data parameters pub data_script: Option>, @@ -153,6 +165,7 @@ impl SessionData { message: Vec::with_capacity(0), auth_errors: 0, messages_sent: 0, + bytes_left: 0, } } } @@ -184,7 +197,6 @@ pub trait Envelope { EnvelopeKey::Sender => self.sender().into(), EnvelopeKey::SenderDomain => self.sender_domain().into(), EnvelopeKey::AuthenticatedAs => self.authenticated_as().into(), - EnvelopeKey::Mx => self.mx().into(), EnvelopeKey::HeloDomain => self.helo_domain().into(), EnvelopeKey::Listener => self.listener_id().to_string().into(), EnvelopeKey::RemoteIp => self.remote_ip().to_string().into(), diff --git a/src/core/params.rs b/src/core/params.rs index 0e07943..d0631ef 100644 --- a/src/core/params.rs +++ b/src/core/params.rs @@ -3,13 +3,16 @@ use tokio::io::{AsyncRead, AsyncWrite}; use super::{Session, SessionParameters}; impl Session { - pub async fn eval_ehlo_params(&mut self) { + pub async fn eval_session_params(&mut self) { + self.params.timeout = *self.core.config.timeout.eval(self).await; + self.data.bytes_left = *self.core.config.transfer_limit.eval(self).await; self.data.valid_until += *self.core.config.duration.eval(self).await; + } - // Ehlo parameter + pub async fn eval_ehlo_params(&mut self) { + // Ehlo parameters self.params.ehlo_script = self.core.config.ehlo.script.eval(self).await.clone(); self.params.ehlo_require = *self.core.config.ehlo.require.eval(self).await; - self.params.ehlo_multiple = *self.core.config.ehlo.multiple.eval(self).await; // Capabilities self.params.pipelining = *self.core.config.ehlo.pipelining.eval(self).await; @@ -20,14 +23,15 @@ impl Session { self.params.deliver_by = *self.core.config.ehlo.deliver_by.eval(self).await; self.params.mt_priority = *self.core.config.ehlo.mt_priority.eval(self).await; self.params.size = *self.core.config.ehlo.size.eval(self).await; + self.params.auth = *self.core.config.ehlo.auth.eval(self).await; + self.params.expn = *self.core.config.ehlo.expn.eval(self).await; + self.params.vrfy = *self.core.config.ehlo.vrfy.eval(self).await; } pub async fn eval_auth_params(&mut self) { // Auth parameters self.params.auth_script = self.core.config.auth.script.eval(self).await.clone(); - self.params.auth_require = *self.core.config.auth.require.eval(self).await; self.params.auth_lookup = self.core.config.auth.lookup.eval(self).await.clone(); - self.params.auth_mechanisms = *self.core.config.auth.mechanisms.eval(self).await; self.params.auth_errors_max = *self.core.config.auth.errors_max.eval(self).await; self.params.auth_errors_wait = *self.core.config.auth.errors_wait.eval(self).await; } @@ -42,6 +46,24 @@ impl Session { self.params.rcpt_errors_max = *self.core.config.rcpt.errors_max.eval(self).await; self.params.rcpt_errors_wait = *self.core.config.rcpt.errors_wait.eval(self).await; self.params.rcpt_max = *self.core.config.rcpt.max_recipients.eval(self).await; + self.params.rcpt_lookup_domain = self + .core + .config + .rcpt + .lookup_domains + .eval(self) + .await + .clone(); + self.params.rcpt_lookup_addresses = self + .core + .config + .rcpt + .lookup_addresses + .eval(self) + .await + .clone(); + self.params.rcpt_lookup_expn = self.core.config.rcpt.lookup_expn.eval(self).await.clone(); + self.params.rcpt_lookup_vrfy = self.core.config.rcpt.lookup_vrfy.eval(self).await.clone(); } pub async fn eval_data_params(&mut self) { diff --git a/src/core/throttle.rs b/src/core/throttle.rs index b91a4b5..6f5bf9d 100644 --- a/src/core/throttle.rs +++ b/src/core/throttle.rs @@ -1,5 +1,4 @@ use dashmap::mapref::entry::Entry; -use parking_lot::Mutex; use tokio::io::{AsyncRead, AsyncWrite}; use std::{ @@ -26,7 +25,7 @@ pub struct Limiter { pub struct RateLimiter { max_requests: f64, max_interval: f64, - limiter: Arc>, + limiter: (Instant, f64), } #[derive(Debug)] @@ -50,29 +49,28 @@ impl RateLimiter { RateLimiter { max_requests: max_requests as f64, max_interval: max_interval as f64, - limiter: Arc::new(Mutex::new((Instant::now(), max_requests as f64))), + limiter: (Instant::now(), max_requests as f64), } } - pub fn is_allowed(&self) -> bool { + pub fn is_allowed(&mut self) -> bool { // Check rate limit - let mut limiter = self.limiter.lock(); - let elapsed = limiter.0.elapsed().as_secs_f64(); - limiter.0 = Instant::now(); - limiter.1 += elapsed * (self.max_requests / self.max_interval); - if limiter.1 > self.max_requests { - limiter.1 = self.max_requests; + let elapsed = self.limiter.0.elapsed().as_secs_f64(); + self.limiter.1 += elapsed * (self.max_requests / self.max_interval); + if self.limiter.1 > self.max_requests { + self.limiter.1 = self.max_requests; } - if limiter.1 >= 1.0 { - limiter.1 -= 1.0; + if self.limiter.1 >= 1.0 { + self.limiter.0 = Instant::now(); + self.limiter.1 -= 1.0; true } else { false } } - pub fn reset(&self) { - *self.limiter.lock() = (Instant::now(), self.max_requests); + pub fn reset(&mut self) { + self.limiter = (Instant::now(), self.max_requests); } } @@ -208,8 +206,8 @@ impl Session { if t.conditions.conditions.is_empty() || t.conditions.eval(self).await { // Build throttle key match self.core.throttle.entry(ThrottleKey::new(self, t)) { - Entry::Occupied(e) => { - let limiter = e.get(); + Entry::Occupied(mut e) => { + let limiter = e.get_mut(); if let Some(limiter) = &limiter.concurrency { if let Some(inflight) = limiter.is_allowed() { self.in_flight.push(inflight); @@ -224,7 +222,7 @@ impl Session { return false; } } - if let Some(limiter) = &limiter.rate { + if let Some(limiter) = &mut limiter.rate { if !limiter.is_allowed() { tracing::info!( parent: &self.span, diff --git a/src/lib.rs b/src/lib.rs index bdcadb7..3989336 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; pub mod core; pub mod listener; +pub mod queue; pub mod remote; diff --git a/src/listener/ehlo.rs b/src/listener/ehlo.rs index 39014b2..ee3d853 100644 --- a/src/listener/ehlo.rs +++ b/src/listener/ehlo.rs @@ -7,74 +7,72 @@ use crate::core::Session; impl Session { pub async fn handle_ehlo(&mut self, domain: String) -> Result<(), ()> { - if self.data.helo_domain.is_empty() || self.params.ehlo_multiple { - // Set EHLO domain + // Set EHLO domain + if domain != self.data.helo_domain { self.data.helo_domain = domain; // Eval mail parameters self.eval_mail_params().await; - - let mut response = EhloResponse::new(self.instance.hostname.as_str()); - response.capabilities = - EXT_ENHANCED_STATUS_CODES | EXT_8BIT_MIME | EXT_BINARY_MIME | EXT_SMTP_UTF8; - if self.params.starttls { - response.capabilities |= EXT_START_TLS; - } - if self.params.pipelining { - response.capabilities |= EXT_PIPELINING; - } - if self.params.chunking { - response.capabilities |= EXT_CHUNKING; - } - if self.can_expn().await { - response.capabilities |= EXT_EXPN; - } - if self.can_vrfy().await { - response.capabilities |= EXT_VRFY; - } - if self.params.requiretls { - response.capabilities |= EXT_REQUIRE_TLS; - } - if self.params.auth_mechanisms != 0 { - response.capabilities |= EXT_AUTH; - response.auth_mechanisms = self.params.auth_mechanisms; - } - if let Some(value) = &self.params.future_release { - response.capabilities |= EXT_FUTURE_RELEASE; - response.future_release_interval = value.as_secs(); - response.future_release_datetime = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0) - + value.as_secs(); - } - if let Some(value) = &self.params.deliver_by { - response.capabilities |= EXT_DELIVER_BY; - response.deliver_by = value.as_secs(); - } - if let Some(value) = &self.params.mt_priority { - response.capabilities |= EXT_MT_PRIORITY; - response.mt_priority = *value; - } - if let Some(value) = &self.params.size { - response.capabilities |= EXT_SIZE; - response.size = *value; - } - if let Some(value) = &self.params.no_soliciting { - response.capabilities |= EXT_NO_SOLICITING; - response.no_soliciting = if !value.is_empty() { - value.to_string().into() - } else { - None - }; - } - - // Generate response - let mut buf = Vec::with_capacity(64); - response.write(&mut buf).ok(); - self.write(&buf).await - } else { - self.write(b"503 5.5.1 Already said hello.\r\n").await } + + let mut response = EhloResponse::new(self.instance.hostname.as_str()); + response.capabilities = + EXT_ENHANCED_STATUS_CODES | EXT_8BIT_MIME | EXT_BINARY_MIME | EXT_SMTP_UTF8; + if self.params.starttls { + response.capabilities |= EXT_START_TLS; + } + if self.params.pipelining { + response.capabilities |= EXT_PIPELINING; + } + if self.params.chunking { + response.capabilities |= EXT_CHUNKING; + } + if self.params.expn { + response.capabilities |= EXT_EXPN; + } + if self.params.vrfy { + response.capabilities |= EXT_VRFY; + } + if self.params.requiretls { + response.capabilities |= EXT_REQUIRE_TLS; + } + if self.params.auth != 0 { + response.capabilities |= EXT_AUTH; + response.auth_mechanisms = self.params.auth; + } + if let Some(value) = &self.params.future_release { + response.capabilities |= EXT_FUTURE_RELEASE; + response.future_release_interval = value.as_secs(); + response.future_release_datetime = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) + + value.as_secs(); + } + if let Some(value) = &self.params.deliver_by { + response.capabilities |= EXT_DELIVER_BY; + response.deliver_by = value.as_secs(); + } + if let Some(value) = &self.params.mt_priority { + response.capabilities |= EXT_MT_PRIORITY; + response.mt_priority = *value; + } + if let Some(value) = &self.params.size { + response.capabilities |= EXT_SIZE; + response.size = *value; + } + if let Some(value) = &self.params.no_soliciting { + response.capabilities |= EXT_NO_SOLICITING; + response.no_soliciting = if !value.is_empty() { + value.to_string().into() + } else { + None + }; + } + + // Generate response + let mut buf = Vec::with_capacity(64); + response.write(&mut buf).ok(); + self.write(&buf).await } } diff --git a/src/listener/mail.rs b/src/listener/mail.rs index 0aef4ce..bd237c3 100644 --- a/src/listener/mail.rs +++ b/src/listener/mail.rs @@ -1,14 +1,10 @@ -use smtp_proto::Parameter; +use smtp_proto::MailFrom; use tokio::io::{AsyncRead, AsyncWrite}; use crate::core::{Session, SessionAddress}; impl Session { - pub async fn handle_mail_from( - &mut self, - from: String, - parameters: Vec>, - ) -> Result<(), ()> { + pub async fn handle_mail_from(&mut self, from: MailFrom) -> Result<(), ()> { if self.data.helo_domain.is_empty() && self.params.ehlo_require { return self .write(b"503 5.5.1 Polite people say EHLO first.\r\n") @@ -17,14 +13,12 @@ impl Session { return self .write(b"503 5.5.1 Multiple MAIL commands not allowed.\r\n") .await; - } else if self.params.auth_require && self.data.authenticated_as.is_empty() { - return self.write(b"530 5.7.0 Authentication required.\r\n").await; } - self.data.mail_from = if !from.is_empty() { - let address_lcase = from.to_lowercase(); + self.data.mail_from = if !from.address.is_empty() { + let address_lcase = from.address.to_lowercase(); SessionAddress { - address: from, + address: from.address, domain: address_lcase .rsplit_once('@') .map(|(_, d)| d) diff --git a/src/listener/rcpt.rs b/src/listener/rcpt.rs index 728685c..fb8e7d9 100644 --- a/src/listener/rcpt.rs +++ b/src/listener/rcpt.rs @@ -1,14 +1,10 @@ -use smtp_proto::Parameter; +use smtp_proto::RcptTo; use tokio::io::{AsyncRead, AsyncWrite}; use crate::core::{Session, SessionAddress}; impl Session { - pub async fn handle_rcpt_to( - &mut self, - to: String, - parameters: Vec>, - ) -> Result<(), ()> { + pub async fn handle_rcpt_to(&mut self, to: RcptTo) -> Result<(), ()> { if self.data.mail_from.is_none() { return self.write(b"503 5.5.1 MAIL is required first.\r\n").await; } else if self.data.rcpt_to.len() >= self.params.rcpt_max { @@ -16,7 +12,7 @@ impl Session { } // Build RCPT - let address_lcase = to.to_lowercase(); + let address_lcase = to.address.to_lowercase(); let rcpt = SessionAddress { domain: address_lcase .rsplit_once('@') @@ -24,7 +20,7 @@ impl Session { .unwrap_or_default() .to_string(), address_lcase, - address: to, + address: to.address, }; // Verify address @@ -63,7 +59,6 @@ impl Session { .is_allowed(&self.core.clone().config.rcpt.throttle) .await { - self.eval_data_params().await; self.write(b"250 2.1.5 OK\r\n").await } else { self.data.rcpt_to.pop(); diff --git a/src/listener/session.rs b/src/listener/session.rs index b666239..bacf8be 100644 --- a/src/listener/session.rs +++ b/src/listener/session.rs @@ -26,11 +26,11 @@ impl Session { State::Request(receiver) => loop { match receiver.ingest(&mut iter, bytes) { Ok(request) => match request { - Request::Rcpt { to, parameters } => { - self.handle_rcpt_to(to, parameters).await?; + Request::Rcpt { to } => { + self.handle_rcpt_to(to).await?; } - Request::Mail { from, parameters } => { - self.handle_mail_from(from, parameters).await?; + Request::Mail { from } => { + self.handle_mail_from(from).await?; } Request::Ehlo { host } => { if self.instance.protocol == ServerProtocol::Smtp { @@ -40,6 +40,7 @@ impl Session { } } Request::Data => { + self.eval_data_params().await; if self.can_send_data().await? { self.write(b"354 Start mail input; end with .\r\n") .await?; @@ -52,6 +53,7 @@ impl Session { chunk_size, is_last, } => { + self.eval_data_params().await; state = if chunk_size + self.data.message.len() < self.params.data_max_message_size { @@ -71,25 +73,24 @@ impl Session { mechanism, initial_response, } => { - if let Some(mut token) = SaslToken::from_mechanism( - mechanism & self.params.auth_mechanisms, - ) { - if self.data.authenticated_as.is_empty() { - if self - .handle_sasl_response( - &mut token, - initial_response.as_bytes(), - ) - .await? - { - state = State::Sasl(LineReceiver::new(token)); - continue 'outer; - } - } else { - self.write(b"503 5.5.1 Already authenticated.\r\n").await?; - } - } else if self.params.auth_mechanisms == 0 { + // TODO no plain auth over plaintext + if self.params.auth == 0 || self.params.auth_lookup.is_none() { self.write(b"503 5.5.1 AUTH not allowed.\r\n").await?; + } else if !self.data.authenticated_as.is_empty() { + self.write(b"503 5.5.1 Already authenticated.\r\n").await?; + } else if let Some(mut token) = + SaslToken::from_mechanism(mechanism & self.params.auth) + { + if self + .handle_sasl_response( + &mut token, + initial_response.as_bytes(), + ) + .await? + { + state = State::Sasl(LineReceiver::new(token)); + continue 'outer; + } } else { self.write( b"554 5.7.8 Authentication mechanism not supported.\r\n", @@ -132,7 +133,6 @@ impl Session { Request::Helo { host } => { if self.instance.protocol == ServerProtocol::Smtp && self.data.helo_domain.is_empty() - || self.params.ehlo_multiple { self.data.helo_domain = host; self.eval_mail_params().await; diff --git a/src/listener/spawn.rs b/src/listener/spawn.rs index c5fa203..edb1909 100644 --- a/src/listener/spawn.rs +++ b/src/listener/spawn.rs @@ -114,6 +114,7 @@ impl Server { if tls_implicit { if let Ok(mut session) = session.into_tls(tls_acceptor.unwrap()).await { if session.write(&instance.greeting).await.is_ok() { + session.eval_session_params().await; session.eval_ehlo_params().await; session.eval_auth_params().await; if !session.params.ehlo_require { @@ -123,6 +124,7 @@ impl Server { } } } else if session.write(&instance.greeting).await.is_ok() { + session.eval_session_params().await; session.eval_ehlo_params().await; session.eval_auth_params().await; if !session.params.ehlo_require { @@ -213,20 +215,17 @@ impl Session { mut shutdown_rx: watch::Receiver, ) -> Option<(Session, watch::Receiver)> { let mut buf = vec![0; 8192]; - let timeout = *self.core.config.timeout.eval(&self).await; - let max_bytes = *self.core.config.transfer_limit.eval(&self).await; - let mut bytes_received = 0; loop { tokio::select! { result = tokio::time::timeout( - timeout, + self.params.timeout, self.read(&mut buf)) => { match result { Ok(Ok(bytes_read)) => { if bytes_read > 0 { - bytes_received += bytes_read; - if Instant::now() < self.data.valid_until && bytes_received < max_bytes { + if Instant::now() < self.data.valid_until && bytes_read <= self.data.bytes_left { + self.data.bytes_left -= bytes_read; match self.ingest(&buf[..bytes_read]).await { Ok(true) => (), Ok(false) => { @@ -236,7 +235,7 @@ impl Session { break; } } - } else if bytes_received >= max_bytes { + } else if bytes_read > self.data.bytes_left { self .write(format!("451 4.7.28 {} Session exceeded transfer quota.\r\n", self.instance.hostname).as_bytes()) .await diff --git a/src/listener/vrfy.rs b/src/listener/vrfy.rs index def8a4e..8ce17a6 100644 --- a/src/listener/vrfy.rs +++ b/src/listener/vrfy.rs @@ -8,16 +8,12 @@ use std::fmt::Write; impl Session { pub async fn handle_vrfy(&mut self, address: String) -> Result<(), ()> { - if !self.can_vrfy().await { - return self.write(b"252 2.5.1 VRFY is disabled.\r\n").await; - } - - if let Some(address_lookup) = &self.params.rcpt_lookup_addresses { + if let Some(address_lookup) = &self.params.rcpt_lookup_vrfy { if let Some(result) = address_lookup .lookup(Item::Verify(address.to_lowercase())) .await { - return if let LookupResult::Values(values) = result { + if let LookupResult::Values(values) = result { let mut result = String::with_capacity(32); for (pos, value) in values.iter().enumerate() { let _ = write!( @@ -30,24 +26,23 @@ impl Session { self.write(result.as_bytes()).await } else { self.write(b"550 5.1.2 Address not found.\r\n").await - }; + } + } else { + self.write(b"252 2.4.3 Unable to verify address at this time.\r\n") + .await } + } else { + self.write(b"252 2.5.1 VRFY is disabled.\r\n").await } - self.write(b"252 2.4.3 Unable to verify address at this time.\r\n") - .await } pub async fn handle_expn(&mut self, address: String) -> Result<(), ()> { - if !self.can_vrfy().await { - return self.write(b"252 2.5.1 EXPN is disabled.\r\n").await; - } - - if let Some(address_lookup) = &self.params.rcpt_lookup_addresses { + if let Some(address_lookup) = &self.params.rcpt_lookup_expn { if let Some(result) = address_lookup .lookup(Item::Expand(address.to_lowercase())) .await { - return if let LookupResult::Values(values) = result { + if let LookupResult::Values(values) = result { let mut result = String::with_capacity(32); for (pos, value) in values.iter().enumerate() { let _ = write!( @@ -60,20 +55,13 @@ impl Session { self.write(result.as_bytes()).await } else { self.write(b"550 5.1.2 Mailing list not found.\r\n").await - }; + } + } else { + self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n") + .await } + } else { + self.write(b"252 2.5.1 EXPN is disabled.\r\n").await } - self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n") - .await - } - - #[inline(always)] - pub async fn can_expn(&self) -> bool { - *self.core.config.rcpt.vrfy.eval(self).await - } - - #[inline(always)] - pub async fn can_vrfy(&self) -> bool { - *self.core.config.rcpt.expn.eval(self).await } } diff --git a/src/main.rs b/src/main.rs index 535155d..98fc65c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,9 @@ use smtp_server::{ throttle::{ConcurrencyLimiter, ThrottleKeyHasherBuilder}, Core, }, + queue, }; -use tokio::sync::watch; +use tokio::sync::{mpsc, watch}; #[tokio::main] async fn main() -> std::io::Result<()> { @@ -27,6 +28,12 @@ async fn main() -> std::io::Result<()> { let session_config = config .parse_session_config(&config_context) .failed("Configuration error"); + let queue = config + .parse_queue(&config_context) + .failed("Configuration error"); + + // Build core + let (queue_tx, queue_rx) = mpsc::channel(1024); let core = Arc::new(Core { config: session_config, concurrency: ConcurrencyLimiter::new( @@ -46,6 +53,7 @@ async fn main() -> std::io::Result<()> { .failed("Failed to parse throttle map shard amount") .unwrap_or(32), ), + queue_tx, }); // Enable logging @@ -60,12 +68,17 @@ async fn main() -> std::io::Result<()> { .finish(), ) .failed("Failed to set subscriber"); - - // Spawn listeners tracing::info!( "Starting Stalwart SMTP server v{}...", env!("CARGO_PKG_VERSION") ); + + // Spawn queue manager + queue + .spawn(queue_rx) + .failed("Failed to spawn queue manager"); + + // Spawn listeners let (shutdown_tx, shutdown_rx) = watch::channel(false); for server in config_context.servers { server @@ -105,6 +118,7 @@ async fn main() -> std::io::Result<()> { // Stop services shutdown_tx.send(true).ok(); + core.queue_tx.send(queue::Event::Stop).await.ok(); // Wait for services to finish tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/src/queue/manager.rs b/src/queue/manager.rs new file mode 100644 index 0000000..d4f0249 --- /dev/null +++ b/src/queue/manager.rs @@ -0,0 +1,34 @@ +use std::{ + collections::{BinaryHeap, VecDeque}, + sync::Arc, +}; + +use ahash::{AHashMap, HashMap}; +use dashmap::DashMap; +use tokio::sync::{mpsc, watch}; + +use crate::{ + config::Queue, + core::{ + throttle::{RateLimiter, ThrottleKey, ThrottleKeyHasherBuilder}, + Core, + }, +}; + +use super::{Event, Message, QueueItem}; + +struct ThrottleQueue { + pub limiter: RateLimiter, + pub concurrent: usize, + pub queue: VecDeque>, +} + +impl Queue { + pub fn spawn(self, queue_rx: mpsc::Receiver) -> Result<(), String> { + let mut queue: BinaryHeap = BinaryHeap::new(); + let mut throttle: DashMap = + DashMap::with_capacity_and_hasher(100, ThrottleKeyHasherBuilder::default()); + + todo!() + } +} diff --git a/src/queue/message.rs b/src/queue/message.rs new file mode 100644 index 0000000..3156db7 --- /dev/null +++ b/src/queue/message.rs @@ -0,0 +1,21 @@ +use super::QueueItem; + +impl Ord for QueueItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.due.cmp(&self.due) + } +} + +impl PartialOrd for QueueItem { + fn partial_cmp(&self, other: &Self) -> Option { + other.due.partial_cmp(&self.due) + } +} + +impl PartialEq for QueueItem { + fn eq(&self, other: &Self) -> bool { + self.due == other.due + } +} + +impl Eq for QueueItem {} diff --git a/src/queue/mod.rs b/src/queue/mod.rs new file mode 100644 index 0000000..e75c726 --- /dev/null +++ b/src/queue/mod.rs @@ -0,0 +1,48 @@ +use std::time::Instant; + +pub mod manager; +pub mod message; + +pub enum Event { + Start, + Stop, +} + +pub struct QueueItem { + pub due: u64, + pub message: Box, +} + +pub struct Message { + pub id: u64, + pub created: u64, + + pub return_path: String, + pub recipients: Vec, + + pub flags: u64, + pub priority: i64, + pub size: usize, + + pub notify: Action, +} + +pub struct Recipient { + pub address: String, + pub domain: String, + pub status: Status, + pub flags: u64, + pub retry: Action, +} + +pub struct Action { + pub due_at: Instant, + pub count: u32, +} + +pub enum Status { + None, + Delivered, + TemporaryFailure, + PermanentFailure, +} diff --git a/src/remote/smtp.rs b/src/remote/smtp.rs index d4dbe6f..50bfba8 100644 --- a/src/remote/smtp.rs +++ b/src/remote/smtp.rs @@ -84,7 +84,7 @@ pub async fn lookup_smtp( .into(), true, ), - 550 | 551 | 500 | 502 => (LookupResult::False, true), + 550 | 551 | 553 | 500 | 502 => (LookupResult::False, true), _ => { return Err(mail_send::Error::UnexpectedReply(reply)); }