Skip to content

Commit

Permalink
Enable keep-alive for provisioning socket (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon authored Dec 16, 2023
1 parent 34ad911 commit 9f81c52
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 17 deletions.
8 changes: 6 additions & 2 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,9 @@ impl PushService for AwcPushService {
async fn ws(
&mut self,
path: &str,
keep_alive_path: &str,
additional_headers: &[(&str, &str)],
credentials: Option<ServiceCredentials>,
keep_alive: bool,
) -> Result<SignalWebSocket, ServiceError> {
let (ws, stream) = AwcWebSocket::with_client(
&mut self.client,
Expand All @@ -637,7 +637,11 @@ impl PushService for AwcPushService {
credentials.as_ref(),
)
.await?;
let (ws, task) = SignalWebSocket::from_socket(ws, stream, keep_alive);
let (ws, task) = SignalWebSocket::from_socket(
ws,
stream,
keep_alive_path.to_owned(),
);
actix_rt::spawn(task);
Ok(ws)
}
Expand Down
5 changes: 3 additions & 2 deletions libsignal-service-hyper/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,9 @@ impl PushService for HyperPushService {
async fn ws(
&mut self,
path: &str,
keepalive_path: &str,
additional_headers: &[(&str, &str)],
credentials: Option<ServiceCredentials>,
keep_alive: bool,
) -> Result<SignalWebSocket, ServiceError> {
let (ws, stream) = TungsteniteWebSocket::with_tls_config(
Self::tls_config(&self.cfg),
Expand All @@ -594,7 +594,8 @@ impl PushService for HyperPushService {
credentials.as_ref(),
)
.await?;
let (ws, task) = SignalWebSocket::from_socket(ws, stream, keep_alive);
let (ws, task) =
SignalWebSocket::from_socket(ws, stream, keepalive_path.to_owned());
tokio::task::spawn(task);
Ok(ws)
}
Expand Down
7 changes: 6 additions & 1 deletion libsignal-service/src/provisioning/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ impl<P: PushService> LinkingManager<P> {
// open a websocket without authentication, to receive a tsurl://
let ws = self
.push_service
.ws("/v1/websocket/provisioning/", &[], None, false)
.ws(
"/v1/websocket/provisioning/",
"/v1/keepalive/provisioning",
&[],
None,
)
.await?;

let registration_id = csprng.gen_range(1..256);
Expand Down
2 changes: 1 addition & 1 deletion libsignal-service/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,9 @@ pub trait PushService: MaybeSend {
async fn ws(
&mut self,
path: &str,
keepalive_path: &str,
additional_headers: &[(&str, &str)],
credentials: Option<ServiceCredentials>,
keep_alive: bool,
) -> Result<SignalWebSocket, ServiceError>;

/// Fetches a list of all devices tied to the authenticated account.
Expand Down
7 changes: 6 additions & 1 deletion libsignal-service/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ impl<Service: PushService> MessageReceiver<Service> {
)];
let ws = self
.service
.ws("/v1/websocket/", headers, Some(credentials.clone()), true)
.ws(
"/v1/websocket/",
"/v1/keepalive",
headers,
Some(credentials.clone()),
)
.await?;
Ok(MessagePipe::from_socket(ws, credentials))
}
Expand Down
17 changes: 7 additions & 10 deletions libsignal-service/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ struct SignalWebSocketInner {
}

struct SignalWebSocketProcess<WS: WebSocketService> {
/// Whether to enable keep-alive or not
keep_alive: bool,
/// Whether to enable keep-alive or not (and send a request to this path)
keep_alive_path: String,

/// Receives requests from the application, which we forward to Signal.
requests: mpsc::Receiver<(
Expand Down Expand Up @@ -207,13 +207,13 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
Some(WebSocketStreamItem::Message(frame)) => {
self.process_frame(frame).await?;
}
Some(WebSocketStreamItem::KeepAliveRequest) if self.keep_alive => {
Some(WebSocketStreamItem::KeepAliveRequest) => {
// XXX: would be nicer if we could drop this request into the request
// queue above.
log::debug!("Sending keep alive upon request");
let request = WebSocketRequestMessage {
id: Some(self.next_request_id()),
path: Some("/v1/keepalive".into()),
path: Some(self.keep_alive_path.clone()),
verb: Some("GET".into()),
..Default::default()
};
Expand All @@ -224,10 +224,7 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
..Default::default()
};
let buffer = msg.encode_to_vec();
self.ws.send_message(buffer.into()).await?
}
Some(WebSocketStreamItem::KeepAliveRequest) => {
log::trace!("keep alive is disabled: ignoring request");
self.ws.send_message(buffer.into()).await?;
}
None => {
return Err(ServiceError::WsError {
Expand Down Expand Up @@ -270,14 +267,14 @@ impl SignalWebSocket {
pub fn from_socket<WS: WebSocketService + 'static>(
ws: WS,
stream: WS::Stream,
keep_alive: bool,
keep_alive_path: String,
) -> (Self, impl Future<Output = ()>) {
// Create process
let (incoming_request_sink, incoming_request_stream) = mpsc::channel(1);
let (outgoing_request_sink, outgoing_requests) = mpsc::channel(1);

let process = SignalWebSocketProcess {
keep_alive,
keep_alive_path,
requests: outgoing_requests,
request_sink: incoming_request_sink,
outgoing_request_map: HashMap::default(),
Expand Down

0 comments on commit 9f81c52

Please sign in to comment.