1use std::mem;
4
5use errors::{CommitToPendingProposalsError, MergePendingCommitError};
6use openmls_traits::{crypto::OpenMlsCrypto, signatures::Signer, storage::StorageProvider as _};
7
8use crate::{
9 framing::mls_content::FramedContentBody,
10 group::{errors::MergeCommitError, StageCommitError, ValidationError},
11 messages::group_info::GroupInfo,
12 storage::OpenMlsProvider,
13 tree::sender_ratchet::SenderRatchetConfiguration,
14};
15
16use super::{errors::ProcessMessageError, *};
17
18impl MlsGroup {
19 pub fn process_message<Provider: OpenMlsProvider>(
29 &mut self,
30 provider: &Provider,
31 message: impl Into<ProtocolMessage>,
32 ) -> Result<ProcessedMessage, ProcessMessageError> {
33 if !self.is_active() {
35 return Err(ProcessMessageError::GroupStateError(
36 MlsGroupStateError::UseAfterEviction,
37 ));
38 }
39 let message = message.into();
40
41 if !message.is_external()
43 && message.is_handshake_message()
44 && !self
45 .configuration()
46 .wire_format_policy()
47 .incoming()
48 .is_compatible_with(message.wire_format())
49 {
50 return Err(ProcessMessageError::IncompatibleWireFormat);
51 }
52
53 let sender_ratchet_configuration = *self.configuration().sender_ratchet_configuration();
55
56 let decrypted_message =
62 self.decrypt_message(provider.crypto(), message, &sender_ratchet_configuration)?;
63
64 let unverified_message = self
65 .public_group
66 .parse_message(decrypted_message, &self.message_secrets_store)
67 .map_err(ProcessMessageError::from)?;
68
69 let (old_epoch_keypairs, leaf_node_keypairs) =
71 if let ContentType::Commit = unverified_message.content_type() {
72 self.read_decryption_keypairs(provider, &self.own_leaf_nodes)?
73 } else {
74 (vec![], vec![])
75 };
76
77 self.process_unverified_message(
78 provider,
79 unverified_message,
80 old_epoch_keypairs,
81 leaf_node_keypairs,
82 )
83 }
84
85 pub fn store_pending_proposal<Storage: StorageProvider>(
87 &mut self,
88 storage: &Storage,
89 proposal: QueuedProposal,
90 ) -> Result<(), Storage::Error> {
91 storage.queue_proposal(self.group_id(), &proposal.proposal_reference(), &proposal)?;
92 self.proposal_store_mut().add(proposal);
94
95 Ok(())
96 }
97
98 #[allow(clippy::type_complexity)]
109 pub fn commit_to_pending_proposals<Provider: OpenMlsProvider>(
110 &mut self,
111 provider: &Provider,
112 signer: &impl Signer,
113 ) -> Result<
114 (MlsMessageOut, Option<MlsMessageOut>, Option<GroupInfo>),
115 CommitToPendingProposalsError<Provider::StorageError>,
116 > {
117 self.is_operational()?;
118
119 let (commit, welcome, group_info) = self
122 .commit_builder()
123 .consume_proposal_store(true)
125 .load_psks(provider.storage())?
126 .build(provider.rand(), provider.crypto(), signer, |_| true)?
127 .stage_commit(provider)?
128 .into_contents();
129
130 Ok((
131 commit,
132 welcome.map(|welcome| MlsMessageOut::from_welcome(welcome, self.version())),
134 group_info,
135 ))
136 }
137
138 pub fn merge_staged_commit<Provider: OpenMlsProvider>(
141 &mut self,
142 provider: &Provider,
143 staged_commit: StagedCommit,
144 ) -> Result<(), MergeCommitError<Provider::StorageError>> {
145 if staged_commit.self_removed() {
147 self.group_state = MlsGroupState::Inactive;
148 }
149 provider
150 .storage()
151 .write_group_state(self.group_id(), &self.group_state)
152 .map_err(MergeCommitError::StorageError)?;
153
154 self.merge_commit(provider, staged_commit)?;
156
157 let resumption_psk = self.group_epoch_secrets().resumption_psk();
159 self.resumption_psk_store
160 .add(self.context().epoch(), resumption_psk.clone());
161 provider
162 .storage()
163 .write_resumption_psk_store(self.group_id(), &self.resumption_psk_store)
164 .map_err(MergeCommitError::StorageError)?;
165
166 self.own_leaf_nodes.clear();
168 provider
169 .storage()
170 .delete_own_leaf_nodes(self.group_id())
171 .map_err(MergeCommitError::StorageError)?;
172
173 self.clear_pending_commit(provider.storage())
175 .map_err(MergeCommitError::StorageError)?;
176
177 Ok(())
178 }
179
180 pub fn merge_pending_commit<Provider: OpenMlsProvider>(
183 &mut self,
184 provider: &Provider,
185 ) -> Result<(), MergePendingCommitError<Provider::StorageError>> {
186 match &self.group_state {
187 MlsGroupState::PendingCommit(_) => {
188 let old_state = mem::replace(&mut self.group_state, MlsGroupState::Operational);
189 if let MlsGroupState::PendingCommit(pending_commit_state) = old_state {
190 self.merge_staged_commit(provider, (*pending_commit_state).into())?;
191 }
192 Ok(())
193 }
194 MlsGroupState::Inactive => Err(MlsGroupStateError::UseAfterEviction)?,
195 MlsGroupState::Operational => Ok(()),
196 }
197 }
198
199 pub(super) fn read_decryption_keypairs(
201 &self,
202 provider: &impl OpenMlsProvider,
203 own_leaf_nodes: &[LeafNode],
204 ) -> Result<(Vec<EncryptionKeyPair>, Vec<EncryptionKeyPair>), StageCommitError> {
205 let old_epoch_keypairs = self.read_epoch_keypairs(provider.storage()).map_err(|e| {
207 log::error!("Error reading epoch keypairs: {e:?}");
208 StageCommitError::MissingDecryptionKey
209 })?;
210
211 let leaf_node_keypairs = own_leaf_nodes
215 .iter()
216 .map(|leaf_node| {
217 EncryptionKeyPair::read(provider, leaf_node.encryption_key())
218 .ok_or(StageCommitError::MissingDecryptionKey)
219 })
220 .collect::<Result<Vec<EncryptionKeyPair>, StageCommitError>>()?;
221
222 Ok((old_epoch_keypairs, leaf_node_keypairs))
223 }
224
225 pub(crate) fn process_unverified_message<Provider: OpenMlsProvider>(
254 &self,
255 provider: &Provider,
256 unverified_message: UnverifiedMessage,
257 old_epoch_keypairs: Vec<EncryptionKeyPair>,
258 leaf_node_keypairs: Vec<EncryptionKeyPair>,
259 ) -> Result<ProcessedMessage, ProcessMessageError> {
260 let (content, credential) =
266 unverified_message.verify(self.ciphersuite(), provider.crypto(), self.version())?;
267
268 match content.sender() {
269 Sender::Member(_) | Sender::NewMemberCommit | Sender::NewMemberProposal => {
270 let sender = content.sender().clone();
271 let authenticated_data = content.authenticated_data().to_owned();
272 let epoch = content.epoch();
273
274 let content = match content.content() {
275 FramedContentBody::Application(application_message) => {
276 ProcessedMessageContent::ApplicationMessage(ApplicationMessage::new(
277 application_message.as_slice().to_owned(),
278 ))
279 }
280 FramedContentBody::Proposal(_) => {
281 let proposal = Box::new(QueuedProposal::from_authenticated_content_by_ref(
282 self.ciphersuite(),
283 provider.crypto(),
284 content,
285 )?);
286
287 if matches!(sender, Sender::NewMemberProposal) {
288 ProcessedMessageContent::ExternalJoinProposalMessage(proposal)
289 } else {
290 ProcessedMessageContent::ProposalMessage(proposal)
291 }
292 }
293 FramedContentBody::Commit(_) => {
294 let staged_commit = self.stage_commit(
295 &content,
296 old_epoch_keypairs,
297 leaf_node_keypairs,
298 provider,
299 )?;
300 ProcessedMessageContent::StagedCommitMessage(Box::new(staged_commit))
301 }
302 };
303
304 Ok(ProcessedMessage::new(
305 self.group_id().clone(),
306 epoch,
307 sender,
308 authenticated_data,
309 content,
310 credential,
311 ))
312 }
313 Sender::External(_) => {
314 let sender = content.sender().clone();
315 let data = content.authenticated_data().to_owned();
316 match content.content() {
318 FramedContentBody::Application(_) => {
319 Err(ProcessMessageError::UnauthorizedExternalApplicationMessage)
320 }
321 FramedContentBody::Proposal(Proposal::GroupContextExtensions(_)) => {
323 let content = ProcessedMessageContent::ProposalMessage(Box::new(
324 QueuedProposal::from_authenticated_content_by_ref(
325 self.ciphersuite(),
326 provider.crypto(),
327 content,
328 )?,
329 ));
330 Ok(ProcessedMessage::new(
331 self.group_id().clone(),
332 self.context().epoch(),
333 sender,
334 data,
335 content,
336 credential,
337 ))
338 }
339
340 FramedContentBody::Proposal(Proposal::Remove(_)) => {
341 let content = ProcessedMessageContent::ProposalMessage(Box::new(
342 QueuedProposal::from_authenticated_content_by_ref(
343 self.ciphersuite(),
344 provider.crypto(),
345 content,
346 )?,
347 ));
348 Ok(ProcessedMessage::new(
349 self.group_id().clone(),
350 self.context().epoch(),
351 sender,
352 data,
353 content,
354 credential,
355 ))
356 }
357 FramedContentBody::Proposal(Proposal::Add(_)) => {
358 let content = ProcessedMessageContent::ProposalMessage(Box::new(
359 QueuedProposal::from_authenticated_content_by_ref(
360 self.ciphersuite(),
361 provider.crypto(),
362 content,
363 )?,
364 ));
365 Ok(ProcessedMessage::new(
366 self.group_id().clone(),
367 self.context().epoch(),
368 sender,
369 data,
370 content,
371 credential,
372 ))
373 }
374 FramedContentBody::Proposal(_) => {
376 Err(ProcessMessageError::UnsupportedProposalType)
377 }
378 FramedContentBody::Commit(_) => {
379 Err(ProcessMessageError::UnauthorizedExternalCommitMessage)
380 }
381 }
382 }
383 }
384 }
385
386 pub(crate) fn decrypt_message(
398 &mut self,
399 crypto: &impl OpenMlsCrypto,
400 message: ProtocolMessage,
401 sender_ratchet_configuration: &SenderRatchetConfiguration,
402 ) -> Result<DecryptedMessage, ValidationError> {
403 self.public_group.validate_framing(&message)?;
407
408 let epoch = message.epoch();
409
410 match message {
414 ProtocolMessage::PublicMessage(public_message) => {
415 let message_secrets =
417 self.message_secrets_for_epoch(epoch).map_err(|e| match e {
418 SecretTreeError::TooDistantInThePast => ValidationError::NoPastEpochData,
419 _ => LibraryError::custom(
420 "Unexpected error while retrieving message secrets for epoch.",
421 )
422 .into(),
423 })?;
424 DecryptedMessage::from_inbound_public_message(
425 *public_message,
426 message_secrets,
427 message_secrets.serialized_context().to_vec(),
428 crypto,
429 self.ciphersuite(),
430 )
431 }
432 ProtocolMessage::PrivateMessage(ciphertext) => {
433 DecryptedMessage::from_inbound_ciphertext(
435 ciphertext,
436 crypto,
437 self,
438 sender_ratchet_configuration,
439 )
440 }
441 }
442 }
443}