fast_stm/
tvar.rs

1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9#[cfg(feature = "wait-on-retry")]
10use parking_lot::Mutex;
11use parking_lot::RwLock;
12use std::any::Any;
13use std::cmp;
14use std::fmt::{self, Debug};
15use std::marker::PhantomData;
16#[cfg(feature = "wait-on-retry")]
17use std::sync::atomic::{self, AtomicUsize};
18use std::sync::Arc;
19#[cfg(feature = "wait-on-retry")]
20use std::sync::Weak;
21
22use super::result::StmClosureResult;
23#[cfg(feature = "wait-on-retry")]
24use super::transaction::control_block::ControlBlock;
25use super::Transaction;
26
/// `VarControlBlock` contains all the useful data for a `Var` while being the same type
/// for every `Var`, because the stored value is type-erased behind `dyn Any`.
///
/// The control block is accessed from other threads directly whereas `Var`
/// is just a typesafe wrapper around it.
pub struct VarControlBlock {
    /// `waiting_threads` is a list of all waiting threads protected by a mutex.
    ///
    /// Only weak references are stored, so an entry does not keep the
    /// waiting thread's `ControlBlock` alive.
    #[cfg(feature = "wait-on-retry")]
    waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,

    /// `dead_threads` is a counter for all dead threads.
    ///
    /// When there are many dead threads waiting for a change, but
    /// nobody changes the value, then an automatic collection is
    /// performed.
    #[cfg(feature = "wait-on-retry")]
    dead_threads: AtomicUsize,

    /// The inner value of the Var.
    ///
    /// It can be shared through an `Arc` without copying it too often.
    ///
    /// The `Arc` is also used by the threads to detect changes.
    /// The value in it should not be changed or locked because
    /// that may cause multiple threads to block unforeseen as well as
    /// causing deadlocks.
    ///
    /// The shared reference is protected by a `RwLock` so that multiple
    /// threads can safely block it. This ensures consistency, without
    /// preventing other threads from accessing the values.
    ///
    /// Starvation may occur, if one thread wants to write-lock but others
    /// keep holding read-locks.
    pub value: RwLock<Arc<dyn Any + Send + Sync>>,
}
61
62impl VarControlBlock {
63    #[cfg(feature = "wait-on-retry")]
64    /// create a new empty `VarControlBlock`
65    pub fn new<T>(val: T) -> Arc<VarControlBlock>
66    where
67        T: Any + Sync + Send,
68    {
69        let ctrl = VarControlBlock {
70            waiting_threads: Mutex::new(Vec::new()),
71            dead_threads: AtomicUsize::new(0),
72            value: RwLock::new(Arc::new(val)),
73        };
74        Arc::new(ctrl)
75    }
76
77    #[cfg(not(feature = "wait-on-retry"))]
78    /// create a new empty `VarControlBlock`
79    pub fn new<T>(val: T) -> Arc<VarControlBlock>
80    where
81        T: Any + Sync + Send,
82    {
83        let ctrl = VarControlBlock {
84            value: RwLock::new(Arc::new(val)),
85        };
86        Arc::new(ctrl)
87    }
88
89    #[cfg(feature = "wait-on-retry")]
90    /// Wake all threads that are waiting for this block.
91    pub fn wake_all(&self) {
92        // Atomically take all waiting threads from the value.
93        let threads = {
94            let mut guard = self.waiting_threads.lock();
95            let inner: &mut Vec<_> = &mut guard;
96            std::mem::take(inner)
97        };
98
99        // Take all, that are still alive.
100        let threads = threads.iter().filter_map(Weak::upgrade);
101
102        // Release all the semaphores to start the thread.
103        for thread in threads {
104            // Inform thread that this var has changed.
105            thread.set_changed();
106        }
107    }
108
109    #[cfg(feature = "wait-on-retry")]
110    /// Add another thread, that waits for mutations of `self`.
111    pub fn wait(&self, thread: &Arc<ControlBlock>) {
112        let mut guard = self.waiting_threads.lock();
113
114        guard.push(Arc::downgrade(thread));
115    }
116
117    #[cfg(feature = "wait-on-retry")]
118    /// Mark another `StmControlBlock` as dead.
119    ///
120    /// If the count of dead control blocks is too high,
121    /// perform a cleanup.
122    /// This prevents masses of old `StmControlBlock` to
123    /// pile up when a variable is often read but rarely written.
124    pub fn set_dead(&self) {
125        // Increase by one.
126        let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
127
128        // If there are too many then cleanup.
129
130        // There is a potential data race that may occure when
131        // one thread reads the number and then operates on
132        // outdated data, but no serious mistakes may happen.
133        if deads >= 64 {
134            let mut guard = self.waiting_threads.lock();
135            self.dead_threads.store(0, atomic::Ordering::SeqCst);
136
137            // Remove all dead ones. Possibly free up the memory.
138            guard.retain(|t| t.upgrade().is_some());
139        }
140    }
141
142    fn get_address(&self) -> usize {
143        std::ptr::from_ref::<VarControlBlock>(self) as usize
144    }
145}
146
147// Implement some operators so that VarControlBlocks can be sorted.
148
149impl PartialEq for VarControlBlock {
150    fn eq(&self, other: &Self) -> bool {
151        self.get_address() == other.get_address()
152    }
153}
154
155impl Eq for VarControlBlock {}
156
157impl Ord for VarControlBlock {
158    fn cmp(&self, other: &Self) -> cmp::Ordering {
159        self.get_address().cmp(&other.get_address())
160    }
161}
162
163impl PartialOrd for VarControlBlock {
164    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
165        Some(self.cmp(other))
166    }
167}
168
/// A variable that can be used in a STM-Block.
///
/// Cloning a `TVar` clones the inner `Arc`, so the clone shares the same
/// control block as the original.
#[derive(Clone)]
pub struct TVar<T> {
    /// The control block is the inner of the variable.
    ///
    /// The rest of `TVar` is just the typesafe interface.
    control_block: Arc<VarControlBlock>,

    /// This marker is needed so that the variable can be used in a typesafe
    /// manner. The control block itself stores the value type-erased.
    _marker: PhantomData<T>,
}
181
182impl<T> TVar<T>
183where
184    T: Any + Sync + Send + Clone,
185{
186    /// Create a new `TVar`.
187    pub fn new(val: T) -> TVar<T> {
188        TVar {
189            control_block: VarControlBlock::new(val),
190            _marker: PhantomData,
191        }
192    }
193
194    #[allow(clippy::missing_panics_doc)]
195    /// `read_atomic` reads a value atomically, without starting a transaction.
196    ///
197    /// It is semantically equivalent to
198    ///
199    /// ```
200    /// # use fast_stm::*;
201    ///
202    /// let var = TVar::new(0);
203    /// atomically(|trans| var.read(trans));
204    /// ```
205    ///
206    /// but more efficient.
207    ///
208    /// `read_atomic` returns a clone of the value.
209    ///
210    /// <div class="warning">
211    ///
212    /// This method should not be used inside transactions.
213    ///
214    /// </div>
215    pub fn read_atomic(&self) -> T {
216        let val = self.read_ref_atomic();
217
218        (&*val as &dyn Any)
219            .downcast_ref::<T>()
220            .expect("wrong type in Var<T>")
221            .clone()
222    }
223
224    #[allow(clippy::missing_panics_doc)]
225    /// `write_atomic` writes a value atomically, without starting a transaction.
226    ///
227    /// It is semantically equivalent to
228    ///
229    /// ```
230    /// # use fast_stm::*;
231    ///
232    /// let var = TVar::new(0);
233    /// atomically(|trans| var.write(trans, 1));
234    /// ```
235    ///
236    /// but more efficient.
237    ///
238    /// <div class="warning">
239    ///
240    /// This method should not be used inside transactions.
241    ///
242    /// </div>
243    pub fn write_atomic(&self, value: T) {
244        let mut val = self.control_block.value.write();
245        let boxed = Arc::new(value);
246        *val = boxed;
247    }
248
249    /// Read a value atomically but return a reference.
250    ///
251    /// This is mostly used internally, but can be useful in
252    /// some cases, because `read_atomic` clones the
253    /// inner value, which may be expensive.
254    pub fn read_ref_atomic(&self) -> Arc<dyn Any + Send + Sync> {
255        self.control_block.value.read().clone()
256    }
257
258    /// The normal way to access a var.
259    ///
260    /// It is equivalent to `transaction.read(&var)`, but more
261    /// convenient.
262    pub fn read(&self, transaction: &mut Transaction) -> StmClosureResult<T> {
263        transaction.read(self)
264    }
265
266    /// The normal way to write a var.
267    ///
268    /// It is equivalent to `transaction.write(&var, value)`, but more
269    /// convenient.
270    pub fn write(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<()> {
271        transaction.write(self, value)
272    }
273
274    /// Modify the content of a `TVar` with the function f.
275    ///
276    /// ```
277    /// # use fast_stm::*;
278    ///
279    ///
280    /// let var = TVar::new(21);
281    /// atomically(|trans|
282    ///     var.modify(trans, |x| x*2)
283    /// );
284    ///
285    /// assert_eq!(var.read_atomic(), 42);
286    /// ```
287    pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmClosureResult<()>
288    where
289        F: FnOnce(T) -> T,
290    {
291        let old = self.read(transaction)?;
292        self.write(transaction, f(old))
293    }
294
295    /// Replaces the value of a `TVar` with a new one, returning
296    /// the old one.
297    ///
298    /// ```
299    /// # use fast_stm::*;
300    ///
301    /// let var = TVar::new(0);
302    /// let x = atomically(|trans|
303    ///     var.replace(trans, 42)
304    /// );
305    ///
306    /// assert_eq!(x, 0);
307    /// assert_eq!(var.read_atomic(), 42);
308    /// ```
309    pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<T> {
310        let old = self.read(transaction)?;
311        self.write(transaction, value)?;
312        Ok(old)
313    }
314
315    /// Check if two `TVar`s refer to the same position.
316    pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
317        Arc::ptr_eq(&this.control_block, &other.control_block)
318    }
319
320    /// Access the control block of the var.
321    ///
322    /// Internal use only!
323    pub fn control_block(&self) -> &Arc<VarControlBlock> {
324        &self.control_block
325    }
326}
327
328/// Debug output a struct.
329///
330/// Note that this function does not print the state atomically.
331/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
332/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with
333/// atomically.
334///
335/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
336/// prints the state.
337impl<T> Debug for TVar<T>
338where
339    T: Any + Sync + Send + Clone,
340    T: Debug,
341{
342    #[inline(never)]
343    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
344        let x = self.read_atomic();
345        f.debug_struct("TVar").field("value", &x).finish()
346    }
347}
348
#[test]
// A freshly created TVar must hand back the value it was created with.
fn test_read_atomic() {
    let expected = 42;
    let var = TVar::new(expected);

    assert_eq!(expected, var.read_atomic());
}
356
357// More tests are in lib.rs.