fast_stm/
tvar.rs

1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use parking_lot::{Mutex, RwLock};
10use std::any::Any;
11use std::cmp;
12use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::atomic::{self, AtomicUsize};
15use std::sync::{Arc, Weak};
16
17use super::result::StmClosureResult;
18use super::transaction::control_block::ControlBlock;
19use super::Transaction;
20
21/// `VarControlBlock` contains all the useful data for a `Var` while beeing the same type.
22///
23/// The control block is accessed from other threads directly whereas `Var`
24/// is just a typesafe wrapper around it.
25pub struct VarControlBlock {
26    /// `waiting_threads` is a list of all waiting threads protected by a mutex.
27    waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,
28
29    /// `dead_threads` is a counter for all dead threads.
30    ///
31    /// When there are many dead threads waiting for a change, but
32    /// nobody changes the value, then an automatic collection is
33    /// performed.
34    dead_threads: AtomicUsize,
35
36    /// The inner value of the Var.
37    ///
38    /// It can be shared through a Arc without copying it too often.
39    ///
40    /// The Arc is also used by the threads to detect changes.
41    /// The value in it should not be changed or locked because
42    /// that may cause multiple threads to block unforeseen as well as
43    /// causing deadlocks.
44    ///
45    /// The shared reference is protected by a `RWLock` so that multiple
46    /// threads can safely block it. This ensures consistency, without
47    /// preventing other threads from accessing the values.
48    ///
49    /// Starvation may occur, if one thread wants to write-lock but others
50    /// keep holding read-locks.
51    pub value: RwLock<Arc<dyn Any + Send + Sync>>,
52}
53
54impl VarControlBlock {
55    /// create a new empty `VarControlBlock`
56    pub fn new<T>(val: T) -> Arc<VarControlBlock>
57    where
58        T: Any + Sync + Send,
59    {
60        let ctrl = VarControlBlock {
61            waiting_threads: Mutex::new(Vec::new()),
62            dead_threads: AtomicUsize::new(0),
63            value: RwLock::new(Arc::new(val)),
64        };
65        Arc::new(ctrl)
66    }
67
68    /// Wake all threads that are waiting for this block.
69    pub fn wake_all(&self) {
70        // Atomically take all waiting threads from the value.
71        let threads = {
72            let mut guard = self.waiting_threads.lock();
73            let inner: &mut Vec<_> = &mut guard;
74            std::mem::take(inner)
75        };
76
77        // Take all, that are still alive.
78        let threads = threads.iter().filter_map(Weak::upgrade);
79
80        // Release all the semaphores to start the thread.
81        for thread in threads {
82            // Inform thread that this var has changed.
83            thread.set_changed();
84        }
85    }
86
87    /// Add another thread, that waits for mutations of `self`.
88    pub fn wait(&self, thread: &Arc<ControlBlock>) {
89        let mut guard = self.waiting_threads.lock();
90
91        guard.push(Arc::downgrade(thread));
92    }
93
94    /// Mark another `StmControlBlock` as dead.
95    ///
96    /// If the count of dead control blocks is too high,
97    /// perform a cleanup.
98    /// This prevents masses of old `StmControlBlock` to
99    /// pile up when a variable is often read but rarely written.
100    pub fn set_dead(&self) {
101        // Increase by one.
102        let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
103
104        // If there are too many then cleanup.
105
106        // There is a potential data race that may occure when
107        // one thread reads the number and then operates on
108        // outdated data, but no serious mistakes may happen.
109        if deads >= 64 {
110            let mut guard = self.waiting_threads.lock();
111            self.dead_threads.store(0, atomic::Ordering::SeqCst);
112
113            // Remove all dead ones. Possibly free up the memory.
114            guard.retain(|t| t.upgrade().is_some());
115        }
116    }
117
118    fn get_address(&self) -> usize {
119        std::ptr::from_ref::<VarControlBlock>(self) as usize
120    }
121}
122
123// Implement some operators so that VarControlBlocks can be sorted.
124
125impl PartialEq for VarControlBlock {
126    fn eq(&self, other: &Self) -> bool {
127        self.get_address() == other.get_address()
128    }
129}
130
131impl Eq for VarControlBlock {}
132
133impl Ord for VarControlBlock {
134    fn cmp(&self, other: &Self) -> cmp::Ordering {
135        self.get_address().cmp(&other.get_address())
136    }
137}
138
139impl PartialOrd for VarControlBlock {
140    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
141        Some(self.cmp(other))
142    }
143}
144
145/// A variable that can be used in a STM-Block
146#[derive(Clone)]
147pub struct TVar<T> {
148    /// The control block is the inner of the variable.
149    ///
150    /// The rest of `TVar` is just the typesafe interface.
151    control_block: Arc<VarControlBlock>,
152
153    /// This marker is needed so that the variable can be used in a typesafe
154    /// manner.
155    _marker: PhantomData<T>,
156}
157
158impl<T> TVar<T>
159where
160    T: Any + Sync + Send + Clone,
161{
162    /// Create a new `TVar`.
163    pub fn new(val: T) -> TVar<T> {
164        TVar {
165            control_block: VarControlBlock::new(val),
166            _marker: PhantomData,
167        }
168    }
169
170    #[allow(clippy::missing_panics_doc)]
171    /// `read_atomic` reads a value atomically, without starting a transaction.
172    ///
173    /// It is semantically equivalent to
174    ///
175    /// ```
176    /// # use fast_stm::*;
177    ///
178    /// let var = TVar::new(0);
179    /// atomically(|trans| var.read(trans));
180    /// ```
181    ///
182    /// but more efficient.
183    ///
184    /// `read_atomic` returns a clone of the value.
185    pub fn read_atomic(&self) -> T {
186        let val = self.read_ref_atomic();
187
188        (&*val as &dyn Any)
189            .downcast_ref::<T>()
190            .expect("wrong type in Var<T>")
191            .clone()
192    }
193
194    /// Read a value atomically but return a reference.
195    ///
196    /// This is mostly used internally, but can be useful in
197    /// some cases, because `read_atomic` clones the
198    /// inner value, which may be expensive.
199    pub fn read_ref_atomic(&self) -> Arc<dyn Any + Send + Sync> {
200        self.control_block.value.read().clone()
201    }
202
203    /// The normal way to access a var.
204    ///
205    /// It is equivalent to `transaction.read(&var)`, but more
206    /// convenient.
207    pub fn read(&self, transaction: &mut Transaction) -> StmClosureResult<T> {
208        transaction.read(self)
209    }
210
211    /// The normal way to write a var.
212    ///
213    /// It is equivalent to `transaction.write(&var, value)`, but more
214    /// convenient.
215    pub fn write(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<()> {
216        transaction.write(self, value)
217    }
218
219    /// Modify the content of a `TVar` with the function f.
220    ///
221    /// ```
222    /// # use fast_stm::*;
223    ///
224    ///
225    /// let var = TVar::new(21);
226    /// atomically(|trans|
227    ///     var.modify(trans, |x| x*2)
228    /// );
229    ///
230    /// assert_eq!(var.read_atomic(), 42);
231    /// ```
232    pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmClosureResult<()>
233    where
234        F: FnOnce(T) -> T,
235    {
236        let old = self.read(transaction)?;
237        self.write(transaction, f(old))
238    }
239
240    /// Replaces the value of a `TVar` with a new one, returning
241    /// the old one.
242    ///
243    /// ```
244    /// # use fast_stm::*;
245    ///
246    /// let var = TVar::new(0);
247    /// let x = atomically(|trans|
248    ///     var.replace(trans, 42)
249    /// );
250    ///
251    /// assert_eq!(x, 0);
252    /// assert_eq!(var.read_atomic(), 42);
253    /// ```
254    pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<T> {
255        let old = self.read(transaction)?;
256        self.write(transaction, value)?;
257        Ok(old)
258    }
259
260    /// Check if two `TVar`s refer to the same position.
261    pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
262        Arc::ptr_eq(&this.control_block, &other.control_block)
263    }
264
265    /// Access the control block of the var.
266    ///
267    /// Internal use only!
268    pub fn control_block(&self) -> &Arc<VarControlBlock> {
269        &self.control_block
270    }
271}
272
273/// Debug output a struct.
274///
275/// Note that this function does not print the state atomically.
276/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
277/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with
278/// atomically.
279///
280/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
281/// prints the state.
282impl<T> Debug for TVar<T>
283where
284    T: Any + Sync + Send + Clone,
285    T: Debug,
286{
287    #[inline(never)]
288    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
289        let x = self.read_atomic();
290        f.debug_struct("TVar").field("value", &x).finish()
291    }
292}
293
// A freshly created `TVar` must hand back its initial value when read
// atomically.
#[test]
fn test_read_atomic() {
    let var = TVar::new(42);
    let observed = var.read_atomic();

    assert_eq!(42, observed);
}
301
302// More tests are in lib.rs.