diff --git a/cros_async/src/sync/cv.rs b/cros_async/src/sync/cv.rs index 3a335da68f..830df821ca 100644 --- a/cros_async/src/sync/cv.rs +++ b/cros_async/src/sync/cv.rs @@ -229,10 +229,7 @@ impl Condvar { // Safe because the spin lock guarantees exclusive access and the reference does not escape // this function. let waiters = unsafe { &mut *self.waiters.get() }; - let (mut wake_list, all_readers) = get_wake_list(waiters); - - // Safe because the spin lock guarantees exclusive access. - let muptr = unsafe { (*self.mu.get()) as *const RawMutex }; + let wake_list = get_wake_list(waiters); let newstate = if waiters.is_empty() { // Also clear the mutex associated with this Condvar since there are no longer any @@ -247,17 +244,10 @@ impl Condvar { HAS_WAITERS }; - // Try to transfer waiters before releasing the spin lock. - if !wake_list.is_empty() { - // Safe because there was a waiter in the queue and the thread that owns the waiter also - // owns a reference to the Mutex, guaranteeing that the pointer is valid. - unsafe { (*muptr).transfer_waiters(&mut wake_list, all_readers) }; - } - // Release the spin lock. self.state.store(newstate, Ordering::Release); - // Now wake any waiters still left in the wake list. + // Now wake any waiters in the wake list. for w in wake_list { w.wake(); } @@ -295,22 +285,12 @@ impl Condvar { } // Safe because the spin lock guarantees exclusive access to `self.waiters`. - let mut wake_list = unsafe { (*self.waiters.get()).take() }; - - // Safe because the spin lock guarantees exclusive access. - let muptr = unsafe { (*self.mu.get()) as *const RawMutex }; + let wake_list = unsafe { (*self.waiters.get()).take() }; // Clear the mutex associated with this Condvar since there are no longer any waiters. Safe // because we the spin lock guarantees exclusive access. unsafe { *self.mu.get() = 0 }; - // Try to transfer waiters before releasing the spin lock. - if !wake_list.is_empty() { - // Safe because there was a waiter in the queue and the thread that owns the waiter also - // owns a reference to the Mutex, guaranteeing that the pointer is valid. - unsafe { (*muptr).transfer_waiters(&mut wake_list, false) }; - } - // Mark any waiters left as no longer waiting for the Condvar. for w in &wake_list { w.set_waiting_for(WaitingFor::None); @@ -319,7 +299,7 @@ impl Condvar { // Release the spin lock. We can clear all bits in the state since we took all the waiters. self.state.store(0, Ordering::Release); - // Now wake any waiters still left in the wake list. + // Now wake any waiters in the wake list. for w in wake_list { w.wake(); } @@ -373,24 +353,14 @@ impl Condvar { None }; - let (mut wake_list, all_readers) = if wake_next || waiting_for == WaitingFor::None { + let wake_list = if wake_next || waiting_for == WaitingFor::None { // Either the waiter was already woken or it's been removed from the condvar's waiter // list and is going to be woken. Either way, we need to wake up another thread. get_wake_list(waiters) } else { - (WaiterList::new(WaiterAdapter::new()), false) + WaiterList::new(WaiterAdapter::new()) }; - // Safe because the spin lock guarantees exclusive access. - let muptr = unsafe { (*self.mu.get()) as *const RawMutex }; - - // Try to transfer waiters before releasing the spin lock. - if !wake_list.is_empty() { - // Safe because there was a waiter in the queue and the thread that owns the waiter also - // owns a reference to the Mutex, guaranteeing that the pointer is valid. - unsafe { (*muptr).transfer_waiters(&mut wake_list, all_readers) }; - } - let set_on_release = if waiters.is_empty() { // Clear the mutex associated with this Condvar since there are no longer any waiters. Safe // because we the spin lock guarantees exclusive access. @@ -423,15 +393,14 @@ impl Default for Condvar { } } -// Scan `waiters` and return all waiters that should be woken up. If all waiters in the returned -// wait list are readers then the returned bool will be true. +// Scan `waiters` and return all waiters that should be woken up. // // If the first waiter is trying to acquire a shared lock, then all waiters in the list that are // waiting for a shared lock are also woken up. In addition one writer is woken up, if possible. // // If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and // the rest of the list is not scanned. -fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, bool) { +fn get_wake_list(waiters: &mut WaiterList) -> WaiterList { let mut to_wake = WaiterList::new(WaiterAdapter::new()); let mut cursor = waiters.front_mut(); @@ -445,7 +414,6 @@ fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, bool) { let waiter = cursor.remove().unwrap(); waiter.set_waiting_for(WaitingFor::None); to_wake.push_back(waiter); - all_readers = false; break; } @@ -475,7 +443,7 @@ fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, bool) { } } - (to_wake, all_readers) + to_wake } fn cancel_waiter(cv: usize, waiter: &Waiter, wake_next: bool) -> bool { @@ -1058,7 +1026,7 @@ mod test { } #[test] - fn cancel_after_notify() { + fn cancel_after_notify_one() { async fn dec(mu: Arc>, cv: Arc) { let mut count = mu.lock().await; @@ -1102,7 +1070,7 @@ mod test { } #[test] - fn cancel_after_transfer() { + fn cancel_after_notify_all() { async fn dec(mu: Arc>, cv: Arc) { let mut count = mu.lock().await; @@ -1134,63 +1102,7 @@ mod test { let mut count = block_on(mu.lock()); *count = 2; - // Notify the cv while holding the lock. Only transfer one waiter. - cv.notify_one(); - assert_eq!(cv.state.load(Ordering::Relaxed) & HAS_WAITERS, HAS_WAITERS); - - // Drop the lock and then the future. This should not cause fut2 to become runnable as it - // should still be in the Condvar's wait queue. - mem::drop(count); - mem::drop(fut1); - - if let Poll::Ready(()) = fut2.as_mut().poll(&mut cx) { - panic!("future unexpectedly ready"); - } - - // Now wake up fut2. Since the lock isn't held, it should wake up immediately. - cv.notify_one(); - if fut2.as_mut().poll(&mut cx).is_pending() { - panic!("future unable to complete"); - } - - assert_eq!(*block_on(mu.lock()), 1); - } - - #[test] - fn cancel_after_transfer_and_wake() { - async fn dec(mu: Arc>, cv: Arc) { - let mut count = mu.lock().await; - - while *count == 0 { - count = cv.wait(count).await; - } - - *count -= 1; - } - - let mu = Arc::new(Mutex::new(0)); - let cv = Arc::new(Condvar::new()); - - let arc_waker = Arc::new(TestWaker); - let waker = waker_ref(&arc_waker); - let mut cx = Context::from_waker(&waker); - - let mut fut1 = Box::pin(dec(mu.clone(), cv.clone())); - let mut fut2 = Box::pin(dec(mu.clone(), cv.clone())); - - if let Poll::Ready(()) = fut1.as_mut().poll(&mut cx) { - panic!("future unexpectedly ready"); - } - if let Poll::Ready(()) = fut2.as_mut().poll(&mut cx) { - panic!("future unexpectedly ready"); - } - assert_eq!(cv.state.load(Ordering::Relaxed) & HAS_WAITERS, HAS_WAITERS); - - let mut count = block_on(mu.lock()); - *count = 2; - - // Notify the cv while holding the lock. This should transfer both waiters to the mutex's - // wait queue. + // Notify the cv while holding the lock. This should wake up both waiters. cv.notify_all(); assert_eq!(cv.state.load(Ordering::Relaxed), 0); diff --git a/cros_async/src/sync/mu.rs b/cros_async/src/sync/mu.rs index 6bf827a575..bf35644fe2 100644 --- a/cros_async/src/sync/mu.rs +++ b/cros_async/src/sync/mu.rs @@ -36,8 +36,6 @@ const LONG_WAIT: usize = 1 << 5; const READ_LOCK: usize = 1 << 8; // Mask used for checking if any threads currently hold a shared lock. const READ_MASK: usize = !0xff; -// Mask used to check if the lock is held in either shared or exclusive mode. -const ANY_LOCK: usize = LOCKED | READ_MASK; // The number of times the thread should just spin and attempt to re-acquire the lock. const SPIN_THRESHOLD: usize = 7; @@ -507,105 +505,6 @@ impl RawMutex { } } - // Transfer waiters from the `Condvar` wait list to the `Mutex` wait list. `all_readers` may - // be set to true if all waiters are waiting to acquire a shared lock but should not be true if - // there is even one waiter waiting on an exclusive lock. - // - // This is similar to what the `FUTEX_CMP_REQUEUE` flag does on linux. - pub fn transfer_waiters(&self, new_waiters: &mut WaiterList, all_readers: bool) { - if new_waiters.is_empty() { - return; - } - - let mut oldstate = self.state.load(Ordering::Relaxed); - let can_acquire_read_lock = (oldstate & Shared::zero_to_acquire()) == 0; - - // The lock needs to be held in some mode or else the waiters we transfer now may never get - // woken up. Additionally, if all the new waiters are readers and can acquire the lock now - // then we can just wake them up. - if (oldstate & ANY_LOCK) == 0 || (all_readers && can_acquire_read_lock) { - // Nothing to do here. The Condvar will wake up all the waiters left in `new_waiters`. - return; - } - - if (oldstate & SPINLOCK) == 0 - && self - .state - .compare_exchange_weak( - oldstate, - oldstate | SPINLOCK | HAS_WAITERS, - Ordering::Acquire, - Ordering::Relaxed, - ) - .is_ok() - { - let mut transferred_writer = false; - - // Safe because the spin lock guarantees exclusive access and the reference does not - // escape this function. - let waiters = unsafe { &mut *self.waiters.get() }; - - let mut current = new_waiters.front_mut(); - while let Some(w) = current.get() { - match w.kind() { - WaiterKind::Shared => { - if can_acquire_read_lock { - current.move_next(); - } else { - // We need to update the cancellation function since we're moving this - // waiter into our queue. Also update the waiting to indicate that it is - // now in the Mutex's waiter list. - let w = current.remove().unwrap(); - w.set_cancel(cancel_waiter, self as *const RawMutex as usize); - w.set_waiting_for(WaitingFor::Mutex); - waiters.push_back(w); - } - } - WaiterKind::Exclusive => { - transferred_writer = true; - // We need to update the cancellation function since we're moving this - // waiter into our queue. Also update the waiting to indicate that it is - // now in the Mutex's waiter list. - let w = current.remove().unwrap(); - w.set_cancel(cancel_waiter, self as *const RawMutex as usize); - w.set_waiting_for(WaitingFor::Mutex); - waiters.push_back(w); - } - } - } - - let set_on_release = if transferred_writer { - WRITER_WAITING - } else { - 0 - }; - - // If we didn't actually transfer any waiters, clear the HAS_WAITERS bit that we set - // earlier when we acquired the spin lock. - let clear = if waiters.is_empty() { - SPINLOCK | HAS_WAITERS - } else { - SPINLOCK - }; - - while self - .state - .compare_exchange_weak( - oldstate, - (oldstate | set_on_release) & !clear, - Ordering::Release, - Ordering::Relaxed, - ) - .is_err() - { - spin_loop_hint(); - oldstate = self.state.load(Ordering::Relaxed); - } - } - - // The Condvar will wake up any waiters still left in the queue. - } - fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) -> bool { let mut oldstate = self.state.load(Ordering::Relaxed); while oldstate & SPINLOCK != 0 @@ -849,11 +748,7 @@ impl Mutex { } } - // Called from `Condvar::wait` when the thread wants to reacquire the lock. Since we may - // directly transfer waiters from the `Condvar` wait list to the `Mutex` wait list (see - // `transfer_all` below), we cannot call `Mutex::lock` as we also need to clear the - // `DESIGNATED_WAKER` bit when acquiring the lock. Not doing so will prevent us from waking up - // any other threads in the wait list. + // Called from `Condvar::wait` when the thread wants to reacquire the lock. #[inline] pub(crate) async fn lock_from_cv(&self) -> MutexGuard<'_, T> { self.raw.lock_slow::(DESIGNATED_WAKER, 0).await; @@ -869,24 +764,7 @@ impl Mutex { #[inline] pub(crate) async fn read_lock_from_cv(&self) -> MutexReadGuard<'_, T> { // Threads that have waited in the Condvar's waiter list don't have to care if there is a - // writer waiting. This also prevents a deadlock in the following case: - // - // * Thread A holds a write lock. - // * Thread B is in the mutex's waiter list, also waiting on a write lock. - // * Threads C, D, and E are in the condvar's waiter list. C and D want a read lock; E - // wants a write lock. - // * A calls `cv.notify_all()` while still holding the lock, which transfers C, D, and E - // onto the mutex's wait list. - // * A releases the lock, which wakes up B. - // * B acquires the lock, does some work, and releases the lock. This wakes up C and D. - // However, when iterating through the waiter list we find E, which is waiting for a - // write lock so we set the WRITER_WAITING bit. - // * C and D go through this function to acquire the lock. If we didn't clear the - // WRITER_WAITING bit from the zero_to_acquire set then it would prevent C and D from - // acquiring the lock and they would add themselves back into the waiter list. - // * Now C, D, and E will sit in the waiter list indefinitely unless some other thread - // comes along and acquires the lock. On release, it would wake up E and everything would - // go back to normal. + // writer waiting since they have already waited once. self.raw .lock_slow::(DESIGNATED_WAKER, WRITER_WAITING) .await; @@ -2093,7 +1971,7 @@ mod test { } #[test] - fn transfer_notify_one() { + fn notify_one() { async fn read(mu: Arc>, cv: Arc) { let mut count = mu.read_lock().await; while *count == 0 { @@ -2137,9 +2015,20 @@ mod test { let mut count = block_on(mu.lock()); *count = 1; - // This should transfer all readers + one writer to the waiter queue. + // This should wake all readers + one writer. cv.notify_one(); + // Poll the readers and the writer so they add themselves to the mutex's waiter list. + for r in &mut readers { + if r.as_mut().poll(&mut cx).is_ready() { + panic!("reader unexpectedly ready"); + } + } + + if writer.as_mut().poll(&mut cx).is_ready() { + panic!("writer unexpectedly ready"); + } + assert_eq!( mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, HAS_WAITERS @@ -2170,7 +2059,7 @@ mod test { } #[test] - fn transfer_waiters_when_unlocked() { + fn notify_when_unlocked() { async fn dec(mu: Arc>, cv: Arc) { let mut count = mu.lock().await; @@ -2204,8 +2093,7 @@ mod test { *block_on(mu.lock()) = futures.len(); cv.notify_all(); - // Since the lock is not held, instead of transferring the waiters to the waiter list we - // should just wake them all up. + // Since we haven't polled `futures` yet, the mutex should not have any waiters. assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0); for f in &mut futures { @@ -2217,7 +2105,7 @@ mod test { } #[test] - fn transfer_reader_writer() { + fn notify_reader_writer() { async fn read(mu: Arc>, cv: Arc) { let mut count = mu.read_lock().await; while *count == 0 { @@ -2282,8 +2170,7 @@ mod test { HAS_WAITERS | WRITER_WAITING ); - // Wake up waiters while holding the lock. This should end with them transferred to the - // mutex's waiter list. + // Wake up waiters while holding the lock. cv.notify_all(); // Drop the lock. This should wake up the lock function. @@ -2293,10 +2180,8 @@ mod test { panic!("lock() unable to complete"); } - assert_eq!( - mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING), - HAS_WAITERS | WRITER_WAITING - ); + // Since we haven't polled `futures` yet, the mutex state should now be empty. + assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0); // Poll everything again. The readers should be able to make progress (but not complete) but // the writer should be blocked. @@ -2338,7 +2223,7 @@ mod test { } #[test] - fn transfer_readers_with_read_lock() { + fn notify_readers_with_read_lock() { async fn read(mu: Arc>, cv: Arc) { let mut count = mu.read_lock().await; while *count == 0 { @@ -2374,23 +2259,24 @@ mod test { let g = block_on(mu.read_lock()); - // Notify the condvar while holding the read lock. This should wake up all the waiters - // rather than just transferring them. + // Notify the condvar while holding the read lock. This should wake up all the waiters. cv.notify_all(); - assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0); - - mem::drop(g); + // Since the lock is held in shared mode, all the readers should immediately be able to + // acquire the read lock. for f in &mut futures { if let Poll::Ready(()) = f.as_mut().poll(&mut cx) { panic!("future unexpectedly ready"); } } + assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0); assert_eq!( - mu.raw.state.load(Ordering::Relaxed), - READ_LOCK * futures.len() + mu.raw.state.load(Ordering::Relaxed) & READ_MASK, + READ_LOCK * (futures.len() + 1) ); + mem::drop(g); + for f in &mut futures { if f.as_mut().poll(&mut cx).is_pending() { panic!("future unable to complete"); diff --git a/cros_async/src/sync/waiter.rs b/cros_async/src/sync/waiter.rs index 48335c73e7..d9e0fa0de3 100644 --- a/cros_async/src/sync/waiter.rs +++ b/cros_async/src/sync/waiter.rs @@ -189,18 +189,6 @@ impl Waiter { self.waiting_for.store(waiting_for as u8, Ordering::Release); } - // Change the cancellation function that this `Waiter` should use. This will panic if called - // when the `Waiter` is still linked into a waiter list. - pub fn set_cancel(&self, c: fn(usize, &Waiter, bool) -> bool, data: usize) { - debug_assert!( - !self.is_linked(), - "Cannot change cancellation function while linked" - ); - let mut cancel = self.cancel.lock(); - cancel.c = c; - cancel.data = data; - } - // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a // waiter list. pub fn reset(&self, waiting_for: WaitingFor) {