Feature/remove postgres (#2570)

* wip: move postgres data to patchdb

* wip

* wip

* wip

* complete notifications and clean up warnings

* fill in user agent

* move os tor bindings to single call
This commit is contained in:
Aiden McClelland
2024-03-07 14:40:22 -07:00
committed by GitHub
parent a17ec4221b
commit e0c9f8a5aa
70 changed files with 2429 additions and 2383 deletions

View File

@@ -1,14 +1,17 @@
use std::borrow::Borrow;
use std::collections::BTreeSet;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use axum::extract::Request;
use axum::response::Response;
use basic_cookies::Cookie;
use chrono::Utc;
use color_eyre::eyre::eyre;
use digest::Digest;
use helpers::const_true;
use http::header::COOKIE;
use http::header::{COOKIE, USER_AGENT};
use http::HeaderValue;
use imbl_value::InternedString;
use rpc_toolkit::yajrc::INTERNAL_ERROR;
@@ -38,24 +41,36 @@ pub struct HasLoggedOutSessions(());
impl HasLoggedOutSessions {
pub async fn new(
logged_out_sessions: impl IntoIterator<Item = impl AsLogoutSessionId>,
sessions: impl IntoIterator<Item = impl AsLogoutSessionId>,
ctx: &RpcContext,
) -> Result<Self, Error> {
let mut open_authed_websockets = ctx.open_authed_websockets.lock().await;
let mut sqlx_conn = ctx.secret_store.acquire().await?;
for session in logged_out_sessions {
let session = session.as_logout_session_id();
let session = &*session;
sqlx::query!(
"UPDATE session SET logged_out = CURRENT_TIMESTAMP WHERE id = $1",
session
)
.execute(sqlx_conn.as_mut())
let to_log_out: BTreeSet<_> = sessions
.into_iter()
.map(|s| s.as_logout_session_id())
.collect();
ctx.open_authed_websockets
.lock()
.await
.retain(|session, sockets| {
if to_log_out.contains(session.hashed()) {
for socket in std::mem::take(sockets) {
let _ = socket.send(());
}
false
} else {
true
}
});
ctx.db
.mutate(|db| {
let sessions = db.as_private_mut().as_sessions_mut();
for sid in &to_log_out {
sessions.remove(sid)?;
}
Ok(())
})
.await?;
for socket in open_authed_websockets.remove(session).unwrap_or_default() {
let _ = socket.send(());
}
}
Ok(HasLoggedOutSessions(()))
}
}
@@ -105,15 +120,20 @@ impl HasValidSession {
ctx: &RpcContext,
) -> Result<Self, Error> {
let session_hash = session_token.hashed();
let session = sqlx::query!("UPDATE session SET last_active = CURRENT_TIMESTAMP WHERE id = $1 AND logged_out IS NULL OR logged_out > CURRENT_TIMESTAMP", session_hash)
.execute(ctx.secret_store.acquire().await?.as_mut())
ctx.db
.mutate(|db| {
db.as_private_mut()
.as_sessions_mut()
.as_idx_mut(session_hash)
.ok_or_else(|| {
Error::new(eyre!("UNAUTHORIZED"), crate::ErrorKind::Authorization)
})?
.mutate(|s| {
s.last_active = Utc::now();
Ok(())
})
})
.await?;
if session.rows_affected() == 0 {
return Err(Error::new(
eyre!("UNAUTHORIZED"),
crate::ErrorKind::Authorization,
));
}
Ok(Self(SessionType::Session(session_token)))
}
@@ -181,8 +201,8 @@ impl HashSessionToken {
}
}
pub fn hashed(&self) -> &str {
&*self.hashed
pub fn hashed(&self) -> &InternedString {
&self.hashed
}
fn hash(token: &str) -> InternedString {
@@ -241,6 +261,7 @@ pub struct Auth {
cookie: Option<HeaderValue>,
is_login: bool,
set_cookie: Option<HeaderValue>,
user_agent: Option<HeaderValue>,
}
impl Auth {
pub fn new() -> Self {
@@ -249,6 +270,7 @@ impl Auth {
cookie: None,
is_login: false,
set_cookie: None,
user_agent: None,
}
}
}
@@ -260,7 +282,8 @@ impl Middleware<RpcContext> for Auth {
_: &RpcContext,
request: &mut Request,
) -> Result<(), Response> {
self.cookie = request.headers_mut().get(COOKIE).cloned();
self.cookie = request.headers_mut().remove(COOKIE);
self.user_agent = request.headers_mut().remove(USER_AGENT);
Ok(())
}
async fn process_rpc_request(
@@ -282,6 +305,10 @@ impl Middleware<RpcContext> for Auth {
.into()),
});
}
if let Some(user_agent) = self.user_agent.as_ref().and_then(|h| h.to_str().ok()) {
request.params["user-agent"] = Value::String(Arc::new(user_agent.to_owned()))
// TODO: will this panic?
}
} else if metadata.authenticated {
match HasValidSession::from_header(self.cookie.as_ref(), &context).await {
Err(e) => {
@@ -291,7 +318,8 @@ impl Middleware<RpcContext> for Auth {
})
}
Ok(HasValidSession(SessionType::Session(s))) if metadata.get_session => {
request.params["session"] = Value::String(Arc::new(s.hashed().into()));
request.params["session"] =
Value::String(Arc::new(s.hashed().deref().to_owned()));
// TODO: will this panic?
}
_ => (),