1use std::mem;
4
5use errors::{CommitToPendingProposalsError, MergePendingCommitError};
6use openmls_traits::{crypto::OpenMlsCrypto, signatures::Signer, storage::StorageProvider as _};
7
8use crate::{
9 framing::mls_content::FramedContentBody,
10 group::{errors::MergeCommitError, StageCommitError, ValidationError},
11 messages::group_info::GroupInfo,
12 storage::OpenMlsProvider,
13 tree::sender_ratchet::SenderRatchetConfiguration,
14};
15
16use super::{errors::ProcessMessageError, *};
17
18impl MlsGroup {
19 pub fn process_message<Provider: OpenMlsProvider>(
29 &mut self,
30 provider: &Provider,
31 message: impl Into<ProtocolMessage>,
32 ) -> Result<ProcessedMessage, ProcessMessageError> {
33 if !self.is_active() {
35 return Err(ProcessMessageError::GroupStateError(
36 MlsGroupStateError::UseAfterEviction,
37 ));
38 }
39 let message = message.into();
40
41 if !message.is_external()
43 && message.is_handshake_message()
44 && !self
45 .configuration()
46 .wire_format_policy()
47 .incoming()
48 .is_compatible_with(message.wire_format())
49 {
50 return Err(ProcessMessageError::IncompatibleWireFormat);
51 }
52
53 let sender_ratchet_configuration = *self.configuration().sender_ratchet_configuration();
55
56 let decrypted_message =
62 self.decrypt_message(provider.crypto(), message, &sender_ratchet_configuration)?;
63
64 let unverified_message = self
65 .public_group
66 .parse_message(decrypted_message, &self.message_secrets_store)
67 .map_err(ProcessMessageError::from)?;
68
69 let (old_epoch_keypairs, leaf_node_keypairs) =
71 if let ContentType::Commit = unverified_message.content_type() {
72 self.read_decryption_keypairs(provider, &self.own_leaf_nodes)?
73 } else {
74 (vec![], vec![])
75 };
76
77 self.process_unverified_message(
78 provider,
79 unverified_message,
80 old_epoch_keypairs,
81 leaf_node_keypairs,
82 )
83 }
84
85 pub fn store_pending_proposal<Storage: StorageProvider>(
87 &mut self,
88 storage: &Storage,
89 proposal: QueuedProposal,
90 ) -> Result<(), Storage::Error> {
91 storage.queue_proposal(self.group_id(), &proposal.proposal_reference(), &proposal)?;
92 self.proposal_store_mut().add(proposal);
94
95 Ok(())
96 }
97
98 pub fn has_pending_proposals(&self) -> bool {
100 !self.proposal_store().is_empty()
101 }
102
103 #[allow(clippy::type_complexity)]
114 pub fn commit_to_pending_proposals<Provider: OpenMlsProvider>(
115 &mut self,
116 provider: &Provider,
117 signer: &impl Signer,
118 ) -> Result<
119 (MlsMessageOut, Option<MlsMessageOut>, Option<GroupInfo>),
120 CommitToPendingProposalsError<Provider::StorageError>,
121 > {
122 self.is_operational()?;
123
124 let (commit, welcome, group_info) = self
127 .commit_builder()
128 .consume_proposal_store(true)
130 .load_psks(provider.storage())?
131 .build(provider.rand(), provider.crypto(), signer, |_| true)?
132 .stage_commit(provider)?
133 .into_contents();
134
135 Ok((
136 commit,
137 welcome.map(|welcome| MlsMessageOut::from_welcome(welcome, self.version())),
139 group_info,
140 ))
141 }
142
143 pub fn merge_staged_commit<Provider: OpenMlsProvider>(
146 &mut self,
147 provider: &Provider,
148 staged_commit: StagedCommit,
149 ) -> Result<(), MergeCommitError<Provider::StorageError>> {
150 if staged_commit.self_removed() {
152 self.group_state = MlsGroupState::Inactive;
153 }
154 provider
155 .storage()
156 .write_group_state(self.group_id(), &self.group_state)
157 .map_err(MergeCommitError::StorageError)?;
158
159 self.merge_commit(provider, staged_commit)?;
161
162 let resumption_psk = self.group_epoch_secrets().resumption_psk();
164 self.resumption_psk_store
165 .add(self.context().epoch(), resumption_psk.clone());
166 provider
167 .storage()
168 .write_resumption_psk_store(self.group_id(), &self.resumption_psk_store)
169 .map_err(MergeCommitError::StorageError)?;
170
171 self.own_leaf_nodes.clear();
173 provider
174 .storage()
175 .delete_own_leaf_nodes(self.group_id())
176 .map_err(MergeCommitError::StorageError)?;
177
178 self.clear_pending_commit(provider.storage())
180 .map_err(MergeCommitError::StorageError)?;
181
182 Ok(())
183 }
184
185 pub fn merge_pending_commit<Provider: OpenMlsProvider>(
188 &mut self,
189 provider: &Provider,
190 ) -> Result<(), MergePendingCommitError<Provider::StorageError>> {
191 match &self.group_state {
192 MlsGroupState::PendingCommit(_) => {
193 let old_state = mem::replace(&mut self.group_state, MlsGroupState::Operational);
194 if let MlsGroupState::PendingCommit(pending_commit_state) = old_state {
195 self.merge_staged_commit(provider, (*pending_commit_state).into())?;
196 }
197 Ok(())
198 }
199 MlsGroupState::Inactive => Err(MlsGroupStateError::UseAfterEviction)?,
200 MlsGroupState::Operational => Ok(()),
201 }
202 }
203
204 pub(super) fn read_decryption_keypairs(
206 &self,
207 provider: &impl OpenMlsProvider,
208 own_leaf_nodes: &[LeafNode],
209 ) -> Result<(Vec<EncryptionKeyPair>, Vec<EncryptionKeyPair>), StageCommitError> {
210 let old_epoch_keypairs = self.read_epoch_keypairs(provider.storage()).map_err(|e| {
212 log::error!("Error reading epoch keypairs: {e:?}");
213 StageCommitError::MissingDecryptionKey
214 })?;
215
216 let leaf_node_keypairs = own_leaf_nodes
220 .iter()
221 .map(|leaf_node| {
222 EncryptionKeyPair::read(provider, leaf_node.encryption_key())
223 .ok_or(StageCommitError::MissingDecryptionKey)
224 })
225 .collect::<Result<Vec<EncryptionKeyPair>, StageCommitError>>()?;
226
227 Ok((old_epoch_keypairs, leaf_node_keypairs))
228 }
229
230 pub(crate) fn process_unverified_message<Provider: OpenMlsProvider>(
259 &self,
260 provider: &Provider,
261 unverified_message: UnverifiedMessage,
262 old_epoch_keypairs: Vec<EncryptionKeyPair>,
263 leaf_node_keypairs: Vec<EncryptionKeyPair>,
264 ) -> Result<ProcessedMessage, ProcessMessageError> {
265 let (content, credential) =
271 unverified_message.verify(self.ciphersuite(), provider.crypto(), self.version())?;
272
273 match content.sender() {
274 Sender::Member(_) | Sender::NewMemberCommit | Sender::NewMemberProposal => {
275 let sender = content.sender().clone();
276 let authenticated_data = content.authenticated_data().to_owned();
277 let epoch = content.epoch();
278
279 let content = match content.content() {
280 FramedContentBody::Application(application_message) => {
281 ProcessedMessageContent::ApplicationMessage(ApplicationMessage::new(
282 application_message.as_slice().to_owned(),
283 ))
284 }
285 FramedContentBody::Proposal(_) => {
286 let proposal = Box::new(QueuedProposal::from_authenticated_content_by_ref(
287 self.ciphersuite(),
288 provider.crypto(),
289 content,
290 )?);
291
292 if matches!(sender, Sender::NewMemberProposal) {
293 ProcessedMessageContent::ExternalJoinProposalMessage(proposal)
294 } else {
295 ProcessedMessageContent::ProposalMessage(proposal)
296 }
297 }
298 FramedContentBody::Commit(_) => {
299 let staged_commit = self.stage_commit(
300 &content,
301 old_epoch_keypairs,
302 leaf_node_keypairs,
303 provider,
304 )?;
305 ProcessedMessageContent::StagedCommitMessage(Box::new(staged_commit))
306 }
307 };
308
309 Ok(ProcessedMessage::new(
310 self.group_id().clone(),
311 epoch,
312 sender,
313 authenticated_data,
314 content,
315 credential,
316 ))
317 }
318 Sender::External(_) => {
319 let sender = content.sender().clone();
320 let data = content.authenticated_data().to_owned();
321 match content.content() {
323 FramedContentBody::Application(_) => {
324 Err(ProcessMessageError::UnauthorizedExternalApplicationMessage)
325 }
326 FramedContentBody::Proposal(Proposal::GroupContextExtensions(_)) => {
328 let content = ProcessedMessageContent::ProposalMessage(Box::new(
329 QueuedProposal::from_authenticated_content_by_ref(
330 self.ciphersuite(),
331 provider.crypto(),
332 content,
333 )?,
334 ));
335 Ok(ProcessedMessage::new(
336 self.group_id().clone(),
337 self.context().epoch(),
338 sender,
339 data,
340 content,
341 credential,
342 ))
343 }
344
345 FramedContentBody::Proposal(Proposal::Remove(_)) => {
346 let content = ProcessedMessageContent::ProposalMessage(Box::new(
347 QueuedProposal::from_authenticated_content_by_ref(
348 self.ciphersuite(),
349 provider.crypto(),
350 content,
351 )?,
352 ));
353 Ok(ProcessedMessage::new(
354 self.group_id().clone(),
355 self.context().epoch(),
356 sender,
357 data,
358 content,
359 credential,
360 ))
361 }
362 FramedContentBody::Proposal(Proposal::Add(_)) => {
363 let content = ProcessedMessageContent::ProposalMessage(Box::new(
364 QueuedProposal::from_authenticated_content_by_ref(
365 self.ciphersuite(),
366 provider.crypto(),
367 content,
368 )?,
369 ));
370 Ok(ProcessedMessage::new(
371 self.group_id().clone(),
372 self.context().epoch(),
373 sender,
374 data,
375 content,
376 credential,
377 ))
378 }
379 FramedContentBody::Proposal(_) => {
381 Err(ProcessMessageError::UnsupportedProposalType)
382 }
383 FramedContentBody::Commit(_) => {
384 Err(ProcessMessageError::UnauthorizedExternalCommitMessage)
385 }
386 }
387 }
388 }
389 }
390
391 pub(crate) fn decrypt_message(
403 &mut self,
404 crypto: &impl OpenMlsCrypto,
405 message: ProtocolMessage,
406 sender_ratchet_configuration: &SenderRatchetConfiguration,
407 ) -> Result<DecryptedMessage, ValidationError> {
408 self.public_group.validate_framing(&message)?;
412
413 let epoch = message.epoch();
414
415 match message {
419 ProtocolMessage::PublicMessage(public_message) => {
420 let message_secrets =
422 self.message_secrets_for_epoch(epoch).map_err(|e| match e {
423 SecretTreeError::TooDistantInThePast => ValidationError::NoPastEpochData,
424 _ => LibraryError::custom(
425 "Unexpected error while retrieving message secrets for epoch.",
426 )
427 .into(),
428 })?;
429 DecryptedMessage::from_inbound_public_message(
430 *public_message,
431 message_secrets,
432 message_secrets.serialized_context().to_vec(),
433 crypto,
434 self.ciphersuite(),
435 )
436 }
437 ProtocolMessage::PrivateMessage(ciphertext) => {
438 DecryptedMessage::from_inbound_ciphertext(
440 ciphertext,
441 crypto,
442 self,
443 sender_ratchet_configuration,
444 )
445 }
446 }
447 }
448}