From 9bbba996c124b5d4536f3b37499c99f902f62ce9 Mon Sep 17 00:00:00 2001 From: bruingineer Date: Thu, 1 Jan 2026 18:09:54 -0800 Subject: [PATCH] use unique tx,rx pair in terminate 2 source test --- docker-linux/run_tests.sh | 2 +- tests/ipv4_tests.rs | 28 ++++++++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/docker-linux/run_tests.sh b/docker-linux/run_tests.sh index 532e646..13aeb44 100644 --- a/docker-linux/run_tests.sh +++ b/docker-linux/run_tests.sh @@ -22,4 +22,4 @@ ip a show dev lo # sysctl -p cargo test -cargo test --test ipv4_tests -- --ignored --nocapture --test-threads=1 +cargo test --test ipv4_tests -- --ignored --test-threads=1 diff --git a/tests/ipv4_tests.rs b/tests/ipv4_tests.rs index 0f6face..717d1f2 100644 --- a/tests/ipv4_tests.rs +++ b/tests/ipv4_tests.rs @@ -13,7 +13,7 @@ extern crate uuid; extern crate socket2; use std::io::Read; -use std::{thread}; +use std::{array, thread}; use std::thread::sleep; use std::sync::mpsc; use std::sync::mpsc::{Sender, SyncSender, Receiver, RecvTimeoutError}; @@ -2256,12 +2256,13 @@ fn test_receiver_source_limit_2_termination_check() { let mut snd_threads = Vec::new(); - let (snd_tx, snd_rx): (SyncSender<()>, Receiver<()>) = mpsc::sync_channel(0); // Used for handshaking, allows syncing the sender states. + let sender_channels: [(SyncSender<()>, Receiver<()>); SND_THREADS] = array::from_fn(|_| mpsc::sync_channel(0)); const BASE_UNIVERSE: u16 = 2; for i in 0 .. SND_THREADS { - let tx = snd_tx.clone(); + // use a unique tx,rx pair per send thread to prevent race condition when using the same tx,rx between threads + let tx = sender_channels[i].0.clone(); let data = [1, 2, 3]; @@ -2295,14 +2296,16 @@ fn test_receiver_source_limit_2_termination_check() { dmx_recv.listen_universes(&[i]).unwrap(); } - snd_rx.recv().unwrap(); - snd_rx.recv().unwrap(); + for (_, snd_rx) in &sender_channels { + snd_rx.recv().unwrap(); + } // Asserts that the recv attempts are successful. - dmx_recv.recv(RECV_TIMEOUT).unwrap(); - dmx_recv.recv(RECV_TIMEOUT).unwrap(); - dmx_recv.recv(RECV_TIMEOUT).unwrap(); - dmx_recv.recv(RECV_TIMEOUT).unwrap(); + dmx_recv.recv(RECV_TIMEOUT).expect("dmx_recv.recv() #1 failed."); + dmx_recv.recv(RECV_TIMEOUT).expect("dmx_recv.recv() #2 failed."); + dmx_recv.recv(RECV_TIMEOUT).expect("dmx_recv.recv() #3 failed."); + dmx_recv.recv(RECV_TIMEOUT).expect("dmx_recv.recv() #4 failed."); + // The first source is held back from terminating but the second source should terminate. let second_thread = snd_threads.remove(1); @@ -2318,15 +2321,16 @@ fn test_receiver_source_limit_2_termination_check() { // New source now sends twice which the receiver should receive. src.send(&[BASE_UNIVERSE], &data, None, None, None).unwrap(); + src.send(&[BASE_UNIVERSE], &data, None, None, None).unwrap(); }); // Asserts that the recv attempts are successful (no source exceeded). - dmx_recv.recv(RECV_TIMEOUT).unwrap(); - dmx_recv.recv(RECV_TIMEOUT).unwrap(); + dmx_recv.recv(RECV_TIMEOUT).expect("dmx_recv.recv() #5 failed."); + dmx_recv.recv(RECV_TIMEOUT).expect("dmx_recv.recv() #6 failed."); // Allow the first source to progress and finish. - snd_rx.recv().unwrap(); + sender_channels[0].1.recv().unwrap(); let first_thread = snd_threads.remove(0); first_thread.join().unwrap();