fast_stm/tvar.rs
1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9#[cfg(feature = "wait-on-retry")]
10use parking_lot::Mutex;
11use parking_lot::RwLock;
12use std::any::Any;
13use std::cmp;
14use std::fmt::{self, Debug};
15use std::marker::PhantomData;
16#[cfg(feature = "wait-on-retry")]
17use std::sync::atomic::{self, AtomicUsize};
18use std::sync::Arc;
19#[cfg(feature = "wait-on-retry")]
20use std::sync::Weak;
21
22use super::result::StmClosureResult;
23#[cfg(feature = "wait-on-retry")]
24use super::transaction::control_block::ControlBlock;
25use super::Transaction;
26
27/// `VarControlBlock` contains all the useful data for a `Var` while beeing the same type.
28///
29/// The control block is accessed from other threads directly whereas `Var`
30/// is just a typesafe wrapper around it.
31pub struct VarControlBlock {
32    /// `waiting_threads` is a list of all waiting threads protected by a mutex.
33    #[cfg(feature = "wait-on-retry")]
34    waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,
35
36    /// `dead_threads` is a counter for all dead threads.
37    ///
38    /// When there are many dead threads waiting for a change, but
39    /// nobody changes the value, then an automatic collection is
40    /// performed.
41    #[cfg(feature = "wait-on-retry")]
42    dead_threads: AtomicUsize,
43
44    /// The inner value of the Var.
45    ///
46    /// It can be shared through a Arc without copying it too often.
47    ///
48    /// The Arc is also used by the threads to detect changes.
49    /// The value in it should not be changed or locked because
50    /// that may cause multiple threads to block unforeseen as well as
51    /// causing deadlocks.
52    ///
53    /// The shared reference is protected by a `RWLock` so that multiple
54    /// threads can safely block it. This ensures consistency, without
55    /// preventing other threads from accessing the values.
56    ///
57    /// Starvation may occur, if one thread wants to write-lock but others
58    /// keep holding read-locks.
59    pub value: RwLock<Arc<dyn Any + Send + Sync>>,
60}
61
62impl VarControlBlock {
63    #[cfg(feature = "wait-on-retry")]
64    /// create a new empty `VarControlBlock`
65    pub fn new<T>(val: T) -> Arc<VarControlBlock>
66    where
67        T: Any + Sync + Send,
68    {
69        let ctrl = VarControlBlock {
70            waiting_threads: Mutex::new(Vec::new()),
71            dead_threads: AtomicUsize::new(0),
72            value: RwLock::new(Arc::new(val)),
73        };
74        Arc::new(ctrl)
75    }
76
77    #[cfg(not(feature = "wait-on-retry"))]
78    /// create a new empty `VarControlBlock`
79    pub fn new<T>(val: T) -> Arc<VarControlBlock>
80    where
81        T: Any + Sync + Send,
82    {
83        let ctrl = VarControlBlock {
84            value: RwLock::new(Arc::new(val)),
85        };
86        Arc::new(ctrl)
87    }
88
89    #[cfg(feature = "wait-on-retry")]
90    /// Wake all threads that are waiting for this block.
91    pub fn wake_all(&self) {
92        // Atomically take all waiting threads from the value.
93        let threads = {
94            let mut guard = self.waiting_threads.lock();
95            let inner: &mut Vec<_> = &mut guard;
96            std::mem::take(inner)
97        };
98
99        // Take all, that are still alive.
100        let threads = threads.iter().filter_map(Weak::upgrade);
101
102        // Release all the semaphores to start the thread.
103        for thread in threads {
104            // Inform thread that this var has changed.
105            thread.set_changed();
106        }
107    }
108
109    #[cfg(feature = "wait-on-retry")]
110    /// Add another thread, that waits for mutations of `self`.
111    pub fn wait(&self, thread: &Arc<ControlBlock>) {
112        let mut guard = self.waiting_threads.lock();
113
114        guard.push(Arc::downgrade(thread));
115    }
116
117    #[cfg(feature = "wait-on-retry")]
118    /// Mark another `StmControlBlock` as dead.
119    ///
120    /// If the count of dead control blocks is too high,
121    /// perform a cleanup.
122    /// This prevents masses of old `StmControlBlock` to
123    /// pile up when a variable is often read but rarely written.
124    pub fn set_dead(&self) {
125        // Increase by one.
126        let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
127
128        // If there are too many then cleanup.
129
130        // There is a potential data race that may occure when
131        // one thread reads the number and then operates on
132        // outdated data, but no serious mistakes may happen.
133        if deads >= 64 {
134            let mut guard = self.waiting_threads.lock();
135            self.dead_threads.store(0, atomic::Ordering::SeqCst);
136
137            // Remove all dead ones. Possibly free up the memory.
138            guard.retain(|t| t.upgrade().is_some());
139        }
140    }
141
142    fn get_address(&self) -> usize {
143        std::ptr::from_ref::<VarControlBlock>(self) as usize
144    }
145}
146
147// Implement some operators so that VarControlBlocks can be sorted.
148
149impl PartialEq for VarControlBlock {
150    fn eq(&self, other: &Self) -> bool {
151        self.get_address() == other.get_address()
152    }
153}
154
155impl Eq for VarControlBlock {}
156
157impl Ord for VarControlBlock {
158    fn cmp(&self, other: &Self) -> cmp::Ordering {
159        self.get_address().cmp(&other.get_address())
160    }
161}
162
163impl PartialOrd for VarControlBlock {
164    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
165        Some(self.cmp(other))
166    }
167}
168
169/// A variable that can be used in a STM-Block
170#[derive(Clone)]
171pub struct TVar<T> {
172    /// The control block is the inner of the variable.
173    ///
174    /// The rest of `TVar` is just the typesafe interface.
175    control_block: Arc<VarControlBlock>,
176
177    /// This marker is needed so that the variable can be used in a typesafe
178    /// manner.
179    _marker: PhantomData<T>,
180}
181
182impl<T> TVar<T>
183where
184    T: Any + Sync + Send + Clone,
185{
186    /// Create a new `TVar`.
187    pub fn new(val: T) -> TVar<T> {
188        TVar {
189            control_block: VarControlBlock::new(val),
190            _marker: PhantomData,
191        }
192    }
193
194    #[allow(clippy::missing_panics_doc)]
195    /// `read_atomic` reads a value atomically, without starting a transaction.
196    ///
197    /// It is semantically equivalent to
198    ///
199    /// ```
200    /// # use fast_stm::*;
201    ///
202    /// let var = TVar::new(0);
203    /// atomically(|trans| var.read(trans));
204    /// ```
205    ///
206    /// but more efficient.
207    ///
208    /// `read_atomic` returns a clone of the value.
209    pub fn read_atomic(&self) -> T {
210        let val = self.read_ref_atomic();
211
212        (&*val as &dyn Any)
213            .downcast_ref::<T>()
214            .expect("wrong type in Var<T>")
215            .clone()
216    }
217
218    /// Read a value atomically but return a reference.
219    ///
220    /// This is mostly used internally, but can be useful in
221    /// some cases, because `read_atomic` clones the
222    /// inner value, which may be expensive.
223    pub fn read_ref_atomic(&self) -> Arc<dyn Any + Send + Sync> {
224        self.control_block.value.read().clone()
225    }
226
227    /// The normal way to access a var.
228    ///
229    /// It is equivalent to `transaction.read(&var)`, but more
230    /// convenient.
231    pub fn read(&self, transaction: &mut Transaction) -> StmClosureResult<T> {
232        transaction.read(self)
233    }
234
235    /// The normal way to write a var.
236    ///
237    /// It is equivalent to `transaction.write(&var, value)`, but more
238    /// convenient.
239    pub fn write(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<()> {
240        transaction.write(self, value)
241    }
242
243    /// Modify the content of a `TVar` with the function f.
244    ///
245    /// ```
246    /// # use fast_stm::*;
247    ///
248    ///
249    /// let var = TVar::new(21);
250    /// atomically(|trans|
251    ///     var.modify(trans, |x| x*2)
252    /// );
253    ///
254    /// assert_eq!(var.read_atomic(), 42);
255    /// ```
256    pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmClosureResult<()>
257    where
258        F: FnOnce(T) -> T,
259    {
260        let old = self.read(transaction)?;
261        self.write(transaction, f(old))
262    }
263
264    /// Replaces the value of a `TVar` with a new one, returning
265    /// the old one.
266    ///
267    /// ```
268    /// # use fast_stm::*;
269    ///
270    /// let var = TVar::new(0);
271    /// let x = atomically(|trans|
272    ///     var.replace(trans, 42)
273    /// );
274    ///
275    /// assert_eq!(x, 0);
276    /// assert_eq!(var.read_atomic(), 42);
277    /// ```
278    pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<T> {
279        let old = self.read(transaction)?;
280        self.write(transaction, value)?;
281        Ok(old)
282    }
283
284    /// Check if two `TVar`s refer to the same position.
285    pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
286        Arc::ptr_eq(&this.control_block, &other.control_block)
287    }
288
289    /// Access the control block of the var.
290    ///
291    /// Internal use only!
292    pub fn control_block(&self) -> &Arc<VarControlBlock> {
293        &self.control_block
294    }
295}
296
297/// Debug output a struct.
298///
299/// Note that this function does not print the state atomically.
300/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
301/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with
302/// atomically.
303///
304/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
305/// prints the state.
306impl<T> Debug for TVar<T>
307where
308    T: Any + Sync + Send + Clone,
309    T: Debug,
310{
311    #[inline(never)]
312    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
313        let x = self.read_atomic();
314        f.debug_struct("TVar").field("value", &x).finish()
315    }
316}
317
318#[test]
319// Test if creating and reading a TVar works.
320fn test_read_atomic() {
321    let var = TVar::new(42);
322
323    assert_eq!(42, var.read_atomic());
324}
325
326// More tests are in lib.rs.