Queue draft impl.

This commit is contained in:
Mauro D 2022-12-27 18:11:50 +00:00
parent 635ac09add
commit 76f83fa280
25 changed files with 570 additions and 259 deletions

8
Cargo.lock generated
View file

@ -1382,18 +1382,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.151"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0"
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.151"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8"
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
dependencies = [
"proc-macro2",
"quote",

View file

@ -41,6 +41,10 @@ log-level = "trace"
concurrency = 1024
throttle-map = {shard = 32, capacity = 10}
[global.spool]
path = "/var/spool/queue"
hash = 123
[session]
timeout = "5m"
transfer-limit = 5000000
@ -57,9 +61,12 @@ rate = "3/1m"
[session.ehlo]
require = true
#script = ehlo.sieve
multiple = true
[session.ehlo.capabilities]
auth = [
{ if = "listener", eq = "submission", then = ["plain", "login"]},
{ else = false }
]
pipelining = true
chunking = true
requiretls = true
@ -71,19 +78,12 @@ future-release = [
]
deliver-by = false
mt-priority = false
size = 500000
expn = false
vrfy = true
expn = true
[session.auth]
enable = [
{ if = "listener", eq = "submission", then = ["plain", "login"]},
{ else = [] }
]
require = [
{ if = "listener", eq = "submission", then = true},
{ else = false }
]
lookup = "lmtp"
lookup = [ { if = "listener", eq = "submission", then = "local-addresses" },
{ else = false } ]
[session.auth.errors]
total = 3
@ -100,15 +100,17 @@ rate = "10/1m"
[session.rcpt]
#script = rcpt-to.sieve
relay = [ { if = "authenticated-as", ne = "", then = true},
{ else = false} ]
relay = [ { if = "authenticated-as", ne = "", then = true },
{ else = false } ]
max-recipients = 100
vrfy = true
expn = true
[session.rcpt.lookup]
domains = "local-domains"
addresses = "local-addresses"
vrfy = [ { if = "authenticated-as", ne = "", then = "local-addresses" },
{ else = false } ]
expn = [ { if = "authenticated-as", ne = "", then = "local-addresses" },
{ else = false } ]
[session.rcpt.errors]
total = 3
@ -129,7 +131,7 @@ received-headers = 50
mime-parts = 50
nested-messages = 3
[state.data.add-headers]
[session.data.add-headers]
received = true
received-spf = true
return-path = true
@ -177,7 +179,6 @@ set-body-length = false
address = 192.168.0.1
port = 25
protocol = "lmtp"
#tls = "optional, require, dane, dane-fallback-require, dane-require
[remote."lmtp".auth]
username = "hello"
@ -192,18 +193,21 @@ implicit = true
allow-invalid-certs = true
[queue]
retry = [0, 1, 15, 60, 90]
notify = [9, 10]
prefer = ipv6
retry = ["0m", "2m", "5m", "10m", "15m", "30m", "1h", "2h"]
notify = ["1d", "3d"]
source-ips = ["192.168.0.2", "162.168.0.1"]
#tls = optional, require, dane, dane-fallback-require, dane-require
relay-host = "lmtp"
next-hop = "lmtp"
[queue.limits]
attempts = 100
time = 3600
queued-messages = 10000
queue-size = 1000000
lifetime = "5d"
[[queue.size]]
match = {if = "remote-ip", eq = "127.0.0.1"}
key = [""]
messages = 10000
bytes = 1000000
[[queue.throttle]]
rate = "1/60s"
@ -224,9 +228,7 @@ a = 1000
mx = 9393
txt = 3233
[general.spool]
path = "/var/spool/queue"
hash = 123
[scripts]

View file

@ -25,5 +25,5 @@ if = "sender-domain"
starts-with = "example"
[[rule."expanded".all-of]]
if = "mx"
if = "sender"
in-list = "test-list"

View file

@ -1,5 +1,5 @@
[[throttle]]
if = {if = "remote-ip", eq = "127.0.0.1"}
match = {if = "remote-ip", eq = "127.0.0.1"}
key = ["remote-ip", "authenticated-as"]
concurrency = 100
rate = "50/30s"

View file

@ -141,7 +141,6 @@ impl Config {
| EnvelopeKey::Sender
| EnvelopeKey::SenderDomain
| EnvelopeKey::AuthenticatedAs
| EnvelopeKey::Mx
| EnvelopeKey::LocalIp
| EnvelopeKey::RemoteIp,
_,
@ -239,7 +238,6 @@ impl Config {
EnvelopeKey::SenderDomain,
EnvelopeKey::AuthenticatedAs,
EnvelopeKey::Listener,
EnvelopeKey::Mx,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::Priority,
@ -368,7 +366,7 @@ mod tests {
},
Condition::JumpIfFalse { positions: 1 },
Condition::Match {
key: EnvelopeKey::Mx,
key: EnvelopeKey::Sender,
op: ConditionOp::Equal,
value: ConditionValue::List(list),
not: false,

View file

@ -270,7 +270,6 @@ mod tests {
EnvelopeKey::SenderDomain,
EnvelopeKey::AuthenticatedAs,
EnvelopeKey::Listener,
EnvelopeKey::Mx,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::Priority,

View file

@ -3,6 +3,7 @@ pub mod condition;
pub mod if_block;
pub mod list;
pub mod parser;
pub mod queue;
pub mod remote;
pub mod resolver;
pub mod server;
@ -12,12 +13,14 @@ pub mod utils;
use std::{
collections::BTreeMap,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::PathBuf,
sync::Arc,
time::Duration,
};
use ahash::{AHashMap, AHashSet};
use mail_send::Credentials;
use regex::Regex;
use rustls::ServerConfig;
use smtp_proto::MtPriority;
@ -79,9 +82,6 @@ impl Default for List {
}
}
#[derive(Debug, Default)]
pub struct Queue {}
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
pub enum ServerProtocol {
#[default]
@ -168,7 +168,6 @@ pub enum EnvelopeKey {
HeloDomain,
AuthenticatedAs,
Listener,
Mx,
RemoteIp,
LocalIp,
Priority,
@ -234,9 +233,9 @@ pub struct Connect {
pub struct Ehlo {
pub script: IfBlock<Option<Arc<Script>>>,
pub require: IfBlock<bool>,
pub multiple: IfBlock<bool>,
// Capabilities
pub auth: IfBlock<u64>,
pub pipelining: IfBlock<bool>,
pub chunking: IfBlock<bool>,
pub requiretls: IfBlock<bool>,
@ -245,13 +244,13 @@ pub struct Ehlo {
pub deliver_by: IfBlock<Option<Duration>>,
pub mt_priority: IfBlock<Option<MtPriority>>,
pub size: IfBlock<Option<usize>>,
pub expn: IfBlock<bool>,
pub vrfy: IfBlock<bool>,
}
pub struct Auth {
pub script: IfBlock<Option<Arc<Script>>>,
pub require: IfBlock<bool>,
pub lookup: IfBlock<Option<Arc<List>>>,
pub mechanisms: IfBlock<u64>,
pub errors_max: IfBlock<usize>,
pub errors_wait: IfBlock<Duration>,
}
@ -264,10 +263,10 @@ pub struct Mail {
pub struct Rcpt {
pub script: IfBlock<Option<Arc<Script>>>,
pub relay: IfBlock<bool>,
pub expn: IfBlock<bool>,
pub vrfy: IfBlock<bool>,
pub lookup_domains: IfBlock<Option<Arc<List>>>,
pub lookup_addresses: IfBlock<Option<Arc<List>>>,
pub lookup_expn: IfBlock<Option<Arc<List>>>,
pub lookup_vrfy: IfBlock<Option<Arc<List>>>,
// Errors
pub errors_max: IfBlock<usize>,
@ -312,6 +311,35 @@ pub struct SessionConfig {
pub data: Data,
}
pub struct RelayHost {
pub address: String,
pub port: u16,
pub protocol: ServerProtocol,
pub auth: Option<Credentials<String>>,
pub tls_implicit: bool,
pub tls_allow_invalid_certs: bool,
}
pub struct Queue {
pub path: PathBuf,
pub hash: usize,
pub retry: IfBlock<Vec<Duration>>,
pub notify: IfBlock<Vec<Duration>>,
pub source_ips: IfBlock<Vec<IpAddr>>,
pub relay_host: IfBlock<Option<RelayHost>>,
pub tls: IfBlock<bool>,
// Limits
pub attempts_max: IfBlock<usize>,
pub lifetime_max: IfBlock<Duration>,
pub messages_max: IfBlock<usize>,
pub size_max: IfBlock<usize>,
// Throttle
pub throttle: Vec<Throttle>,
}
pub enum AuthLevel {
Enable,
Disable,
@ -329,7 +357,6 @@ pub struct ConfigContext {
pub hosts: AHashMap<String, Host>,
pub scripts: AHashMap<String, Arc<Script>>,
pub lists: AHashMap<String, Arc<List>>,
pub queues: AHashMap<String, Arc<Queue>>,
}
pub type Result<T> = std::result::Result<T, String>;

154
src/config/queue.rs Normal file
View file

@ -0,0 +1,154 @@
use std::{fs, time::Duration};
use mail_send::Credentials;
use super::{utils::ParseValue, *};
impl Config {
pub fn parse_queue(&self, ctx: &ConfigContext) -> super::Result<Queue> {
let available_envelope_keys = [
EnvelopeKey::Recipient,
EnvelopeKey::RecipientDomain,
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::Priority,
];
let available_throttle_keys =
THROTTLE_RCPT_DOMAIN | THROTTLE_MX | THROTTLE_REMOTE_IP | THROTTLE_LOCAL_IP;
let relay_host = self
.parse_if_block::<Option<String>>("queue.relay-host", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(None));
let path = self.property_require::<PathBuf>("global.spool.path")?;
if !path.exists() {
fs::create_dir(&path)
.map_err(|err| format!("Failed to create spool directory {:?}: {}", path, err))?;
}
Ok(Queue {
path,
hash: self.property("global.spool.hash")?.unwrap_or(32),
retry: self
.parse_if_block("queue.retry", ctx, &available_envelope_keys)?
.unwrap_or_else(|| {
IfBlock::new(vec![
Duration::from_secs(0),
Duration::from_secs(2 * 60),
Duration::from_secs(5 * 60),
Duration::from_secs(10 * 60),
Duration::from_secs(15 * 60),
Duration::from_secs(30 * 60),
Duration::from_secs(3600),
Duration::from_secs(2 * 3600),
])
}),
notify: self
.parse_if_block(
"queue.notify",
ctx,
&[
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::Priority,
],
)?
.unwrap_or_else(|| {
IfBlock::new(vec![
Duration::from_secs(86400),
Duration::from_secs(2 * 86400),
])
}),
source_ips: self
.parse_if_block("queue.source-ips", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(Vec::new())),
relay_host: IfBlock {
if_then: {
let mut if_then = Vec::with_capacity(relay_host.if_then.len());
for i in relay_host.if_then {
if_then.push(IfThen {
conditions: i.conditions,
then: if let Some(then) = i.then {
Some(
ctx.hosts
.get(&then)
.ok_or_else(|| {
format!(
"Relay host {:?} not found for property \"queue.relay-host\".",
then
)
})?
.into(),
)
} else {
None
},
});
}
if_then
},
default: if let Some(default) = relay_host.default {
Some(
ctx.hosts
.get(&default)
.ok_or_else(|| {
format!(
"Relay host {:?} not found for property \"queue.relay-host\".",
default
)
})?
.into(),
)
} else {
None
},
},
tls: self
.parse_if_block("queue.tls", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
attempts_max: self
.parse_if_block("queue.limits.attempts", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(100)),
lifetime_max: self
.parse_if_block("queue.limits.lifetime", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(Duration::from_secs(5 * 86400))),
messages_max: self
.parse_if_block("queue.limits.messages", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(1024 * 1024)),
size_max: self
.parse_if_block("queue.limits.size", ctx, &available_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(1024 * 1024 * 1024)),
throttle: self.parse_throttle(
"queue.throttle",
ctx,
&available_envelope_keys,
available_throttle_keys,
)?,
})
}
}
impl From<&Host> for RelayHost {
fn from(host: &Host) -> Self {
RelayHost {
address: host.address.to_string(),
port: host.port,
protocol: host.protocol,
auth: if let (Some(username), Some(secret)) = (&host.username, &host.secret) {
Credentials::new(username.to_string(), secret.to_string()).into()
} else {
None
},
tls_implicit: host.tls_implicit,
tls_allow_invalid_certs: host.tls_allow_invalid_certs,
}
}
}
impl ParseValue for PathBuf {
fn parse_value(_key: impl utils::AsKey, value: &str) -> super::Result<Self> {
Ok(PathBuf::from(value))
}
}

View file

@ -62,6 +62,14 @@ impl Config {
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
];
let mechanisms = self
.parse_if_block::<Vec<Mechanism>>(
"session.ehlo.capabilities.auth",
ctx,
&available_keys,
)?
.unwrap_or_default();
Ok(Ehlo {
script: self
.parse_if_block::<Option<String>>("session.ehlo.script", ctx, &available_keys)?
@ -70,9 +78,6 @@ impl Config {
require: self
.parse_if_block("session.ehlo.require", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
multiple: self
.parse_if_block("session.ehlo.multiple", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
pipelining: self
.parse_if_block("session.ehlo.capabilities.pipelining", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
@ -109,32 +114,13 @@ impl Config {
size: self
.parse_if_block("session.ehlo.capabilities.size", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(Some(25 * 1024 * 1024))),
})
}
fn parse_session_auth(&self, ctx: &ConfigContext) -> super::Result<Auth> {
let available_keys = [
EnvelopeKey::Listener,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::HeloDomain,
];
let mechanisms = self
.parse_if_block::<Vec<Mechanism>>("session.auth.enable", ctx, &available_keys)?
.unwrap_or_default();
Ok(Auth {
script: self
.parse_if_block::<Option<String>>("session.auth.script", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.scripts, "session.auth.script", "script")?,
require: self
.parse_if_block("session.auth.require", ctx, &available_keys)?
.unwrap_or_default(),
lookup: self
.parse_if_block::<Option<String>>("session.auth.lookup", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lists, "session.auth.lookup", "lookup list")?,
mechanisms: IfBlock {
expn: self
.parse_if_block("session.ehlo.capabilities.expn", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(false)),
vrfy: self
.parse_if_block("session.ehlo.capabilities.vrfy", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(false)),
auth: IfBlock {
if_then: mechanisms
.if_then
.into_iter()
@ -148,6 +134,26 @@ impl Config {
.into_iter()
.fold(0, |acc, m| acc | m.mechanism),
},
})
}
fn parse_session_auth(&self, ctx: &ConfigContext) -> super::Result<Auth> {
let available_keys = [
EnvelopeKey::Listener,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::HeloDomain,
];
Ok(Auth {
script: self
.parse_if_block::<Option<String>>("session.auth.script", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.scripts, "session.auth.script", "script")?,
lookup: self
.parse_if_block::<Option<String>>("session.auth.lookup", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lists, "session.auth.lookup", "lookup list")?,
errors_max: self
.parse_if_block("session.auth.errors.max", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(3)),
@ -203,12 +209,7 @@ impl Config {
relay: self
.parse_if_block("session.rcpt.relay", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(false)),
expn: self
.parse_if_block("session.rcpt.expn", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(false)),
vrfy: self
.parse_if_block("session.rcpt.vrfy", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(false)),
lookup_domains: self
.parse_if_block::<Option<String>>(
"session.rcpt.lookup.domains",
@ -225,6 +226,14 @@ impl Config {
)?
.unwrap_or_default()
.map_if_block(&ctx.lists, "session.rcpt.lookup.addresses", "lookup list")?,
lookup_expn: self
.parse_if_block::<Option<String>>("session.rcpt.lookup.expn", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lists, "session.rcpt.lookup.expn", "lookup list")?,
lookup_vrfy: self
.parse_if_block::<Option<String>>("session.rcpt.lookup.vrfy", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lists, "session.rcpt.lookup.vrfy", "lookup list")?,
errors_max: self
.parse_if_block("session.rcpt.errors.max", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(10)),

View file

@ -137,7 +137,6 @@ impl ParseValue for EnvelopeKey {
"sender" => EnvelopeKey::Sender,
"sender-domain" => EnvelopeKey::SenderDomain,
"listener" => EnvelopeKey::Listener,
"mx" => EnvelopeKey::Mx,
"remote-ip" => EnvelopeKey::RemoteIp,
"local-ip" => EnvelopeKey::LocalIp,
"priority" => EnvelopeKey::Priority,
@ -178,7 +177,6 @@ mod tests {
EnvelopeKey::SenderDomain,
EnvelopeKey::AuthenticatedAs,
EnvelopeKey::Listener,
EnvelopeKey::Mx,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::Priority,

View file

@ -13,12 +13,16 @@ use smtp_proto::{
},
MtPriority,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc,
};
use tracing::Span;
use crate::{
config::{EnvelopeKey, List, Script, ServerProtocol, SessionConfig},
listener::auth::SaslToken,
queue,
};
use self::throttle::{
@ -33,6 +37,7 @@ pub struct Core {
pub config: SessionConfig,
pub concurrency: ConcurrencyLimiter,
pub throttle: DashMap<ThrottleKey, Limiter, ThrottleKeyHasherBuilder>,
pub queue_tx: mpsc::Sender<queue::Event>,
}
pub enum State {
@ -75,7 +80,9 @@ pub struct SessionData {
pub authenticated_as: String,
pub auth_errors: usize,
pub priority: i16,
pub valid_until: Instant,
pub bytes_left: usize,
pub messages_sent: usize,
}
@ -87,27 +94,30 @@ pub struct SessionAddress {
#[derive(Debug, Default)]
pub struct SessionParameters {
// Global parameters
pub timeout: Duration,
// Ehlo parameters
pub ehlo_script: Option<Arc<Script>>,
pub ehlo_require: bool,
pub ehlo_multiple: bool,
// Supported capabilities
pub pipelining: bool,
pub chunking: bool,
pub requiretls: bool,
pub starttls: bool,
pub expn: bool,
pub vrfy: bool,
pub no_soliciting: Option<String>,
pub future_release: Option<Duration>,
pub deliver_by: Option<Duration>,
pub mt_priority: Option<MtPriority>,
pub size: Option<usize>,
pub auth: u64,
// Auth parameters
pub auth_script: Option<Arc<Script>>,
pub auth_require: bool,
pub auth_lookup: Option<Arc<List>>,
pub auth_mechanisms: u64,
pub auth_errors_max: usize,
pub auth_errors_wait: Duration,
@ -122,6 +132,8 @@ pub struct SessionParameters {
pub rcpt_max: usize,
pub rcpt_lookup_domain: Option<Arc<List>>,
pub rcpt_lookup_addresses: Option<Arc<List>>,
pub rcpt_lookup_expn: Option<Arc<List>>,
pub rcpt_lookup_vrfy: Option<Arc<List>>,
// Data parameters
pub data_script: Option<Arc<Script>>,
@ -153,6 +165,7 @@ impl SessionData {
message: Vec::with_capacity(0),
auth_errors: 0,
messages_sent: 0,
bytes_left: 0,
}
}
}
@ -184,7 +197,6 @@ pub trait Envelope {
EnvelopeKey::Sender => self.sender().into(),
EnvelopeKey::SenderDomain => self.sender_domain().into(),
EnvelopeKey::AuthenticatedAs => self.authenticated_as().into(),
EnvelopeKey::Mx => self.mx().into(),
EnvelopeKey::HeloDomain => self.helo_domain().into(),
EnvelopeKey::Listener => self.listener_id().to_string().into(),
EnvelopeKey::RemoteIp => self.remote_ip().to_string().into(),

View file

@ -3,13 +3,16 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::{Session, SessionParameters};
impl<T: AsyncRead + AsyncWrite> Session<T> {
pub async fn eval_ehlo_params(&mut self) {
pub async fn eval_session_params(&mut self) {
self.params.timeout = *self.core.config.timeout.eval(self).await;
self.data.bytes_left = *self.core.config.transfer_limit.eval(self).await;
self.data.valid_until += *self.core.config.duration.eval(self).await;
}
// Ehlo parameter
pub async fn eval_ehlo_params(&mut self) {
// Ehlo parameters
self.params.ehlo_script = self.core.config.ehlo.script.eval(self).await.clone();
self.params.ehlo_require = *self.core.config.ehlo.require.eval(self).await;
self.params.ehlo_multiple = *self.core.config.ehlo.multiple.eval(self).await;
// Capabilities
self.params.pipelining = *self.core.config.ehlo.pipelining.eval(self).await;
@ -20,14 +23,15 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
self.params.deliver_by = *self.core.config.ehlo.deliver_by.eval(self).await;
self.params.mt_priority = *self.core.config.ehlo.mt_priority.eval(self).await;
self.params.size = *self.core.config.ehlo.size.eval(self).await;
self.params.auth = *self.core.config.ehlo.auth.eval(self).await;
self.params.expn = *self.core.config.ehlo.expn.eval(self).await;
self.params.vrfy = *self.core.config.ehlo.vrfy.eval(self).await;
}
pub async fn eval_auth_params(&mut self) {
// Auth parameters
self.params.auth_script = self.core.config.auth.script.eval(self).await.clone();
self.params.auth_require = *self.core.config.auth.require.eval(self).await;
self.params.auth_lookup = self.core.config.auth.lookup.eval(self).await.clone();
self.params.auth_mechanisms = *self.core.config.auth.mechanisms.eval(self).await;
self.params.auth_errors_max = *self.core.config.auth.errors_max.eval(self).await;
self.params.auth_errors_wait = *self.core.config.auth.errors_wait.eval(self).await;
}
@ -42,6 +46,24 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
self.params.rcpt_errors_max = *self.core.config.rcpt.errors_max.eval(self).await;
self.params.rcpt_errors_wait = *self.core.config.rcpt.errors_wait.eval(self).await;
self.params.rcpt_max = *self.core.config.rcpt.max_recipients.eval(self).await;
self.params.rcpt_lookup_domain = self
.core
.config
.rcpt
.lookup_domains
.eval(self)
.await
.clone();
self.params.rcpt_lookup_addresses = self
.core
.config
.rcpt
.lookup_addresses
.eval(self)
.await
.clone();
self.params.rcpt_lookup_expn = self.core.config.rcpt.lookup_expn.eval(self).await.clone();
self.params.rcpt_lookup_vrfy = self.core.config.rcpt.lookup_vrfy.eval(self).await.clone();
}
pub async fn eval_data_params(&mut self) {

View file

@ -1,5 +1,4 @@
use dashmap::mapref::entry::Entry;
use parking_lot::Mutex;
use tokio::io::{AsyncRead, AsyncWrite};
use std::{
@ -26,7 +25,7 @@ pub struct Limiter {
pub struct RateLimiter {
max_requests: f64,
max_interval: f64,
limiter: Arc<Mutex<(Instant, f64)>>,
limiter: (Instant, f64),
}
#[derive(Debug)]
@ -50,29 +49,28 @@ impl RateLimiter {
RateLimiter {
max_requests: max_requests as f64,
max_interval: max_interval as f64,
limiter: Arc::new(Mutex::new((Instant::now(), max_requests as f64))),
limiter: (Instant::now(), max_requests as f64),
}
}
pub fn is_allowed(&self) -> bool {
pub fn is_allowed(&mut self) -> bool {
// Check rate limit
let mut limiter = self.limiter.lock();
let elapsed = limiter.0.elapsed().as_secs_f64();
limiter.0 = Instant::now();
limiter.1 += elapsed * (self.max_requests / self.max_interval);
if limiter.1 > self.max_requests {
limiter.1 = self.max_requests;
let elapsed = self.limiter.0.elapsed().as_secs_f64();
self.limiter.1 += elapsed * (self.max_requests / self.max_interval);
if self.limiter.1 > self.max_requests {
self.limiter.1 = self.max_requests;
}
if limiter.1 >= 1.0 {
limiter.1 -= 1.0;
if self.limiter.1 >= 1.0 {
self.limiter.0 = Instant::now();
self.limiter.1 -= 1.0;
true
} else {
false
}
}
pub fn reset(&self) {
*self.limiter.lock() = (Instant::now(), self.max_requests);
pub fn reset(&mut self) {
self.limiter = (Instant::now(), self.max_requests);
}
}
@ -208,8 +206,8 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
if t.conditions.conditions.is_empty() || t.conditions.eval(self).await {
// Build throttle key
match self.core.throttle.entry(ThrottleKey::new(self, t)) {
Entry::Occupied(e) => {
let limiter = e.get();
Entry::Occupied(mut e) => {
let limiter = e.get_mut();
if let Some(limiter) = &limiter.concurrency {
if let Some(inflight) = limiter.is_allowed() {
self.in_flight.push(inflight);
@ -224,7 +222,7 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
return false;
}
}
if let Some(limiter) = &limiter.rate {
if let Some(limiter) = &mut limiter.rate {
if !limiter.is_allowed() {
tracing::info!(
parent: &self.span,

View file

@ -1,4 +1,5 @@
pub mod config;
pub mod core;
pub mod listener;
pub mod queue;
pub mod remote;

View file

@ -7,74 +7,72 @@ use crate::core::Session;
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn handle_ehlo(&mut self, domain: String) -> Result<(), ()> {
if self.data.helo_domain.is_empty() || self.params.ehlo_multiple {
// Set EHLO domain
// Set EHLO domain
if domain != self.data.helo_domain {
self.data.helo_domain = domain;
// Eval mail parameters
self.eval_mail_params().await;
let mut response = EhloResponse::new(self.instance.hostname.as_str());
response.capabilities =
EXT_ENHANCED_STATUS_CODES | EXT_8BIT_MIME | EXT_BINARY_MIME | EXT_SMTP_UTF8;
if self.params.starttls {
response.capabilities |= EXT_START_TLS;
}
if self.params.pipelining {
response.capabilities |= EXT_PIPELINING;
}
if self.params.chunking {
response.capabilities |= EXT_CHUNKING;
}
if self.can_expn().await {
response.capabilities |= EXT_EXPN;
}
if self.can_vrfy().await {
response.capabilities |= EXT_VRFY;
}
if self.params.requiretls {
response.capabilities |= EXT_REQUIRE_TLS;
}
if self.params.auth_mechanisms != 0 {
response.capabilities |= EXT_AUTH;
response.auth_mechanisms = self.params.auth_mechanisms;
}
if let Some(value) = &self.params.future_release {
response.capabilities |= EXT_FUTURE_RELEASE;
response.future_release_interval = value.as_secs();
response.future_release_datetime = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
+ value.as_secs();
}
if let Some(value) = &self.params.deliver_by {
response.capabilities |= EXT_DELIVER_BY;
response.deliver_by = value.as_secs();
}
if let Some(value) = &self.params.mt_priority {
response.capabilities |= EXT_MT_PRIORITY;
response.mt_priority = *value;
}
if let Some(value) = &self.params.size {
response.capabilities |= EXT_SIZE;
response.size = *value;
}
if let Some(value) = &self.params.no_soliciting {
response.capabilities |= EXT_NO_SOLICITING;
response.no_soliciting = if !value.is_empty() {
value.to_string().into()
} else {
None
};
}
// Generate response
let mut buf = Vec::with_capacity(64);
response.write(&mut buf).ok();
self.write(&buf).await
} else {
self.write(b"503 5.5.1 Already said hello.\r\n").await
}
let mut response = EhloResponse::new(self.instance.hostname.as_str());
response.capabilities =
EXT_ENHANCED_STATUS_CODES | EXT_8BIT_MIME | EXT_BINARY_MIME | EXT_SMTP_UTF8;
if self.params.starttls {
response.capabilities |= EXT_START_TLS;
}
if self.params.pipelining {
response.capabilities |= EXT_PIPELINING;
}
if self.params.chunking {
response.capabilities |= EXT_CHUNKING;
}
if self.params.expn {
response.capabilities |= EXT_EXPN;
}
if self.params.vrfy {
response.capabilities |= EXT_VRFY;
}
if self.params.requiretls {
response.capabilities |= EXT_REQUIRE_TLS;
}
if self.params.auth != 0 {
response.capabilities |= EXT_AUTH;
response.auth_mechanisms = self.params.auth;
}
if let Some(value) = &self.params.future_release {
response.capabilities |= EXT_FUTURE_RELEASE;
response.future_release_interval = value.as_secs();
response.future_release_datetime = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
+ value.as_secs();
}
if let Some(value) = &self.params.deliver_by {
response.capabilities |= EXT_DELIVER_BY;
response.deliver_by = value.as_secs();
}
if let Some(value) = &self.params.mt_priority {
response.capabilities |= EXT_MT_PRIORITY;
response.mt_priority = *value;
}
if let Some(value) = &self.params.size {
response.capabilities |= EXT_SIZE;
response.size = *value;
}
if let Some(value) = &self.params.no_soliciting {
response.capabilities |= EXT_NO_SOLICITING;
response.no_soliciting = if !value.is_empty() {
value.to_string().into()
} else {
None
};
}
// Generate response
let mut buf = Vec::with_capacity(64);
response.write(&mut buf).ok();
self.write(&buf).await
}
}

View file

@ -1,14 +1,10 @@
use smtp_proto::Parameter;
use smtp_proto::MailFrom;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::core::{Session, SessionAddress};
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn handle_mail_from(
&mut self,
from: String,
parameters: Vec<Parameter<String>>,
) -> Result<(), ()> {
pub async fn handle_mail_from(&mut self, from: MailFrom<String>) -> Result<(), ()> {
if self.data.helo_domain.is_empty() && self.params.ehlo_require {
return self
.write(b"503 5.5.1 Polite people say EHLO first.\r\n")
@ -17,14 +13,12 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
return self
.write(b"503 5.5.1 Multiple MAIL commands not allowed.\r\n")
.await;
} else if self.params.auth_require && self.data.authenticated_as.is_empty() {
return self.write(b"530 5.7.0 Authentication required.\r\n").await;
}
self.data.mail_from = if !from.is_empty() {
let address_lcase = from.to_lowercase();
self.data.mail_from = if !from.address.is_empty() {
let address_lcase = from.address.to_lowercase();
SessionAddress {
address: from,
address: from.address,
domain: address_lcase
.rsplit_once('@')
.map(|(_, d)| d)

View file

@ -1,14 +1,10 @@
use smtp_proto::Parameter;
use smtp_proto::RcptTo;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::core::{Session, SessionAddress};
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn handle_rcpt_to(
&mut self,
to: String,
parameters: Vec<Parameter<String>>,
) -> Result<(), ()> {
pub async fn handle_rcpt_to(&mut self, to: RcptTo<String>) -> Result<(), ()> {
if self.data.mail_from.is_none() {
return self.write(b"503 5.5.1 MAIL is required first.\r\n").await;
} else if self.data.rcpt_to.len() >= self.params.rcpt_max {
@ -16,7 +12,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
}
// Build RCPT
let address_lcase = to.to_lowercase();
let address_lcase = to.address.to_lowercase();
let rcpt = SessionAddress {
domain: address_lcase
.rsplit_once('@')
@ -24,7 +20,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
.unwrap_or_default()
.to_string(),
address_lcase,
address: to,
address: to.address,
};
// Verify address
@ -63,7 +59,6 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
.is_allowed(&self.core.clone().config.rcpt.throttle)
.await
{
self.eval_data_params().await;
self.write(b"250 2.1.5 OK\r\n").await
} else {
self.data.rcpt_to.pop();

View file

@ -26,11 +26,11 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
State::Request(receiver) => loop {
match receiver.ingest(&mut iter, bytes) {
Ok(request) => match request {
Request::Rcpt { to, parameters } => {
self.handle_rcpt_to(to, parameters).await?;
Request::Rcpt { to } => {
self.handle_rcpt_to(to).await?;
}
Request::Mail { from, parameters } => {
self.handle_mail_from(from, parameters).await?;
Request::Mail { from } => {
self.handle_mail_from(from).await?;
}
Request::Ehlo { host } => {
if self.instance.protocol == ServerProtocol::Smtp {
@ -40,6 +40,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
}
}
Request::Data => {
self.eval_data_params().await;
if self.can_send_data().await? {
self.write(b"354 Start mail input; end with <CRLF>.<CRLF>\r\n")
.await?;
@ -52,6 +53,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
chunk_size,
is_last,
} => {
self.eval_data_params().await;
state = if chunk_size + self.data.message.len()
< self.params.data_max_message_size
{
@ -71,25 +73,24 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
mechanism,
initial_response,
} => {
if let Some(mut token) = SaslToken::from_mechanism(
mechanism & self.params.auth_mechanisms,
) {
if self.data.authenticated_as.is_empty() {
if self
.handle_sasl_response(
&mut token,
initial_response.as_bytes(),
)
.await?
{
state = State::Sasl(LineReceiver::new(token));
continue 'outer;
}
} else {
self.write(b"503 5.5.1 Already authenticated.\r\n").await?;
}
} else if self.params.auth_mechanisms == 0 {
// TODO no plain auth over plaintext
if self.params.auth == 0 || self.params.auth_lookup.is_none() {
self.write(b"503 5.5.1 AUTH not allowed.\r\n").await?;
} else if !self.data.authenticated_as.is_empty() {
self.write(b"503 5.5.1 Already authenticated.\r\n").await?;
} else if let Some(mut token) =
SaslToken::from_mechanism(mechanism & self.params.auth)
{
if self
.handle_sasl_response(
&mut token,
initial_response.as_bytes(),
)
.await?
{
state = State::Sasl(LineReceiver::new(token));
continue 'outer;
}
} else {
self.write(
b"554 5.7.8 Authentication mechanism not supported.\r\n",
@ -132,7 +133,6 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
Request::Helo { host } => {
if self.instance.protocol == ServerProtocol::Smtp
&& self.data.helo_domain.is_empty()
|| self.params.ehlo_multiple
{
self.data.helo_domain = host;
self.eval_mail_params().await;

View file

@ -114,6 +114,7 @@ impl Server {
if tls_implicit {
if let Ok(mut session) = session.into_tls(tls_acceptor.unwrap()).await {
if session.write(&instance.greeting).await.is_ok() {
session.eval_session_params().await;
session.eval_ehlo_params().await;
session.eval_auth_params().await;
if !session.params.ehlo_require {
@ -123,6 +124,7 @@ impl Server {
}
}
} else if session.write(&instance.greeting).await.is_ok() {
session.eval_session_params().await;
session.eval_ehlo_params().await;
session.eval_auth_params().await;
if !session.params.ehlo_require {
@ -213,20 +215,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Session<T> {
mut shutdown_rx: watch::Receiver<bool>,
) -> Option<(Session<T>, watch::Receiver<bool>)> {
let mut buf = vec![0; 8192];
let timeout = *self.core.config.timeout.eval(&self).await;
let max_bytes = *self.core.config.transfer_limit.eval(&self).await;
let mut bytes_received = 0;
loop {
tokio::select! {
result = tokio::time::timeout(
timeout,
self.params.timeout,
self.read(&mut buf)) => {
match result {
Ok(Ok(bytes_read)) => {
if bytes_read > 0 {
bytes_received += bytes_read;
if Instant::now() < self.data.valid_until && bytes_received < max_bytes {
if Instant::now() < self.data.valid_until && bytes_read <= self.data.bytes_left {
self.data.bytes_left -= bytes_read;
match self.ingest(&buf[..bytes_read]).await {
Ok(true) => (),
Ok(false) => {
@ -236,7 +235,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Session<T> {
break;
}
}
} else if bytes_received >= max_bytes {
} else if bytes_read > self.data.bytes_left {
self
.write(format!("451 4.7.28 {} Session exceeded transfer quota.\r\n", self.instance.hostname).as_bytes())
.await

View file

@ -8,16 +8,12 @@ use std::fmt::Write;
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn handle_vrfy(&mut self, address: String) -> Result<(), ()> {
if !self.can_vrfy().await {
return self.write(b"252 2.5.1 VRFY is disabled.\r\n").await;
}
if let Some(address_lookup) = &self.params.rcpt_lookup_addresses {
if let Some(address_lookup) = &self.params.rcpt_lookup_vrfy {
if let Some(result) = address_lookup
.lookup(Item::Verify(address.to_lowercase()))
.await
{
return if let LookupResult::Values(values) = result {
if let LookupResult::Values(values) = result {
let mut result = String::with_capacity(32);
for (pos, value) in values.iter().enumerate() {
let _ = write!(
@ -30,24 +26,23 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
self.write(result.as_bytes()).await
} else {
self.write(b"550 5.1.2 Address not found.\r\n").await
};
}
} else {
self.write(b"252 2.4.3 Unable to verify address at this time.\r\n")
.await
}
} else {
self.write(b"252 2.5.1 VRFY is disabled.\r\n").await
}
self.write(b"252 2.4.3 Unable to verify address at this time.\r\n")
.await
}
pub async fn handle_expn(&mut self, address: String) -> Result<(), ()> {
if !self.can_vrfy().await {
return self.write(b"252 2.5.1 EXPN is disabled.\r\n").await;
}
if let Some(address_lookup) = &self.params.rcpt_lookup_addresses {
if let Some(address_lookup) = &self.params.rcpt_lookup_expn {
if let Some(result) = address_lookup
.lookup(Item::Expand(address.to_lowercase()))
.await
{
return if let LookupResult::Values(values) = result {
if let LookupResult::Values(values) = result {
let mut result = String::with_capacity(32);
for (pos, value) in values.iter().enumerate() {
let _ = write!(
@ -60,20 +55,13 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
self.write(result.as_bytes()).await
} else {
self.write(b"550 5.1.2 Mailing list not found.\r\n").await
};
}
} else {
self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n")
.await
}
} else {
self.write(b"252 2.5.1 EXPN is disabled.\r\n").await
}
self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n")
.await
}
#[inline(always)]
pub async fn can_expn(&self) -> bool {
*self.core.config.rcpt.vrfy.eval(self).await
}
#[inline(always)]
pub async fn can_vrfy(&self) -> bool {
*self.core.config.rcpt.expn.eval(self).await
}
}

View file

@ -7,8 +7,9 @@ use smtp_server::{
throttle::{ConcurrencyLimiter, ThrottleKeyHasherBuilder},
Core,
},
queue,
};
use tokio::sync::watch;
use tokio::sync::{mpsc, watch};
#[tokio::main]
async fn main() -> std::io::Result<()> {
@ -27,6 +28,12 @@ async fn main() -> std::io::Result<()> {
let session_config = config
.parse_session_config(&config_context)
.failed("Configuration error");
let queue = config
.parse_queue(&config_context)
.failed("Configuration error");
// Build core
let (queue_tx, queue_rx) = mpsc::channel(1024);
let core = Arc::new(Core {
config: session_config,
concurrency: ConcurrencyLimiter::new(
@ -46,6 +53,7 @@ async fn main() -> std::io::Result<()> {
.failed("Failed to parse throttle map shard amount")
.unwrap_or(32),
),
queue_tx,
});
// Enable logging
@ -60,12 +68,17 @@ async fn main() -> std::io::Result<()> {
.finish(),
)
.failed("Failed to set subscriber");
// Spawn listeners
tracing::info!(
"Starting Stalwart SMTP server v{}...",
env!("CARGO_PKG_VERSION")
);
// Spawn queue manager
queue
.spawn(queue_rx)
.failed("Failed to spawn queue manager");
// Spawn listeners
let (shutdown_tx, shutdown_rx) = watch::channel(false);
for server in config_context.servers {
server
@ -105,6 +118,7 @@ async fn main() -> std::io::Result<()> {
// Stop services
shutdown_tx.send(true).ok();
core.queue_tx.send(queue::Event::Stop).await.ok();
// Wait for services to finish
tokio::time::sleep(Duration::from_secs(1)).await;

34
src/queue/manager.rs Normal file
View file

@ -0,0 +1,34 @@
use std::{
collections::{BinaryHeap, VecDeque},
sync::Arc,
};
use ahash::{AHashMap, HashMap};
use dashmap::DashMap;
use tokio::sync::{mpsc, watch};
use crate::{
config::Queue,
core::{
throttle::{RateLimiter, ThrottleKey, ThrottleKeyHasherBuilder},
Core,
},
};
use super::{Event, Message, QueueItem};
struct ThrottleQueue {
pub limiter: RateLimiter,
pub concurrent: usize,
pub queue: VecDeque<Box<Message>>,
}
impl Queue {
pub fn spawn(self, queue_rx: mpsc::Receiver<Event>) -> Result<(), String> {
let mut queue: BinaryHeap<QueueItem> = BinaryHeap::new();
let mut throttle: DashMap<ThrottleKey, ThrottleQueue, ThrottleKeyHasherBuilder> =
DashMap::with_capacity_and_hasher(100, ThrottleKeyHasherBuilder::default());
todo!()
}
}

21
src/queue/message.rs Normal file
View file

@ -0,0 +1,21 @@
use super::QueueItem;
impl Ord for QueueItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.due.cmp(&self.due)
}
}
impl PartialOrd for QueueItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
other.due.partial_cmp(&self.due)
}
}
impl PartialEq for QueueItem {
fn eq(&self, other: &Self) -> bool {
self.due == other.due
}
}
impl Eq for QueueItem {}

48
src/queue/mod.rs Normal file
View file

@ -0,0 +1,48 @@
use std::time::Instant;
pub mod manager;
pub mod message;
pub enum Event {
Start,
Stop,
}
pub struct QueueItem {
pub due: u64,
pub message: Box<Message>,
}
pub struct Message {
pub id: u64,
pub created: u64,
pub return_path: String,
pub recipients: Vec<Recipient>,
pub flags: u64,
pub priority: i64,
pub size: usize,
pub notify: Action,
}
pub struct Recipient {
pub address: String,
pub domain: String,
pub status: Status,
pub flags: u64,
pub retry: Action,
}
pub struct Action {
pub due_at: Instant,
pub count: u32,
}
pub enum Status {
None,
Delivered,
TemporaryFailure,
PermanentFailure,
}

View file

@ -84,7 +84,7 @@ pub async fn lookup_smtp(
.into(),
true,
),
550 | 551 | 500 | 502 => (LookupResult::False, true),
550 | 551 | 553 | 500 | 502 => (LookupResult::False, true),
_ => {
return Err(mail_send::Error::UnexpectedReply(reply));
}