openmls/group/mls_group/
processing.rs

1//! Processing functions of an [`MlsGroup`] for incoming messages.
2
3use std::mem;
4
5use errors::{CommitToPendingProposalsError, MergePendingCommitError};
6use openmls_traits::{crypto::OpenMlsCrypto, signatures::Signer, storage::StorageProvider as _};
7
8use crate::{
9    framing::mls_content::FramedContentBody,
10    group::{errors::MergeCommitError, StageCommitError, ValidationError},
11    messages::group_info::GroupInfo,
12    storage::OpenMlsProvider,
13    tree::sender_ratchet::SenderRatchetConfiguration,
14};
15
16use super::{errors::ProcessMessageError, *};
17
18impl MlsGroup {
19    /// Parses incoming messages from the DS. Checks for syntactic errors and
20    /// makes some semantic checks as well. If the input is an encrypted
21    /// message, it will be decrypted. This processing function does syntactic
22    /// and semantic validation of the message. It returns a [ProcessedMessage]
23    /// enum.
24    ///
25    /// # Errors:
26    /// Returns an [`ProcessMessageError`] when the validation checks fail
27    /// with the exact reason of the failure.
28    pub fn process_message<Provider: OpenMlsProvider>(
29        &mut self,
30        provider: &Provider,
31        message: impl Into<ProtocolMessage>,
32    ) -> Result<ProcessedMessage, ProcessMessageError> {
33        // Make sure we are still a member of the group
34        if !self.is_active() {
35            return Err(ProcessMessageError::GroupStateError(
36                MlsGroupStateError::UseAfterEviction,
37            ));
38        }
39        let message = message.into();
40
41        // Check that handshake messages are compatible with the incoming wire format policy
42        if !message.is_external()
43            && message.is_handshake_message()
44            && !self
45                .configuration()
46                .wire_format_policy()
47                .incoming()
48                .is_compatible_with(message.wire_format())
49        {
50            return Err(ProcessMessageError::IncompatibleWireFormat);
51        }
52
53        // Parse the message
54        let sender_ratchet_configuration = *self.configuration().sender_ratchet_configuration();
55
56        // Checks the following semantic validation:
57        //  - ValSem002
58        //  - ValSem003
59        //  - ValSem006
60        //  - ValSem007 MembershipTag presence
61        let decrypted_message =
62            self.decrypt_message(provider.crypto(), message, &sender_ratchet_configuration)?;
63
64        let unverified_message = self
65            .public_group
66            .parse_message(decrypted_message, &self.message_secrets_store)
67            .map_err(ProcessMessageError::from)?;
68
69        // If this is a commit, we need to load the private key material we need for decryption.
70        let (old_epoch_keypairs, leaf_node_keypairs) =
71            if let ContentType::Commit = unverified_message.content_type() {
72                self.read_decryption_keypairs(provider, &self.own_leaf_nodes)?
73            } else {
74                (vec![], vec![])
75            };
76
77        self.process_unverified_message(
78            provider,
79            unverified_message,
80            old_epoch_keypairs,
81            leaf_node_keypairs,
82        )
83    }
84
85    /// Stores a standalone proposal in the internal [ProposalStore]
86    pub fn store_pending_proposal<Storage: StorageProvider>(
87        &mut self,
88        storage: &Storage,
89        proposal: QueuedProposal,
90    ) -> Result<(), Storage::Error> {
91        storage.queue_proposal(self.group_id(), &proposal.proposal_reference(), &proposal)?;
92        // Store the proposal in in the internal ProposalStore
93        self.proposal_store_mut().add(proposal);
94
95        Ok(())
96    }
97
98    /// Returns true if there are pending proposals queued in the proposal store.
99    pub fn has_pending_proposals(&self) -> bool {
100        !self.proposal_store().is_empty()
101    }
102
103    /// Creates a Commit message that covers the pending proposals that are
104    /// currently stored in the group's [ProposalStore]. The Commit message is
105    /// created even if there are no valid pending proposals.
106    ///
107    /// Returns an error if there is a pending commit. Otherwise it returns a
108    /// tuple of `Commit, Option<Welcome>, Option<GroupInfo>`, where `Commit`
109    /// and [`Welcome`] are MlsMessages of the type [`MlsMessageOut`].
110    ///
111    /// [`Welcome`]: crate::messages::Welcome
112    // FIXME: #1217
113    #[allow(clippy::type_complexity)]
114    pub fn commit_to_pending_proposals<Provider: OpenMlsProvider>(
115        &mut self,
116        provider: &Provider,
117        signer: &impl Signer,
118    ) -> Result<
119        (MlsMessageOut, Option<MlsMessageOut>, Option<GroupInfo>),
120        CommitToPendingProposalsError<Provider::StorageError>,
121    > {
122        self.is_operational()?;
123
124        // Build and stage the commit using the commit builder
125        // TODO #751
126        let (commit, welcome, group_info) = self
127            .commit_builder()
128            // This forces committing to the proposals in the proposal store:
129            .consume_proposal_store(true)
130            .load_psks(provider.storage())?
131            .build(provider.rand(), provider.crypto(), signer, |_| true)?
132            .stage_commit(provider)?
133            .into_contents();
134
135        Ok((
136            commit,
137            // Turn the [`Welcome`] to an [`MlsMessageOut`], if there is one
138            welcome.map(|welcome| MlsMessageOut::from_welcome(welcome, self.version())),
139            group_info,
140        ))
141    }
142
143    /// Merge a [StagedCommit] into the group after inspection. As this advances
144    /// the epoch of the group, it also clears any pending commits.
145    pub fn merge_staged_commit<Provider: OpenMlsProvider>(
146        &mut self,
147        provider: &Provider,
148        staged_commit: StagedCommit,
149    ) -> Result<(), MergeCommitError<Provider::StorageError>> {
150        // Check if we were removed from the group
151        if staged_commit.self_removed() {
152            self.group_state = MlsGroupState::Inactive;
153        }
154        provider
155            .storage()
156            .write_group_state(self.group_id(), &self.group_state)
157            .map_err(MergeCommitError::StorageError)?;
158
159        // Merge staged commit
160        self.merge_commit(provider, staged_commit)?;
161
162        // Extract and store the resumption psk for the current epoch
163        let resumption_psk = self.group_epoch_secrets().resumption_psk();
164        self.resumption_psk_store
165            .add(self.context().epoch(), resumption_psk.clone());
166        provider
167            .storage()
168            .write_resumption_psk_store(self.group_id(), &self.resumption_psk_store)
169            .map_err(MergeCommitError::StorageError)?;
170
171        // Delete own KeyPackageBundles
172        self.own_leaf_nodes.clear();
173        provider
174            .storage()
175            .delete_own_leaf_nodes(self.group_id())
176            .map_err(MergeCommitError::StorageError)?;
177
178        // Delete a potential pending commit
179        self.clear_pending_commit(provider.storage())
180            .map_err(MergeCommitError::StorageError)?;
181
182        Ok(())
183    }
184
185    /// Merges the pending [`StagedCommit`] if there is one, and
186    /// clears the field by setting it to `None`.
187    pub fn merge_pending_commit<Provider: OpenMlsProvider>(
188        &mut self,
189        provider: &Provider,
190    ) -> Result<(), MergePendingCommitError<Provider::StorageError>> {
191        match &self.group_state {
192            MlsGroupState::PendingCommit(_) => {
193                let old_state = mem::replace(&mut self.group_state, MlsGroupState::Operational);
194                if let MlsGroupState::PendingCommit(pending_commit_state) = old_state {
195                    self.merge_staged_commit(provider, (*pending_commit_state).into())?;
196                }
197                Ok(())
198            }
199            MlsGroupState::Inactive => Err(MlsGroupStateError::UseAfterEviction)?,
200            MlsGroupState::Operational => Ok(()),
201        }
202    }
203
204    /// Helper function to read decryption keypairs.
205    pub(super) fn read_decryption_keypairs(
206        &self,
207        provider: &impl OpenMlsProvider,
208        own_leaf_nodes: &[LeafNode],
209    ) -> Result<(Vec<EncryptionKeyPair>, Vec<EncryptionKeyPair>), StageCommitError> {
210        // All keys from the previous epoch are potential decryption keypairs.
211        let old_epoch_keypairs = self.read_epoch_keypairs(provider.storage()).map_err(|e| {
212            log::error!("Error reading epoch keypairs: {e:?}");
213            StageCommitError::MissingDecryptionKey
214        })?;
215
216        // If we are processing an update proposal that originally came from
217        // us, the keypair corresponding to the leaf in the update is also a
218        // potential decryption keypair.
219        let leaf_node_keypairs = own_leaf_nodes
220            .iter()
221            .map(|leaf_node| {
222                EncryptionKeyPair::read(provider, leaf_node.encryption_key())
223                    .ok_or(StageCommitError::MissingDecryptionKey)
224            })
225            .collect::<Result<Vec<EncryptionKeyPair>, StageCommitError>>()?;
226
227        Ok((old_epoch_keypairs, leaf_node_keypairs))
228    }
229
230    /// This processing function does most of the semantic verifications.
231    /// It returns a [ProcessedMessage] enum.
232    /// Checks the following semantic validation:
233    ///  - ValSem008
234    ///  - ValSem010
235    ///  - ValSem101
236    ///  - ValSem102
237    ///  - ValSem104
238    ///  - ValSem106
239    ///  - ValSem107
240    ///  - ValSem108
241    ///  - ValSem110
242    ///  - ValSem111
243    ///  - ValSem112
244    ///  - ValSem113: All Proposals: The proposal type must be supported by all
245    ///    members of the group
246    ///  - ValSem200
247    ///  - ValSem201
248    ///  - ValSem202: Path must be the right length
249    ///  - ValSem203: Path secrets must decrypt correctly
250    ///  - ValSem204: Public keys from Path must be verified and match the
251    ///    private keys from the direct path
252    ///  - ValSem205
253    ///  - ValSem240
254    ///  - ValSem241
255    ///  - ValSem242
256    ///  - ValSem244
257    ///  - ValSem246 (as part of ValSem010)
258    pub(crate) fn process_unverified_message<Provider: OpenMlsProvider>(
259        &self,
260        provider: &Provider,
261        unverified_message: UnverifiedMessage,
262        old_epoch_keypairs: Vec<EncryptionKeyPair>,
263        leaf_node_keypairs: Vec<EncryptionKeyPair>,
264    ) -> Result<ProcessedMessage, ProcessMessageError> {
265        // Checks the following semantic validation:
266        //  - ValSem010
267        //  - ValSem246 (as part of ValSem010)
268        //  - https://validation.openmls.tech/#valn1302
269        //  - https://validation.openmls.tech/#valn1304
270        let (content, credential) =
271            unverified_message.verify(self.ciphersuite(), provider.crypto(), self.version())?;
272
273        match content.sender() {
274            Sender::Member(_) | Sender::NewMemberCommit | Sender::NewMemberProposal => {
275                let sender = content.sender().clone();
276                let authenticated_data = content.authenticated_data().to_owned();
277                let epoch = content.epoch();
278
279                let content = match content.content() {
280                    FramedContentBody::Application(application_message) => {
281                        ProcessedMessageContent::ApplicationMessage(ApplicationMessage::new(
282                            application_message.as_slice().to_owned(),
283                        ))
284                    }
285                    FramedContentBody::Proposal(_) => {
286                        let proposal = Box::new(QueuedProposal::from_authenticated_content_by_ref(
287                            self.ciphersuite(),
288                            provider.crypto(),
289                            content,
290                        )?);
291
292                        if matches!(sender, Sender::NewMemberProposal) {
293                            ProcessedMessageContent::ExternalJoinProposalMessage(proposal)
294                        } else {
295                            ProcessedMessageContent::ProposalMessage(proposal)
296                        }
297                    }
298                    FramedContentBody::Commit(_) => {
299                        let staged_commit = self.stage_commit(
300                            &content,
301                            old_epoch_keypairs,
302                            leaf_node_keypairs,
303                            provider,
304                        )?;
305                        ProcessedMessageContent::StagedCommitMessage(Box::new(staged_commit))
306                    }
307                };
308
309                Ok(ProcessedMessage::new(
310                    self.group_id().clone(),
311                    epoch,
312                    sender,
313                    authenticated_data,
314                    content,
315                    credential,
316                ))
317            }
318            Sender::External(_) => {
319                let sender = content.sender().clone();
320                let data = content.authenticated_data().to_owned();
321                // https://validation.openmls.tech/#valn1501
322                match content.content() {
323                    FramedContentBody::Application(_) => {
324                        Err(ProcessMessageError::UnauthorizedExternalApplicationMessage)
325                    }
326                    // TODO: https://validation.openmls.tech/#valn1502
327                    FramedContentBody::Proposal(Proposal::GroupContextExtensions(_)) => {
328                        let content = ProcessedMessageContent::ProposalMessage(Box::new(
329                            QueuedProposal::from_authenticated_content_by_ref(
330                                self.ciphersuite(),
331                                provider.crypto(),
332                                content,
333                            )?,
334                        ));
335                        Ok(ProcessedMessage::new(
336                            self.group_id().clone(),
337                            self.context().epoch(),
338                            sender,
339                            data,
340                            content,
341                            credential,
342                        ))
343                    }
344
345                    FramedContentBody::Proposal(Proposal::Remove(_)) => {
346                        let content = ProcessedMessageContent::ProposalMessage(Box::new(
347                            QueuedProposal::from_authenticated_content_by_ref(
348                                self.ciphersuite(),
349                                provider.crypto(),
350                                content,
351                            )?,
352                        ));
353                        Ok(ProcessedMessage::new(
354                            self.group_id().clone(),
355                            self.context().epoch(),
356                            sender,
357                            data,
358                            content,
359                            credential,
360                        ))
361                    }
362                    FramedContentBody::Proposal(Proposal::Add(_)) => {
363                        let content = ProcessedMessageContent::ProposalMessage(Box::new(
364                            QueuedProposal::from_authenticated_content_by_ref(
365                                self.ciphersuite(),
366                                provider.crypto(),
367                                content,
368                            )?,
369                        ));
370                        Ok(ProcessedMessage::new(
371                            self.group_id().clone(),
372                            self.context().epoch(),
373                            sender,
374                            data,
375                            content,
376                            credential,
377                        ))
378                    }
379                    // TODO #151/#106
380                    FramedContentBody::Proposal(_) => {
381                        Err(ProcessMessageError::UnsupportedProposalType)
382                    }
383                    FramedContentBody::Commit(_) => {
384                        Err(ProcessMessageError::UnauthorizedExternalCommitMessage)
385                    }
386                }
387            }
388        }
389    }
390
391    /// Performs framing validation and, if necessary, decrypts the given message.
392    ///
393    /// Returns the [`DecryptedMessage`] if processing is successful, or a
394    /// [`ValidationError`] if it is not.
395    ///
396    /// Checks the following semantic validation:
397    ///  - ValSem002
398    ///  - ValSem003
399    ///  - ValSem006
400    ///  - ValSem007 MembershipTag presence
401    ///  - https://validation.openmls.tech/#valn1202
402    pub(crate) fn decrypt_message(
403        &mut self,
404        crypto: &impl OpenMlsCrypto,
405        message: ProtocolMessage,
406        sender_ratchet_configuration: &SenderRatchetConfiguration,
407    ) -> Result<DecryptedMessage, ValidationError> {
408        // Checks the following semantic validation:
409        //  - ValSem002
410        //  - ValSem003
411        self.public_group.validate_framing(&message)?;
412
413        let epoch = message.epoch();
414
415        // Checks the following semantic validation:
416        //  - ValSem006
417        //  - ValSem007 MembershipTag presence
418        match message {
419            ProtocolMessage::PublicMessage(public_message) => {
420                // If the message is older than the current epoch, we need to fetch the correct secret tree first.
421                let message_secrets =
422                    self.message_secrets_for_epoch(epoch).map_err(|e| match e {
423                        SecretTreeError::TooDistantInThePast => ValidationError::NoPastEpochData,
424                        _ => LibraryError::custom(
425                            "Unexpected error while retrieving message secrets for epoch.",
426                        )
427                        .into(),
428                    })?;
429                DecryptedMessage::from_inbound_public_message(
430                    *public_message,
431                    message_secrets,
432                    message_secrets.serialized_context().to_vec(),
433                    crypto,
434                    self.ciphersuite(),
435                )
436            }
437            ProtocolMessage::PrivateMessage(ciphertext) => {
438                // If the message is older than the current epoch, we need to fetch the correct secret tree first
439                DecryptedMessage::from_inbound_ciphertext(
440                    ciphertext,
441                    crypto,
442                    self,
443                    sender_ratchet_configuration,
444                )
445            }
446        }
447    }
448}