Skip to content

Commit

Permalink
Merge pull request #6 from algesten/fix/respond-reset
Browse files Browse the repository at this point in the history
Respond with ParamOutgoingResetRequest
  • Loading branch information
rainliu authored Aug 24, 2023
2 parents e93d256 + ac26f74 commit a8edab0
Showing 1 changed file with 50 additions and 15 deletions.
65 changes: 50 additions & 15 deletions src/association/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,15 +1328,11 @@ impl Association {
let mut pp = vec![];

if let Some(param_a) = &c.param_a {
if let Some(p) = self.handle_reconfig_param(param_a)? {
pp.push(p);
}
self.handle_reconfig_param(param_a, &mut pp)?;
}

if let Some(param_b) = &c.param_b {
if let Some(p) = self.handle_reconfig_param(param_b)? {
pp.push(p);
}
self.handle_reconfig_param(param_b, &mut pp)?;
}

Ok(pp)
Expand Down Expand Up @@ -1491,9 +1487,7 @@ impl Association {
let rst_reqs: Vec<ParamOutgoingResetRequest> =
self.reconfig_requests.values().cloned().collect();
for rst_req in rst_reqs {
let resp = self.reset_streams_if_any(&rst_req);
debug!("[{}] RESET RESPONSE: {}", self.side, resp);
reply.push(resp);
self.reset_streams_if_any(&rst_req, false, &mut reply)?;
}
}

Expand Down Expand Up @@ -1529,17 +1523,19 @@ impl Association {
fn handle_reconfig_param(
&mut self,
raw: &Box<dyn Param + Send + Sync>,
) -> Result<Option<Packet>> {
reply: &mut Vec<Packet>,
) -> Result<()> {
if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
self.reconfig_requests
.insert(p.reconfig_request_sequence_number, p.clone());
Ok(Some(self.reset_streams_if_any(p)))
self.reset_streams_if_any(p, true, reply)?;
Ok(())
} else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
self.reconfigs.remove(&p.reconfig_response_sequence_number);
if self.reconfigs.is_empty() {
self.timers.stop(Timer::Reconfig);
}
Ok(None)
Ok(())
} else {
Err(Error::ErrParameterType)
}
Expand Down Expand Up @@ -1847,15 +1843,25 @@ impl Association {
}
}

fn reset_streams_if_any(&mut self, p: &ParamOutgoingResetRequest) -> Packet {
fn reset_streams_if_any(
&mut self,
p: &ParamOutgoingResetRequest,
respond: bool,
reply: &mut Vec<Packet>,
) -> Result<()> {
let mut result = ReconfigResult::SuccessPerformed;
let mut sis_to_reset = vec![];

if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
debug!(
"[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
self.side, p.sender_last_tsn, self.peer_last_tsn
);
for id in &p.stream_identifiers {
if self.streams.contains_key(id) {
if respond {
sis_to_reset.push(*id);
}
self.unregister_stream(*id);
}
}
Expand All @@ -1869,13 +1875,42 @@ impl Association {
result = ReconfigResult::InProgress;
}

self.create_packet(vec![Box::new(ChunkReconfig {
// Answer incoming reset requests with the same reset request, but with
// reconfig_response_sequence_number.
if !sis_to_reset.is_empty() {
let rsn = self.generate_next_rsn();
let tsn = self.my_next_tsn - 1;

let c = ChunkReconfig {
param_a: Some(Box::new(ParamOutgoingResetRequest {
reconfig_request_sequence_number: rsn,
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
sender_last_tsn: tsn,
stream_identifiers: sis_to_reset,
..Default::default()
})),
..Default::default()
};

self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission

let p = self.create_packet(vec![Box::new(c)]);
reply.push(p);
}

let packet = self.create_packet(vec![Box::new(ChunkReconfig {
param_a: Some(Box::new(ParamReconfigResponse {
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
result,
})),
param_b: None,
})])
})]);

debug!("[{}] RESET RESPONSE: {}", self.side, packet);

reply.push(packet);

Ok(())
}

/// create_packet wraps chunks in a packet.
Expand Down

0 comments on commit a8edab0

Please sign in to comment.