1use std::mem;
4
5use errors::{CommitToPendingProposalsError, MergePendingCommitError};
6use openmls_traits::{crypto::OpenMlsCrypto, signatures::Signer, storage::StorageProvider as _};
7
8use crate::{
9 framing::mls_content::FramedContentBody,
10 group::{errors::MergeCommitError, StageCommitError, ValidationError},
11 messages::group_info::GroupInfo,
12 storage::OpenMlsProvider,
13 tree::sender_ratchet::SenderRatchetConfiguration,
14};
15
16use super::{errors::ProcessMessageError, *};
17
18impl MlsGroup {
19 pub fn process_message<Provider: OpenMlsProvider>(
29 &mut self,
30 provider: &Provider,
31 message: impl Into<ProtocolMessage>,
32 ) -> Result<ProcessedMessage, ProcessMessageError> {
33 if !self.is_active() {
35 return Err(ProcessMessageError::GroupStateError(
36 MlsGroupStateError::UseAfterEviction,
37 ));
38 }
39 let message = message.into();
40
41 if !message.is_external()
43 && message.is_handshake_message()
44 && !self
45 .configuration()
46 .wire_format_policy()
47 .incoming()
48 .is_compatible_with(message.wire_format())
49 {
50 return Err(ProcessMessageError::IncompatibleWireFormat);
51 }
52
53 let sender_ratchet_configuration = *self.configuration().sender_ratchet_configuration();
55
56 let decrypted_message =
62 self.decrypt_message(provider.crypto(), message, &sender_ratchet_configuration)?;
63
64 let unverified_message = self
65 .public_group
66 .parse_message(decrypted_message, &self.message_secrets_store)
67 .map_err(ProcessMessageError::from)?;
68
69 let (old_epoch_keypairs, leaf_node_keypairs) =
71 if let ContentType::Commit = unverified_message.content_type() {
72 self.read_decryption_keypairs(provider, &self.own_leaf_nodes)?
73 } else {
74 (vec![], vec![])
75 };
76
77 self.process_unverified_message(
78 provider,
79 unverified_message,
80 old_epoch_keypairs,
81 leaf_node_keypairs,
82 )
83 }
84
85 pub fn store_pending_proposal<Storage: StorageProvider>(
87 &mut self,
88 storage: &Storage,
89 proposal: QueuedProposal,
90 ) -> Result<(), Storage::Error> {
91 storage.queue_proposal(self.group_id(), &proposal.proposal_reference(), &proposal)?;
92 self.proposal_store_mut().add(proposal);
94
95 Ok(())
96 }
97
98 #[allow(clippy::type_complexity)]
109 pub fn commit_to_pending_proposals<Provider: OpenMlsProvider>(
110 &mut self,
111 provider: &Provider,
112 signer: &impl Signer,
113 ) -> Result<
114 (MlsMessageOut, Option<MlsMessageOut>, Option<GroupInfo>),
115 CommitToPendingProposalsError<Provider::StorageError>,
116 > {
117 self.is_operational()?;
118
119 let (commit, welcome, group_info) = self
122 .commit_builder()
123 .consume_proposal_store(true)
125 .load_psks(provider.storage())?
126 .build(provider.rand(), provider.crypto(), signer, |_| true)?
127 .stage_commit(provider)?
128 .into_contents();
129
130 Ok((
131 commit,
132 welcome.map(|welcome| MlsMessageOut::from_welcome(welcome, self.version())),
134 group_info,
135 ))
136 }
137
138 pub fn merge_staged_commit<Provider: OpenMlsProvider>(
141 &mut self,
142 provider: &Provider,
143 staged_commit: StagedCommit,
144 ) -> Result<(), MergeCommitError<Provider::StorageError>> {
145 if staged_commit.self_removed() {
147 self.group_state = MlsGroupState::Inactive;
148 }
149 provider
150 .storage()
151 .write_group_state(self.group_id(), &self.group_state)
152 .map_err(MergeCommitError::StorageError)?;
153
154 self.merge_commit(provider, staged_commit)?;
156
157 let resumption_psk = self.group_epoch_secrets().resumption_psk();
159 self.resumption_psk_store
160 .add(self.context().epoch(), resumption_psk.clone());
161 provider
162 .storage()
163 .write_resumption_psk_store(self.group_id(), &self.resumption_psk_store)
164 .map_err(MergeCommitError::StorageError)?;
165
166 self.own_leaf_nodes.clear();
168 provider
169 .storage()
170 .delete_own_leaf_nodes(self.group_id())
171 .map_err(MergeCommitError::StorageError)?;
172
173 self.clear_pending_commit(provider.storage())
175 .map_err(MergeCommitError::StorageError)?;
176
177 Ok(())
178 }
179
180 pub fn merge_pending_commit<Provider: OpenMlsProvider>(
183 &mut self,
184 provider: &Provider,
185 ) -> Result<(), MergePendingCommitError<Provider::StorageError>> {
186 match &self.group_state {
187 MlsGroupState::PendingCommit(_) => {
188 let old_state = mem::replace(&mut self.group_state, MlsGroupState::Operational);
189 if let MlsGroupState::PendingCommit(pending_commit_state) = old_state {
190 self.merge_staged_commit(provider, (*pending_commit_state).into())?;
191 }
192 Ok(())
193 }
194 MlsGroupState::Inactive => Err(MlsGroupStateError::UseAfterEviction)?,
195 MlsGroupState::Operational => Ok(()),
196 }
197 }
198
199 pub(super) fn read_decryption_keypairs(
201 &self,
202 provider: &impl OpenMlsProvider,
203 own_leaf_nodes: &[LeafNode],
204 ) -> Result<(Vec<EncryptionKeyPair>, Vec<EncryptionKeyPair>), StageCommitError> {
205 let old_epoch_keypairs = self.read_epoch_keypairs(provider.storage()).map_err(|e| {
207 log::error!("Error reading epoch keypairs: {:?}", e);
208 StageCommitError::MissingDecryptionKey
209 })?;
210
211 let leaf_node_keypairs = own_leaf_nodes
215 .iter()
216 .map(|leaf_node| {
217 EncryptionKeyPair::read(provider, leaf_node.encryption_key())
218 .ok_or(StageCommitError::MissingDecryptionKey)
219 })
220 .collect::<Result<Vec<EncryptionKeyPair>, StageCommitError>>()?;
221
222 Ok((old_epoch_keypairs, leaf_node_keypairs))
223 }
224
225 pub(crate) fn process_unverified_message<Provider: OpenMlsProvider>(
254 &self,
255 provider: &Provider,
256 unverified_message: UnverifiedMessage,
257 old_epoch_keypairs: Vec<EncryptionKeyPair>,
258 leaf_node_keypairs: Vec<EncryptionKeyPair>,
259 ) -> Result<ProcessedMessage, ProcessMessageError> {
260 let (content, credential) =
266 unverified_message.verify(self.ciphersuite(), provider.crypto(), self.version())?;
267
268 match content.sender() {
269 Sender::Member(_) | Sender::NewMemberCommit | Sender::NewMemberProposal => {
270 let sender = content.sender().clone();
271 let authenticated_data = content.authenticated_data().to_owned();
272 let epoch = content.epoch();
273
274 let content = match content.content() {
275 FramedContentBody::Application(application_message) => {
276 ProcessedMessageContent::ApplicationMessage(ApplicationMessage::new(
277 application_message.as_slice().to_owned(),
278 ))
279 }
280 FramedContentBody::Proposal(_) => {
281 let proposal = Box::new(QueuedProposal::from_authenticated_content_by_ref(
282 self.ciphersuite(),
283 provider.crypto(),
284 content,
285 )?);
286
287 if matches!(sender, Sender::NewMemberProposal) {
288 ProcessedMessageContent::ExternalJoinProposalMessage(proposal)
289 } else {
290 ProcessedMessageContent::ProposalMessage(proposal)
291 }
292 }
293 FramedContentBody::Commit(_) => {
294 let staged_commit = self.stage_commit(
295 &content,
296 old_epoch_keypairs,
297 leaf_node_keypairs,
298 provider,
299 )?;
300 ProcessedMessageContent::StagedCommitMessage(Box::new(staged_commit))
301 }
302 };
303
304 Ok(ProcessedMessage::new(
305 self.group_id().clone(),
306 epoch,
307 sender,
308 authenticated_data,
309 content,
310 credential,
311 ))
312 }
313 Sender::External(_) => {
314 let sender = content.sender().clone();
315 let data = content.authenticated_data().to_owned();
316 match content.content() {
317 FramedContentBody::Application(_) => {
318 Err(ProcessMessageError::UnauthorizedExternalApplicationMessage)
319 }
320 FramedContentBody::Proposal(Proposal::Remove(_)) => {
321 let content = ProcessedMessageContent::ProposalMessage(Box::new(
322 QueuedProposal::from_authenticated_content_by_ref(
323 self.ciphersuite(),
324 provider.crypto(),
325 content,
326 )?,
327 ));
328 Ok(ProcessedMessage::new(
329 self.group_id().clone(),
330 self.context().epoch(),
331 sender,
332 data,
333 content,
334 credential,
335 ))
336 }
337 FramedContentBody::Proposal(Proposal::Add(_)) => {
338 let content = ProcessedMessageContent::ProposalMessage(Box::new(
339 QueuedProposal::from_authenticated_content(
340 self.ciphersuite(),
341 provider.crypto(),
342 content,
343 ProposalOrRefType::Proposal,
344 )?,
345 ));
346 Ok(ProcessedMessage::new(
347 self.group_id().clone(),
348 self.context().epoch(),
349 sender,
350 data,
351 content,
352 credential,
353 ))
354 }
355 FramedContentBody::Proposal(_) => {
357 Err(ProcessMessageError::UnsupportedProposalType)
358 }
359 FramedContentBody::Commit(_) => {
360 Err(ProcessMessageError::UnauthorizedExternalCommitMessage)
361 }
362 }
363 }
364 }
365 }
366
367 pub(crate) fn decrypt_message(
379 &mut self,
380 crypto: &impl OpenMlsCrypto,
381 message: ProtocolMessage,
382 sender_ratchet_configuration: &SenderRatchetConfiguration,
383 ) -> Result<DecryptedMessage, ValidationError> {
384 self.public_group.validate_framing(&message)?;
388
389 let epoch = message.epoch();
390
391 match message {
395 ProtocolMessage::PublicMessage(public_message) => {
396 let message_secrets =
398 self.message_secrets_for_epoch(epoch).map_err(|e| match e {
399 SecretTreeError::TooDistantInThePast => ValidationError::NoPastEpochData,
400 _ => LibraryError::custom(
401 "Unexpected error while retrieving message secrets for epoch.",
402 )
403 .into(),
404 })?;
405 DecryptedMessage::from_inbound_public_message(
406 *public_message,
407 message_secrets,
408 message_secrets.serialized_context().to_vec(),
409 crypto,
410 self.ciphersuite(),
411 )
412 }
413 ProtocolMessage::PrivateMessage(ciphertext) => {
414 DecryptedMessage::from_inbound_ciphertext(
416 ciphertext,
417 crypto,
418 self,
419 sender_ratchet_configuration,
420 )
421 }
422 }
423 }
424}