Remote lookup implementation.

This commit is contained in:
Mauro D 2022-12-22 17:52:55 +00:00
parent 6da860cc21
commit 2ef8408750
12 changed files with 599 additions and 12 deletions

5
Cargo.lock generated
View file

@ -749,9 +749,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.138"
version = "0.2.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
[[package]]
name = "libm"
@ -1501,6 +1501,7 @@ dependencies = [
"blake3",
"criterion",
"dashmap",
"lru-cache",
"mail-auth",
"mail-send",
"parking_lot",

View file

@ -20,6 +20,7 @@ parking_lot = "0.12"
regex = "1.7.0"
dashmap = "5.4"
blake3 = "1.3"
lru-cache = "0.1.2"
[dev-dependencies]
criterion = "0.4.0"

View file

@ -97,7 +97,13 @@ rate = "10/1m"
[session.rcpt]
#script = rcpt-to.sieve
relay = false
relay = [
{if = { any-of = [
all-of = { [{key = "rcpt-domain", in-list="local-domains"}, {key = "rcpt", in-list="local-users"}] },
all-of = { [{key = "rcpt-domain", not-in-list="local-domains"}, {key = "authenticated-as", ne = ""}] },
] }, then = true},
{else = false}
]
errors = {total = 3, wait = "5s"}
max-recipients = 100
@ -168,7 +174,7 @@ protocol = "lmtp"
[remote."lmtp".auth]
username = "hello"
password = "world"
secret = "world"
[remote."lmtp".cache]
entries = 1000

View file

@ -44,7 +44,20 @@ pub struct Listener {
}
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Host {}
pub struct Host {
pub address: String,
pub port: u16,
pub protocol: ServerProtocol,
pub concurrency: usize,
pub timeout: Duration,
pub tls_implicit: bool,
pub tls_allow_invalid_certs: bool,
pub username: Option<String>,
pub secret: Option<String>,
pub cache_entries: usize,
pub cache_ttl_positive: Duration,
pub cache_ttl_negative: Duration,
}
#[derive(Debug, Default)]
pub struct Script {}
@ -69,6 +82,7 @@ pub enum ServerProtocol {
#[default]
Smtp,
Lmtp,
Imap,
}
#[derive(Debug, Clone, PartialEq, Eq)]

View file

@ -13,6 +13,19 @@ impl Config {
}
fn parse_host(&self, _id: &str, _ctx: &ConfigContext) -> super::Result<Host> {
Ok(Host {})
Ok(Host {
address: todo!(),
port: todo!(),
protocol: todo!(),
concurrency: todo!(),
tls_implicit: todo!(),
username: todo!(),
secret: todo!(),
cache_entries: todo!(),
cache_ttl_positive: todo!(),
cache_ttl_negative: todo!(),
tls_allow_invalid_certs: todo!(),
timeout: todo!(),
})
}
}

View file

@ -275,6 +275,8 @@ impl ParseValue for ServerProtocol {
Ok(Self::Smtp)
} else if value.eq_ignore_ascii_case("lmtp") {
Ok(Self::Lmtp)
} else if value.eq_ignore_ascii_case("imap") {
Ok(Self::Imap)
} else {
Err(format!(
"Invalid server protocol type {:?} for property {:?}.",

View file

@ -88,10 +88,8 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
self.write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n").await?;
}
} else {
self.write(
b"503 5.7.1 Authentication is required.\r\n",
)
.await?;
self.write(b"530 5.7.0 Authentication required.\r\n")
.await?;
}
} else {
self.write(

45
src/remote/cache.rs Normal file
View file

@ -0,0 +1,45 @@
use std::{
borrow::Borrow,
hash::Hash,
time::{Duration, Instant},
};
pub struct LookupCache<K: Hash + Eq> {
cache: lru_cache::LruCache<K, Instant, ahash::RandomState>,
ttl: Duration,
}
impl<K: Hash + Eq> LookupCache<K> {
pub fn new(capacity: usize, ttl: Duration) -> Self {
Self {
cache: lru_cache::LruCache::with_hasher(capacity, ahash::RandomState::new()),
ttl,
}
}
pub fn get<Q: ?Sized>(&mut self, name: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq,
{
match self.cache.get_mut(name) {
Some(valid_until) => {
if *valid_until >= Instant::now() {
true
} else {
self.cache.remove(name);
false
}
}
None => false,
}
}
pub fn insert(&mut self, key: K) {
self.cache.insert(key, Instant::now() + self.ttl);
}
pub fn clear(&mut self) {
self.cache.clear();
}
}

View file

@ -1,33 +1,188 @@
use std::time::Duration;
use std::{fmt::Display, sync::Arc, time::Duration};
use mail_send::Credentials;
use rustls::ServerName;
use smtp_proto::{
request::{parser::Rfc5321Parser, AUTH},
response::generate::BitToString,
IntoString, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2,
IntoString, AUTH_CRAM_MD5, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::{TcpStream, ToSocketAddrs},
sync::mpsc,
};
use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::remote::lookup::LoggedUnwrap;
use super::lookup::{Event, Item, Lookup, RemoteLookup};
pub struct ImapAuthClient<T: AsyncRead + AsyncWrite> {
stream: T,
timeout: Duration,
}
pub struct ImapAuthClientBuilder {
pub addr: String,
timeout: Duration,
tls_connector: TlsConnector,
tls_hostname: String,
tls_implicit: bool,
mechanisms: u64,
}
impl ImapAuthClientBuilder {
pub fn new(
addr: String,
timeout: Duration,
tls_connector: TlsConnector,
tls_hostname: String,
tls_implicit: bool,
) -> Self {
Self {
addr,
timeout,
tls_connector,
tls_hostname,
tls_implicit,
mechanisms: AUTH_PLAIN,
}
}
pub async fn init(mut self) -> Self {
let err = match self.connect().await {
Ok(mut client) => match client.authentication_mechanisms().await {
Ok(mechanisms) => {
client.logout().await.ok();
self.mechanisms = mechanisms;
return self;
}
Err(err) => err,
},
Err(err) => err,
};
tracing::warn!(
event = "error",
class = "remote",
remote.addr = &self.addr,
remote.protocol = "imap",
"Could not obtain auth mechanisms: {}",
err
);
self
}
pub async fn connect(&self) -> Result<ImapAuthClient<TlsStream<TcpStream>>, Error> {
ImapAuthClient::connect(
&self.addr,
self.timeout,
&self.tls_connector,
&self.tls_hostname,
self.tls_implicit,
)
.await
}
}
#[derive(Debug)]
pub enum Error {
Io(std::io::Error),
Timeout,
InvalidResponse(String),
InvalidChallenge(String),
AuthenticationFailed,
TLSInvalidName,
Disconnected,
}
impl RemoteLookup for Arc<ImapAuthClientBuilder> {
fn spawn_lookup(&self, lookup: Lookup, tx: mpsc::Sender<Event>) {
let builder = self.clone();
tokio::spawn(async move {
if let Err(err) = builder.lookup(lookup, &tx).await {
tracing::warn!(
event = "error",
class = "remote",
remote.addr = &builder.addr,
remote.protocol = "imap",
"Remote lookup failed: {}",
err
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
}
});
}
}
impl ImapAuthClientBuilder {
pub async fn lookup(&self, lookup: Lookup, tx: &mpsc::Sender<Event>) -> Result<(), Error> {
match &lookup.item {
Item::Credentials(credentials) => {
let mut client = self.connect().await?;
let mechanism = match credentials {
Credentials::Plain { .. }
if (self.mechanisms & (AUTH_PLAIN | AUTH_LOGIN | AUTH_CRAM_MD5)) != 0 =>
{
if self.mechanisms & AUTH_CRAM_MD5 != 0 {
AUTH_CRAM_MD5
} else if self.mechanisms & AUTH_PLAIN != 0 {
AUTH_PLAIN
} else {
AUTH_LOGIN
}
}
Credentials::OAuthBearer { .. } if self.mechanisms & AUTH_OAUTHBEARER != 0 => {
AUTH_OAUTHBEARER
}
Credentials::XOauth2 { .. } if self.mechanisms & AUTH_XOAUTH2 != 0 => {
AUTH_XOAUTH2
}
_ => {
tracing::warn!(
event = "error",
class = "remote",
remote.addr = &self.addr,
remote.protocol = "imap",
"IMAP does not offer any supported auth mechanisms.",
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
return Ok(());
}
};
let result = match client.authenticate(mechanism, credentials).await {
Ok(_) => true,
Err(err) => match &err {
Error::AuthenticationFailed => false,
_ => return Err(err),
},
};
lookup.result.send(result).logged_unwrap();
tx.send(Event::WorkerReady {
item: lookup.item,
result,
next_lookup: None,
})
.await
.logged_unwrap();
}
Item::Entry(_) => {
tracing::warn!(
event = "error",
class = "remote",
remote.addr = &self.addr,
remote.protocol = "imap",
"IMAP does not support validating recipients.",
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
}
}
Ok(())
}
}
impl ImapAuthClient<TcpStream> {
async fn start_tls(
mut self,
@ -143,6 +298,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> ImapAuthClient<T> {
line = self.read_line().await?;
} else if matches!(line.get(..5), Some(b"C3 OK")) {
return Ok(());
} else if matches!(line.get(..5), Some(b"C3 BAD")) {
return Err(Error::AuthenticationFailed);
} else {
return Err(Error::InvalidResponse(line.into_string()));
}
@ -256,6 +413,20 @@ impl From<std::io::Error> for Error {
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(io) => write!(f, "I/O error: {}", io),
Error::Timeout => f.write_str("Connection time-out"),
Error::InvalidResponse(response) => write!(f, "Unexpected response: {:?}", response),
Error::InvalidChallenge(response) => write!(f, "Invalid auth challenge: {}", response),
Error::TLSInvalidName => f.write_str("Invalid TLS name"),
Error::Disconnected => f.write_str("Connection disconnected by peer"),
Error::AuthenticationFailed => f.write_str("Authentication failed"),
}
}
}
#[cfg(test)]
mod test {
use crate::remote::imap::ImapAuthClient;

215
src/remote/lookup.rs Normal file
View file

@ -0,0 +1,215 @@
use std::{collections::VecDeque, fmt::Debug, sync::Arc, time::Duration};
use crate::config::{Config, Host, ServerProtocol};
use mail_send::{smtp::tls::build_tls_connector, Credentials, SmtpClientBuilder};
use tokio::sync::{mpsc, oneshot};
use super::{cache::LookupCache, imap::ImapAuthClientBuilder};
#[derive(Debug)]
pub enum Event {
Lookup(Lookup),
WorkerReady {
item: Item,
result: bool,
next_lookup: Option<oneshot::Sender<Option<Lookup>>>,
},
WorkerFailed,
Reload,
Stop,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum Item {
Entry(String),
Credentials(Credentials<String>),
}
#[derive(Debug)]
pub struct Lookup {
pub item: Item,
pub result: oneshot::Sender<bool>,
}
#[derive(Clone)]
struct RemoteHost<T: RemoteLookup> {
tx: mpsc::Sender<Event>,
host: T,
}
pub trait RemoteLookup: Clone {
fn spawn_lookup(&self, lookup: Lookup, tx: mpsc::Sender<Event>);
}
impl Host {
pub fn spawn(self, config: &Config) -> mpsc::Sender<Event> {
// Create channel
let (tx, rx) = mpsc::channel(1024);
let local_host = config
.value("server.hostname")
.unwrap_or("[127.0.0.1]")
.to_string();
let tx_ = tx.clone();
tokio::spawn(async move {
// Prepare builders
match self.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
RemoteHost {
tx,
host: Arc::new(SmtpClientBuilder {
addr: format!("{}:{}", self.address, self.port),
timeout: self.timeout,
tls_connector: build_tls_connector(self.tls_allow_invalid_certs),
tls_hostname: self.address,
tls_implicit: self.tls_implicit,
is_lmtp: matches!(self.protocol, ServerProtocol::Lmtp),
local_host,
}),
}
.run(
rx,
self.cache_entries,
self.cache_ttl_positive,
self.cache_ttl_negative,
self.concurrency,
)
.await;
}
ServerProtocol::Imap => {
RemoteHost {
tx,
host: Arc::new(
ImapAuthClientBuilder::new(
format!("{}:{}", self.address, self.port),
self.timeout,
build_tls_connector(self.tls_allow_invalid_certs),
self.address,
self.tls_implicit,
)
.init()
.await,
),
}
.run(
rx,
self.cache_entries,
self.cache_ttl_positive,
self.cache_ttl_negative,
self.concurrency,
)
.await;
}
}
});
tx_
}
}
impl<T: RemoteLookup> RemoteHost<T> {
pub async fn run(
&self,
mut rx: mpsc::Receiver<Event>,
entries: usize,
ttl_pos: Duration,
ttl_neg: Duration,
max_concurrent: usize,
) {
// Create caches and queue
let mut cache_pos = LookupCache::<Item>::new(entries, ttl_pos);
let mut cache_neg = LookupCache::<Item>::new(entries, ttl_neg);
let mut queue = VecDeque::new();
let mut active_lookups = 0;
while let Some(event) = rx.recv().await {
match event {
Event::Lookup(lookup) => {
if cache_pos.get(&lookup.item) {
lookup.result.send(true).logged_unwrap();
} else if cache_neg.get(&lookup.item) {
lookup.result.send(false).logged_unwrap();
} else if active_lookups < max_concurrent {
active_lookups += 1;
self.host.spawn_lookup(lookup, self.tx.clone());
} else {
queue.push_back(lookup);
}
}
Event::WorkerReady {
item,
result,
next_lookup,
} => {
if result {
cache_pos.insert(item);
} else {
cache_neg.insert(item);
}
let mut lookup = None;
while let Some(queued_lookup) = queue.pop_front() {
if cache_pos.get(&queued_lookup.item) {
queued_lookup.result.send(true).logged_unwrap();
} else if cache_neg.get(&queued_lookup.item) {
queued_lookup.result.send(false).logged_unwrap();
} else {
lookup = queued_lookup.into();
break;
}
}
if let Some(next_lookup) = next_lookup {
if lookup.is_none() {
active_lookups -= 1;
}
next_lookup.send(lookup).logged_unwrap();
} else if let Some(lookup) = lookup {
self.host.spawn_lookup(lookup, self.tx.clone());
} else {
active_lookups -= 1;
}
}
Event::WorkerFailed => {
if let Some(queued_lookup) = queue.pop_front() {
self.host.spawn_lookup(queued_lookup, self.tx.clone());
} else {
active_lookups -= 1;
}
}
Event::Stop => {
queue.clear();
break;
}
Event::Reload => {
cache_pos.clear();
cache_neg.clear();
}
}
}
}
}
pub trait LoggedUnwrap {
fn logged_unwrap(self) -> bool;
}
impl<T, E: std::fmt::Debug> LoggedUnwrap for Result<T, E> {
fn logged_unwrap(self) -> bool {
match self {
Ok(_) => true,
Err(err) => {
tracing::debug!("Failed to send message over channel: {:?}", err);
false
}
}
}
}
impl Debug for Item {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Entry(arg0) => f.debug_tuple("Entry").field(arg0).finish(),
Self::Credentials(_) => f.debug_tuple("Credentials").finish(),
}
}
}

View file

@ -1 +1,4 @@
pub mod cache;
pub mod imap;
pub mod lookup;
pub mod smtp;

118
src/remote/smtp.rs Normal file
View file

@ -0,0 +1,118 @@
use std::sync::Arc;
use mail_send::{smtp::AssertReply, SmtpClientBuilder};
use smtp_proto::Severity;
use tokio::sync::{mpsc, oneshot};
use super::lookup::{Event, Item, LoggedUnwrap, Lookup, RemoteLookup};
const MAX_RCPTS_PER_SESSION: usize = 50;
const MAX_AUTH_FAILURES_PER_SESSION: usize = 3;
// TODO use async traits when stabilized
pub async fn lookup_smtp(
builder: &SmtpClientBuilder<String>,
mut lookup: Lookup,
tx: &mpsc::Sender<Event>,
) -> Result<(), mail_send::Error> {
let mut client = builder.connect().await?;
let mut sent_mail_from = false;
let mut num_rcpts = 0;
let mut num_auth_failures = 0;
loop {
let (result, is_reusable) = match &lookup.item {
Item::Entry(rcpt_to) => {
if !sent_mail_from {
client
.cmd(b"MAIL FROM:<>\r\n")
.await?
.assert_positive_completion()?;
sent_mail_from = true;
}
let reply = client
.cmd(format!("RCPT TO:<{}>\r\n", rcpt_to).as_bytes())
.await?;
let result = match reply.severity() {
Severity::PositiveCompletion => {
num_rcpts += 1;
true
}
Severity::PermanentNegativeCompletion => false,
_ => return Err(mail_send::Error::UnexpectedReply(reply)),
};
// Try to reuse the connection with any queued requests
(result, num_rcpts < MAX_RCPTS_PER_SESSION)
}
Item::Credentials(credentials) => {
let result = match client.authenticate(credentials).await {
Ok(_) => true,
Err(err) => match &err {
mail_send::Error::AuthenticationFailed(err) if err.code() == 535 => {
num_auth_failures += 1;
false
}
_ => {
return Err(err);
}
},
};
(
result,
!result && num_auth_failures < MAX_AUTH_FAILURES_PER_SESSION,
)
}
};
// Try to reuse the connection with any queued requests
lookup.result.send(result).logged_unwrap();
if is_reusable {
let (next_lookup_tx, next_lookup_rx) = oneshot::channel::<Option<Lookup>>();
if tx
.send(Event::WorkerReady {
item: lookup.item,
result,
next_lookup: next_lookup_tx.into(),
})
.await
.logged_unwrap()
{
if let Ok(Some(next_lookup)) = next_lookup_rx.await {
lookup = next_lookup;
continue;
}
}
} else {
tx.send(Event::WorkerReady {
item: lookup.item,
result,
next_lookup: None,
})
.await
.logged_unwrap();
}
break;
}
Ok(())
}
impl RemoteLookup for Arc<SmtpClientBuilder<String>> {
fn spawn_lookup(&self, lookup: Lookup, tx: mpsc::Sender<Event>) {
let builder = self.clone();
tokio::spawn(async move {
if let Err(err) = lookup_smtp(builder.as_ref(), lookup, &tx).await {
tracing::warn!(
event = "error",
class = "remote",
remote.addr = &builder.addr,
remote.protocol = "smtp",
"Remote lookup failed: {}",
err
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
}
});
}
}