openmls/group/mls_group/
processing.rs

1//! Processing functions of an [`MlsGroup`] for incoming messages.
2
3use std::mem;
4
5use errors::{CommitToPendingProposalsError, MergePendingCommitError};
6use openmls_traits::{crypto::OpenMlsCrypto, signatures::Signer, storage::StorageProvider as _};
7
8use crate::{
9    framing::mls_content::FramedContentBody,
10    group::{errors::MergeCommitError, StageCommitError, ValidationError},
11    messages::group_info::GroupInfo,
12    storage::OpenMlsProvider,
13    tree::sender_ratchet::SenderRatchetConfiguration,
14};
15
16use super::{errors::ProcessMessageError, *};
17
18impl MlsGroup {
19    /// Parses incoming messages from the DS. Checks for syntactic errors and
20    /// makes some semantic checks as well. If the input is an encrypted
21    /// message, it will be decrypted. This processing function does syntactic
22    /// and semantic validation of the message. It returns a [ProcessedMessage]
23    /// enum.
24    ///
25    /// # Errors:
26    /// Returns an [`ProcessMessageError`] when the validation checks fail
27    /// with the exact reason of the failure.
28    pub fn process_message<Provider: OpenMlsProvider>(
29        &mut self,
30        provider: &Provider,
31        message: impl Into<ProtocolMessage>,
32    ) -> Result<ProcessedMessage, ProcessMessageError> {
33        // Make sure we are still a member of the group
34        if !self.is_active() {
35            return Err(ProcessMessageError::GroupStateError(
36                MlsGroupStateError::UseAfterEviction,
37            ));
38        }
39        let message = message.into();
40
41        // Check that handshake messages are compatible with the incoming wire format policy
42        if !message.is_external()
43            && message.is_handshake_message()
44            && !self
45                .configuration()
46                .wire_format_policy()
47                .incoming()
48                .is_compatible_with(message.wire_format())
49        {
50            return Err(ProcessMessageError::IncompatibleWireFormat);
51        }
52
53        // Parse the message
54        let sender_ratchet_configuration = *self.configuration().sender_ratchet_configuration();
55
56        // Checks the following semantic validation:
57        //  - ValSem002
58        //  - ValSem003
59        //  - ValSem006
60        //  - ValSem007 MembershipTag presence
61        let decrypted_message =
62            self.decrypt_message(provider.crypto(), message, &sender_ratchet_configuration)?;
63
64        let unverified_message = self
65            .public_group
66            .parse_message(decrypted_message, &self.message_secrets_store)
67            .map_err(ProcessMessageError::from)?;
68
69        // If this is a commit, we need to load the private key material we need for decryption.
70        let (old_epoch_keypairs, leaf_node_keypairs) =
71            if let ContentType::Commit = unverified_message.content_type() {
72                self.read_decryption_keypairs(provider, &self.own_leaf_nodes)?
73            } else {
74                (vec![], vec![])
75            };
76
77        self.process_unverified_message(
78            provider,
79            unverified_message,
80            old_epoch_keypairs,
81            leaf_node_keypairs,
82        )
83    }
84
85    /// Stores a standalone proposal in the internal [ProposalStore]
86    pub fn store_pending_proposal<Storage: StorageProvider>(
87        &mut self,
88        storage: &Storage,
89        proposal: QueuedProposal,
90    ) -> Result<(), Storage::Error> {
91        storage.queue_proposal(self.group_id(), &proposal.proposal_reference(), &proposal)?;
92        // Store the proposal in in the internal ProposalStore
93        self.proposal_store_mut().add(proposal);
94
95        Ok(())
96    }
97
98    /// Creates a Commit message that covers the pending proposals that are
99    /// currently stored in the group's [ProposalStore]. The Commit message is
100    /// created even if there are no valid pending proposals.
101    ///
102    /// Returns an error if there is a pending commit. Otherwise it returns a
103    /// tuple of `Commit, Option<Welcome>, Option<GroupInfo>`, where `Commit`
104    /// and [`Welcome`] are MlsMessages of the type [`MlsMessageOut`].
105    ///
106    /// [`Welcome`]: crate::messages::Welcome
107    // FIXME: #1217
108    #[allow(clippy::type_complexity)]
109    pub fn commit_to_pending_proposals<Provider: OpenMlsProvider>(
110        &mut self,
111        provider: &Provider,
112        signer: &impl Signer,
113    ) -> Result<
114        (MlsMessageOut, Option<MlsMessageOut>, Option<GroupInfo>),
115        CommitToPendingProposalsError<Provider::StorageError>,
116    > {
117        self.is_operational()?;
118
119        // Build and stage the commit using the commit builder
120        // TODO #751
121        let (commit, welcome, group_info) = self
122            .commit_builder()
123            // This forces committing to the proposals in the proposal store:
124            .consume_proposal_store(true)
125            .load_psks(provider.storage())?
126            .build(provider.rand(), provider.crypto(), signer, |_| true)?
127            .stage_commit(provider)?
128            .into_contents();
129
130        Ok((
131            commit,
132            // Turn the [`Welcome`] to an [`MlsMessageOut`], if there is one
133            welcome.map(|welcome| MlsMessageOut::from_welcome(welcome, self.version())),
134            group_info,
135        ))
136    }
137
138    /// Merge a [StagedCommit] into the group after inspection. As this advances
139    /// the epoch of the group, it also clears any pending commits.
140    pub fn merge_staged_commit<Provider: OpenMlsProvider>(
141        &mut self,
142        provider: &Provider,
143        staged_commit: StagedCommit,
144    ) -> Result<(), MergeCommitError<Provider::StorageError>> {
145        // Check if we were removed from the group
146        if staged_commit.self_removed() {
147            self.group_state = MlsGroupState::Inactive;
148        }
149        provider
150            .storage()
151            .write_group_state(self.group_id(), &self.group_state)
152            .map_err(MergeCommitError::StorageError)?;
153
154        // Merge staged commit
155        self.merge_commit(provider, staged_commit)?;
156
157        // Extract and store the resumption psk for the current epoch
158        let resumption_psk = self.group_epoch_secrets().resumption_psk();
159        self.resumption_psk_store
160            .add(self.context().epoch(), resumption_psk.clone());
161        provider
162            .storage()
163            .write_resumption_psk_store(self.group_id(), &self.resumption_psk_store)
164            .map_err(MergeCommitError::StorageError)?;
165
166        // Delete own KeyPackageBundles
167        self.own_leaf_nodes.clear();
168        provider
169            .storage()
170            .delete_own_leaf_nodes(self.group_id())
171            .map_err(MergeCommitError::StorageError)?;
172
173        // Delete a potential pending commit
174        self.clear_pending_commit(provider.storage())
175            .map_err(MergeCommitError::StorageError)?;
176
177        Ok(())
178    }
179
180    /// Merges the pending [`StagedCommit`] if there is one, and
181    /// clears the field by setting it to `None`.
182    pub fn merge_pending_commit<Provider: OpenMlsProvider>(
183        &mut self,
184        provider: &Provider,
185    ) -> Result<(), MergePendingCommitError<Provider::StorageError>> {
186        match &self.group_state {
187            MlsGroupState::PendingCommit(_) => {
188                let old_state = mem::replace(&mut self.group_state, MlsGroupState::Operational);
189                if let MlsGroupState::PendingCommit(pending_commit_state) = old_state {
190                    self.merge_staged_commit(provider, (*pending_commit_state).into())?;
191                }
192                Ok(())
193            }
194            MlsGroupState::Inactive => Err(MlsGroupStateError::UseAfterEviction)?,
195            MlsGroupState::Operational => Ok(()),
196        }
197    }
198
199    /// Helper function to read decryption keypairs.
200    pub(super) fn read_decryption_keypairs(
201        &self,
202        provider: &impl OpenMlsProvider,
203        own_leaf_nodes: &[LeafNode],
204    ) -> Result<(Vec<EncryptionKeyPair>, Vec<EncryptionKeyPair>), StageCommitError> {
205        // All keys from the previous epoch are potential decryption keypairs.
206        let old_epoch_keypairs = self.read_epoch_keypairs(provider.storage()).map_err(|e| {
207            log::error!("Error reading epoch keypairs: {:?}", e);
208            StageCommitError::MissingDecryptionKey
209        })?;
210
211        // If we are processing an update proposal that originally came from
212        // us, the keypair corresponding to the leaf in the update is also a
213        // potential decryption keypair.
214        let leaf_node_keypairs = own_leaf_nodes
215            .iter()
216            .map(|leaf_node| {
217                EncryptionKeyPair::read(provider, leaf_node.encryption_key())
218                    .ok_or(StageCommitError::MissingDecryptionKey)
219            })
220            .collect::<Result<Vec<EncryptionKeyPair>, StageCommitError>>()?;
221
222        Ok((old_epoch_keypairs, leaf_node_keypairs))
223    }
224
225    /// This processing function does most of the semantic verifications.
226    /// It returns a [ProcessedMessage] enum.
227    /// Checks the following semantic validation:
228    ///  - ValSem008
229    ///  - ValSem010
230    ///  - ValSem101
231    ///  - ValSem102
232    ///  - ValSem104
233    ///  - ValSem106
234    ///  - ValSem107
235    ///  - ValSem108
236    ///  - ValSem110
237    ///  - ValSem111
238    ///  - ValSem112
239    ///  - ValSem113: All Proposals: The proposal type must be supported by all
240    ///    members of the group
241    ///  - ValSem200
242    ///  - ValSem201
243    ///  - ValSem202: Path must be the right length
244    ///  - ValSem203: Path secrets must decrypt correctly
245    ///  - ValSem204: Public keys from Path must be verified and match the
246    ///    private keys from the direct path
247    ///  - ValSem205
248    ///  - ValSem240
249    ///  - ValSem241
250    ///  - ValSem242
251    ///  - ValSem244
252    ///  - ValSem246 (as part of ValSem010)
253    pub(crate) fn process_unverified_message<Provider: OpenMlsProvider>(
254        &self,
255        provider: &Provider,
256        unverified_message: UnverifiedMessage,
257        old_epoch_keypairs: Vec<EncryptionKeyPair>,
258        leaf_node_keypairs: Vec<EncryptionKeyPair>,
259    ) -> Result<ProcessedMessage, ProcessMessageError> {
260        // Checks the following semantic validation:
261        //  - ValSem010
262        //  - ValSem246 (as part of ValSem010)
263        //  - https://validation.openmls.tech/#valn1302
264        //  - https://validation.openmls.tech/#valn1304
265        let (content, credential) =
266            unverified_message.verify(self.ciphersuite(), provider.crypto(), self.version())?;
267
268        match content.sender() {
269            Sender::Member(_) | Sender::NewMemberCommit | Sender::NewMemberProposal => {
270                let sender = content.sender().clone();
271                let authenticated_data = content.authenticated_data().to_owned();
272                let epoch = content.epoch();
273
274                let content = match content.content() {
275                    FramedContentBody::Application(application_message) => {
276                        ProcessedMessageContent::ApplicationMessage(ApplicationMessage::new(
277                            application_message.as_slice().to_owned(),
278                        ))
279                    }
280                    FramedContentBody::Proposal(_) => {
281                        let proposal = Box::new(QueuedProposal::from_authenticated_content_by_ref(
282                            self.ciphersuite(),
283                            provider.crypto(),
284                            content,
285                        )?);
286
287                        if matches!(sender, Sender::NewMemberProposal) {
288                            ProcessedMessageContent::ExternalJoinProposalMessage(proposal)
289                        } else {
290                            ProcessedMessageContent::ProposalMessage(proposal)
291                        }
292                    }
293                    FramedContentBody::Commit(_) => {
294                        let staged_commit = self.stage_commit(
295                            &content,
296                            old_epoch_keypairs,
297                            leaf_node_keypairs,
298                            provider,
299                        )?;
300                        ProcessedMessageContent::StagedCommitMessage(Box::new(staged_commit))
301                    }
302                };
303
304                Ok(ProcessedMessage::new(
305                    self.group_id().clone(),
306                    epoch,
307                    sender,
308                    authenticated_data,
309                    content,
310                    credential,
311                ))
312            }
313            Sender::External(_) => {
314                let sender = content.sender().clone();
315                let data = content.authenticated_data().to_owned();
316                match content.content() {
317                    FramedContentBody::Application(_) => {
318                        Err(ProcessMessageError::UnauthorizedExternalApplicationMessage)
319                    }
320                    FramedContentBody::Proposal(Proposal::Remove(_)) => {
321                        let content = ProcessedMessageContent::ProposalMessage(Box::new(
322                            QueuedProposal::from_authenticated_content_by_ref(
323                                self.ciphersuite(),
324                                provider.crypto(),
325                                content,
326                            )?,
327                        ));
328                        Ok(ProcessedMessage::new(
329                            self.group_id().clone(),
330                            self.context().epoch(),
331                            sender,
332                            data,
333                            content,
334                            credential,
335                        ))
336                    }
337                    FramedContentBody::Proposal(Proposal::Add(_)) => {
338                        let content = ProcessedMessageContent::ProposalMessage(Box::new(
339                            QueuedProposal::from_authenticated_content(
340                                self.ciphersuite(),
341                                provider.crypto(),
342                                content,
343                                ProposalOrRefType::Proposal,
344                            )?,
345                        ));
346                        Ok(ProcessedMessage::new(
347                            self.group_id().clone(),
348                            self.context().epoch(),
349                            sender,
350                            data,
351                            content,
352                            credential,
353                        ))
354                    }
355                    // TODO #151/#106
356                    FramedContentBody::Proposal(_) => {
357                        Err(ProcessMessageError::UnsupportedProposalType)
358                    }
359                    FramedContentBody::Commit(_) => {
360                        Err(ProcessMessageError::UnauthorizedExternalCommitMessage)
361                    }
362                }
363            }
364        }
365    }
366
367    /// Performs framing validation and, if necessary, decrypts the given message.
368    ///
369    /// Returns the [`DecryptedMessage`] if processing is successful, or a
370    /// [`ValidationError`] if it is not.
371    ///
372    /// Checks the following semantic validation:
373    ///  - ValSem002
374    ///  - ValSem003
375    ///  - ValSem006
376    ///  - ValSem007 MembershipTag presence
377    ///  - https://validation.openmls.tech/#valn1202
378    pub(crate) fn decrypt_message(
379        &mut self,
380        crypto: &impl OpenMlsCrypto,
381        message: ProtocolMessage,
382        sender_ratchet_configuration: &SenderRatchetConfiguration,
383    ) -> Result<DecryptedMessage, ValidationError> {
384        // Checks the following semantic validation:
385        //  - ValSem002
386        //  - ValSem003
387        self.public_group.validate_framing(&message)?;
388
389        let epoch = message.epoch();
390
391        // Checks the following semantic validation:
392        //  - ValSem006
393        //  - ValSem007 MembershipTag presence
394        match message {
395            ProtocolMessage::PublicMessage(public_message) => {
396                // If the message is older than the current epoch, we need to fetch the correct secret tree first.
397                let message_secrets =
398                    self.message_secrets_for_epoch(epoch).map_err(|e| match e {
399                        SecretTreeError::TooDistantInThePast => ValidationError::NoPastEpochData,
400                        _ => LibraryError::custom(
401                            "Unexpected error while retrieving message secrets for epoch.",
402                        )
403                        .into(),
404                    })?;
405                DecryptedMessage::from_inbound_public_message(
406                    *public_message,
407                    message_secrets,
408                    message_secrets.serialized_context().to_vec(),
409                    crypto,
410                    self.ciphersuite(),
411                )
412            }
413            ProtocolMessage::PrivateMessage(ciphertext) => {
414                // If the message is older than the current epoch, we need to fetch the correct secret tree first
415                DecryptedMessage::from_inbound_ciphertext(
416                    ciphertext,
417                    crypto,
418                    self,
419                    sender_ratchet_configuration,
420                )
421            }
422        }
423    }
424}