Skip to content

Commit

Permalink
Fix sending note to self as primary without secondary devices (#339)
Browse files Browse the repository at this point in the history
* Fix sending note to self as primary without secondary devices

* Also add a guard to send SyncMessage only if is_multi_device is true

* Fix typo

* Change logging msg

* Small cleanup
  • Loading branch information
gferon authored Oct 26, 2024
1 parent 468fa6f commit b6b9cf3
Showing 1 changed file with 52 additions and 36 deletions.
88 changes: 52 additions & 36 deletions src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::time;
use std::{collections::HashSet, time::SystemTime};

use chrono::prelude::*;
Expand All @@ -7,7 +6,7 @@ use libsignal_protocol::{
ProtocolStore, SenderCertificate, SenderKeyStore, SignalProtocolError,
};
use rand::{CryptoRng, Rng};
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, trace, warn};
use tracing_futures::Instrument;
use uuid::Uuid;
use zkgroup::GROUP_IDENTIFIER_LEN;
Expand Down Expand Up @@ -132,6 +131,9 @@ pub enum MessageSenderError {

#[error("Recipient not found: {addr:?}")]
NotFound { addr: ServiceAddress },

#[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
NoMessagesToSend,
}

pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
Expand All @@ -142,6 +144,12 @@ pub enum ThreadIdentifier {
Group(GroupV2Id),
}

#[derive(Debug)]
pub struct EncryptedMessages {
messages: Vec<OutgoingPushMessage>,
used_identity_key: IdentityKey,
}

impl<S, R> MessageSender<S, R>
where
S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
Expand Down Expand Up @@ -349,6 +357,7 @@ where
) -> SendMessageResult {
let content_body = message.into();
let message_to_self = recipient == &self.local_aci;
let is_multi_device = self.is_multi_device().await;

use crate::proto::data_message::Flags;

Expand All @@ -360,7 +369,7 @@ where
};

// only send a sync message when sending to self and skip the rest of the process
if message_to_self {
if message_to_self && is_multi_device {
debug!("sending note to self");
let sync_message =
Self::create_multi_device_sent_transcript_content(
Expand Down Expand Up @@ -403,7 +412,7 @@ where
_ => false,
};

if needs_sync || self.is_multi_device().await {
if needs_sync || is_multi_device {
debug!("sending multi-device sync message");
let sync_message =
Self::create_multi_device_sent_transcript_content(
Expand Down Expand Up @@ -533,13 +542,22 @@ where
let content_bytes = content.encode_to_vec();

for _ in 0..4u8 {
let (messages, used_identity_key) = self
let Some(EncryptedMessages {
messages,
used_identity_key,
}) = self
.create_encrypted_messages(
&recipient,
unidentified_access.map(|x| &x.certificate),
&content_bytes,
)
.await?;
.await?
else {
// this can happen for example when a device is primary, without any secondaries
// and we send a message to ourselves (which is only a SyncMessage { sent: ... })
// addressed to self
return Err(MessageSenderError::NoMessagesToSend);
};

let messages = OutgoingPushMessages {
destination: recipient.uuid,
Expand Down Expand Up @@ -839,8 +857,7 @@ where
recipient: &ServiceAddress,
unidentified_access: Option<&SenderCertificate>,
content: &[u8],
) -> Result<(Vec<OutgoingPushMessage>, IdentityKey), MessageSenderError>
{
) -> Result<Option<EncryptedMessages>, MessageSenderError> {
let mut messages = vec![];

let mut devices: HashSet<DeviceId> = self
Expand Down Expand Up @@ -891,32 +908,24 @@ where
messages.push(message);
break;
},
Err(
e @ MessageSenderError::ServiceError(
ServiceError::SignalProtocolError(
SignalProtocolError::SessionNotFound(_),
),
Err(MessageSenderError::ServiceError(
ServiceError::SignalProtocolError(
SignalProtocolError::SessionNotFound(addr),
),
) => {
let MessageSenderError::ServiceError(
ServiceError::SignalProtocolError(
SignalProtocolError::SessionNotFound(addr),
),
) = &e
else {
// We can't bind to addr above, because we move into `e`.
unreachable!()
};
)) => {
// SessionNotFound is returned on certain session corruption.
// Since delete_session *creates* a session if it doesn't exist,
// the NotFound error is an indicator of session corruption.
// Try to delete this session, if it gets succesfully deleted, retry. Otherwise, fail.
tracing::warn!("Potential session corruption for {}, deleting session", addr);
match self.protocol_store.delete_session(addr).await {
match self.protocol_store.delete_session(&addr).await {
Ok(()) => continue,
Err(_e) => {
tracing::warn!("Failed to delete session for {}, failing message. {}", addr, _e);
return Err(e);
Err(error) => {
tracing::warn!(%error, %addr, "failed to delete session");
return Err(
SignalProtocolError::SessionNotFound(addr)
.into(),
);
},
}
},
Expand All @@ -925,15 +934,22 @@ where
}
}

let identity_key = self
.protocol_store
.get_identity(&recipient.to_protocol_address(DEFAULT_DEVICE_ID))
.await?
.ok_or(MessageSenderError::UntrustedIdentity {
address: *recipient,
})?;

Ok((messages, identity_key))
if messages.is_empty() {
Ok(None)
} else {
Ok(Some(EncryptedMessages {
messages,
used_identity_key: self
.protocol_store
.get_identity(
&recipient.to_protocol_address(DEFAULT_DEVICE_ID),
)
.await?
.ok_or(MessageSenderError::UntrustedIdentity {
address: *recipient,
})?,
}))
}
}

/// Equivalent to `getEncryptedMessage`
Expand Down

0 comments on commit b6b9cf3

Please sign in to comment.