Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sockets issues #3015

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .idea/code.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 25 additions & 31 deletions apps/labrinth/src/routes/internal/statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ pub enum ServerToClientMessage {
UserOffline { id: UserId },
FriendStatuses { statuses: Vec<UserStatus> },
FriendRequest { from: UserId },
FriendRequestRejected { from: UserId },
}

#[derive(Deserialize)]
struct LauncherHeartbeatInit {
code: String,
}

#[get("launcher_heartbeat")]
#[get("launcher_socket")]
pub async fn ws_init(
req: HttpRequest,
pool: Data<PgPool>,
Expand Down Expand Up @@ -122,16 +123,12 @@ pub async fn ws_init(
user.id,
ServerToClientMessage::StatusUpdate { status },
&pool,
&redis,
&db,
Some(friends),
)
.await?;

let mut stream = msg_stream
.aggregate_continuations()
// aggregate continuation frames up to 1MiB
.max_continuation_size(2_usize.pow(20));
let mut stream = msg_stream.aggregate_continuations();

actix_web::rt::spawn(async move {
// receive messages from websocket
Expand Down Expand Up @@ -168,7 +165,6 @@ pub async fn ws_init(
status: status.clone(),
},
&pool,
&redis,
&db,
None,
)
Expand All @@ -180,12 +176,23 @@ pub async fn ws_init(
}

Ok(AggregatedMessage::Close(_)) => {
let _ = close_socket(user.id, &pool, &redis, &db).await;
let _ = close_socket(user.id, &pool, &db).await;
}

Ok(AggregatedMessage::Ping(msg)) => {
if let Some(mut socket) =
db.auth_sockets.get_mut(&user.id.into())
{
let (_, socket) = socket.value_mut();
let _ = socket.pong(&msg).await;
}
}

_ => {}
}
}

let _ = close_socket(user.id, &pool, &db).await;
});

Ok(res)
Expand All @@ -195,7 +202,6 @@ pub async fn broadcast_friends(
user_id: UserId,
message: ServerToClientMessage,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
friends: Option<Vec<FriendItem>>,
) -> Result<(), crate::database::models::DatabaseError> {
Expand All @@ -218,17 +224,7 @@ pub async fn broadcast_friends(
{
let (_, socket) = socket.value_mut();

// TODO: bulk close sockets for better perf
if socket.text(serde_json::to_string(&message)?).await.is_err()
{
Box::pin(close_socket(
friend_id.into(),
pool,
redis,
sockets,
))
.await?;
}
let _ = socket.text(serde_json::to_string(&message)?).await;
}
}
}
Expand All @@ -239,22 +235,20 @@ pub async fn broadcast_friends(
pub async fn close_socket(
id: UserId,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
) -> Result<(), crate::database::models::DatabaseError> {
if let Some((_, (_, socket))) = sockets.auth_sockets.remove(&id) {
let _ = socket.close(None).await;
}

broadcast_friends(
id,
ServerToClientMessage::UserOffline { id },
pool,
redis,
sockets,
None,
)
.await?;
broadcast_friends(
id,
ServerToClientMessage::UserOffline { id },
pool,
sockets,
None,
)
.await?;
}

Ok(())
}
49 changes: 18 additions & 31 deletions apps/labrinth/src/routes/v3/friends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ pub async fn add_friend(
async fn send_friend_status(
user_id: UserId,
friend_id: UserId,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
) -> Result<(), ApiError> {
if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) {
Expand All @@ -85,45 +83,21 @@ pub async fn add_friend(
{
let (_, socket) = socket.value_mut();

if socket
let _ = socket
.text(serde_json::to_string(
&ServerToClientMessage::StatusUpdate {
status: friend_status.clone(),
},
)?)
.await
.is_err()
{
close_socket(
friend_id.into(),
pool,
redis,
sockets,
)
.await?;
}
.await;
}
}

Ok(())
}

send_friend_status(
friend.user_id,
friend.friend_id,
&pool,
&redis,
&db,
)
.await?;
send_friend_status(
friend.friend_id,
friend.user_id,
&pool,
&redis,
&db,
)
.await?;
send_friend_status(friend.user_id, friend.friend_id, &db).await?;
send_friend_status(friend.friend_id, friend.user_id, &db).await?;
} else {
if friend.id == user.id.into() {
return Err(ApiError::InvalidInput(
Expand Down Expand Up @@ -157,7 +131,7 @@ pub async fn add_friend(
.await
.is_err()
{
close_socket(user.id, &pool, &redis, &db).await?;
close_socket(user.id, &pool, &db).await?;
}
}
}
Expand All @@ -177,6 +151,7 @@ pub async fn remove_friend(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
db: web::Data<ActiveSockets>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
Expand All @@ -202,6 +177,18 @@ pub async fn remove_friend(
)
.await?;

if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) {
let (_, socket) = socket.value_mut();

let _ = socket
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequestRejected {
from: user.id,
},
)?)
.await;
}

transaction.commit().await?;

Ok(HttpResponse::NoContent().body(""))
Expand Down
68 changes: 33 additions & 35 deletions packages/app-lib/src/state/friends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use reqwest::header::HeaderValue;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

type WriteSocket =
Arc<Mutex<Option<SplitSink<WebSocketStream<ConnectStream>, Message>>>>;
Arc<RwLock<Option<SplitSink<WebSocketStream<ConnectStream>, Message>>>>;

pub struct FriendsSocket {
write: WriteSocket,
Expand Down Expand Up @@ -67,7 +67,7 @@ impl Default for FriendsSocket {
impl FriendsSocket {
pub fn new() -> Self {
Self {
write: Arc::new(Mutex::new(None)),
write: Arc::new(RwLock::new(None)),
user_statuses: Arc::new(DashMap::new()),
}
}
Expand All @@ -83,7 +83,7 @@ impl FriendsSocket {

if let Some(credentials) = credentials {
let mut request = format!(
"{MODRINTH_SOCKET_URL}_internal/launcher_heartbeat?code={}",
"{MODRINTH_SOCKET_URL}_internal/launcher_socket?code={}",
credentials.session
)
.into_client_request()?;
Expand All @@ -105,7 +105,7 @@ impl FriendsSocket {
let (write, read) = socket.split();

{
let mut write_lock = self.write.lock().await;
let mut write_lock = self.write.write().await;
*write_lock = Some(write);
}

Expand Down Expand Up @@ -181,19 +181,15 @@ impl FriendsSocket {
}
}

let mut w = write_handle.lock().await;
let mut w = write_handle.write().await;
*w = None;

Self::reconnect_task();
});
}
Err(e) => {
tracing::error!(
"Error connecting to friends socket: {e:?}"
);

Self::reconnect_task();

return Err(crate::Error::from(e));
}
}
Expand All @@ -202,40 +198,42 @@ impl FriendsSocket {
Ok(())
}

fn reconnect_task() {
pub async fn reconnect_task() -> crate::Result<()> {
let state = crate::State::get().await?;

tokio::task::spawn(async move {
let res = async {
let state = crate::State::get().await?;
let mut last_connection = Utc::now();

loop {
let connected = {
let read = state.friends_socket.write.read().await;
read.is_some()
};

if !connected
&& Utc::now().signed_duration_since(last_connection)
> chrono::Duration::seconds(30)
{
if state.friends_socket.write.lock().await.is_some() {
return Ok(());
}
last_connection = Utc::now();
let _ = state
.friends_socket
.connect(
&state.pool,
&state.api_semaphore,
&state.process_manager,
)
.await;
}

state
.friends_socket
.connect(
&state.pool,
&state.api_semaphore,
&state.process_manager,
)
.await?;

Ok::<(), crate::Error>(())
};

if let Err(e) = res.await {
tracing::info!("Error reconnecting to friends socket: {e:?}");

tokio::time::sleep(std::time::Duration::from_secs(30)).await;
FriendsSocket::reconnect_task();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});

Ok(())
}

pub async fn disconnect(&self) -> crate::Result<()> {
let mut write_lock = self.write.lock().await;
let mut write_lock = self.write.write().await;
if let Some(ref mut write_half) = *write_lock {
write_half.close().await?;
*write_lock = None;
Expand All @@ -247,7 +245,7 @@ impl FriendsSocket {
&self,
profile_name: Option<String>,
) -> crate::Result<()> {
let mut write_lock = self.write.lock().await;
let mut write_lock = self.write.write().await;
if let Some(ref mut write_half) = *write_lock {
write_half
.send(Message::Text(serde_json::to_string(
Expand Down
Loading
Loading