fast_stm/tvar.rs
1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use parking_lot::{Mutex, RwLock};
10use std::any::Any;
11use std::cmp;
12use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::atomic::{self, AtomicUsize};
15use std::sync::{Arc, Weak};
16
17use super::result::StmClosureResult;
18use super::transaction::control_block::ControlBlock;
19use super::Transaction;
20
21/// `VarControlBlock` contains all the useful data for a `Var` while beeing the same type.
22///
23/// The control block is accessed from other threads directly whereas `Var`
24/// is just a typesafe wrapper around it.
25pub struct VarControlBlock {
26 /// `waiting_threads` is a list of all waiting threads protected by a mutex.
27 waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,
28
29 /// `dead_threads` is a counter for all dead threads.
30 ///
31 /// When there are many dead threads waiting for a change, but
32 /// nobody changes the value, then an automatic collection is
33 /// performed.
34 dead_threads: AtomicUsize,
35
36 /// The inner value of the Var.
37 ///
38 /// It can be shared through a Arc without copying it too often.
39 ///
40 /// The Arc is also used by the threads to detect changes.
41 /// The value in it should not be changed or locked because
42 /// that may cause multiple threads to block unforeseen as well as
43 /// causing deadlocks.
44 ///
45 /// The shared reference is protected by a `RWLock` so that multiple
46 /// threads can safely block it. This ensures consistency, without
47 /// preventing other threads from accessing the values.
48 ///
49 /// Starvation may occur, if one thread wants to write-lock but others
50 /// keep holding read-locks.
51 pub value: RwLock<Arc<dyn Any + Send + Sync>>,
52}
53
54impl VarControlBlock {
55 /// create a new empty `VarControlBlock`
56 pub fn new<T>(val: T) -> Arc<VarControlBlock>
57 where
58 T: Any + Sync + Send,
59 {
60 let ctrl = VarControlBlock {
61 waiting_threads: Mutex::new(Vec::new()),
62 dead_threads: AtomicUsize::new(0),
63 value: RwLock::new(Arc::new(val)),
64 };
65 Arc::new(ctrl)
66 }
67
68 /// Wake all threads that are waiting for this block.
69 pub fn wake_all(&self) {
70 // Atomically take all waiting threads from the value.
71 let threads = {
72 let mut guard = self.waiting_threads.lock();
73 let inner: &mut Vec<_> = &mut guard;
74 std::mem::take(inner)
75 };
76
77 // Take all, that are still alive.
78 let threads = threads.iter().filter_map(Weak::upgrade);
79
80 // Release all the semaphores to start the thread.
81 for thread in threads {
82 // Inform thread that this var has changed.
83 thread.set_changed();
84 }
85 }
86
87 /// Add another thread, that waits for mutations of `self`.
88 pub fn wait(&self, thread: &Arc<ControlBlock>) {
89 let mut guard = self.waiting_threads.lock();
90
91 guard.push(Arc::downgrade(thread));
92 }
93
94 /// Mark another `StmControlBlock` as dead.
95 ///
96 /// If the count of dead control blocks is too high,
97 /// perform a cleanup.
98 /// This prevents masses of old `StmControlBlock` to
99 /// pile up when a variable is often read but rarely written.
100 pub fn set_dead(&self) {
101 // Increase by one.
102 let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
103
104 // If there are too many then cleanup.
105
106 // There is a potential data race that may occure when
107 // one thread reads the number and then operates on
108 // outdated data, but no serious mistakes may happen.
109 if deads >= 64 {
110 let mut guard = self.waiting_threads.lock();
111 self.dead_threads.store(0, atomic::Ordering::SeqCst);
112
113 // Remove all dead ones. Possibly free up the memory.
114 guard.retain(|t| t.upgrade().is_some());
115 }
116 }
117
118 fn get_address(&self) -> usize {
119 std::ptr::from_ref::<VarControlBlock>(self) as usize
120 }
121}
122
123// Implement some operators so that VarControlBlocks can be sorted.
124
125impl PartialEq for VarControlBlock {
126 fn eq(&self, other: &Self) -> bool {
127 self.get_address() == other.get_address()
128 }
129}
130
131impl Eq for VarControlBlock {}
132
133impl Ord for VarControlBlock {
134 fn cmp(&self, other: &Self) -> cmp::Ordering {
135 self.get_address().cmp(&other.get_address())
136 }
137}
138
139impl PartialOrd for VarControlBlock {
140 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
141 Some(self.cmp(other))
142 }
143}
144
145/// A variable that can be used in a STM-Block
146#[derive(Clone)]
147pub struct TVar<T> {
148 /// The control block is the inner of the variable.
149 ///
150 /// The rest of `TVar` is just the typesafe interface.
151 control_block: Arc<VarControlBlock>,
152
153 /// This marker is needed so that the variable can be used in a typesafe
154 /// manner.
155 _marker: PhantomData<T>,
156}
157
158impl<T> TVar<T>
159where
160 T: Any + Sync + Send + Clone,
161{
162 /// Create a new `TVar`.
163 pub fn new(val: T) -> TVar<T> {
164 TVar {
165 control_block: VarControlBlock::new(val),
166 _marker: PhantomData,
167 }
168 }
169
170 #[allow(clippy::missing_panics_doc)]
171 /// `read_atomic` reads a value atomically, without starting a transaction.
172 ///
173 /// It is semantically equivalent to
174 ///
175 /// ```
176 /// # use fast_stm::*;
177 ///
178 /// let var = TVar::new(0);
179 /// atomically(|trans| var.read(trans));
180 /// ```
181 ///
182 /// but more efficient.
183 ///
184 /// `read_atomic` returns a clone of the value.
185 pub fn read_atomic(&self) -> T {
186 let val = self.read_ref_atomic();
187
188 (&*val as &dyn Any)
189 .downcast_ref::<T>()
190 .expect("wrong type in Var<T>")
191 .clone()
192 }
193
194 /// Read a value atomically but return a reference.
195 ///
196 /// This is mostly used internally, but can be useful in
197 /// some cases, because `read_atomic` clones the
198 /// inner value, which may be expensive.
199 pub fn read_ref_atomic(&self) -> Arc<dyn Any + Send + Sync> {
200 self.control_block.value.read().clone()
201 }
202
203 /// The normal way to access a var.
204 ///
205 /// It is equivalent to `transaction.read(&var)`, but more
206 /// convenient.
207 pub fn read(&self, transaction: &mut Transaction) -> StmClosureResult<T> {
208 transaction.read(self)
209 }
210
211 /// The normal way to write a var.
212 ///
213 /// It is equivalent to `transaction.write(&var, value)`, but more
214 /// convenient.
215 pub fn write(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<()> {
216 transaction.write(self, value)
217 }
218
219 /// Modify the content of a `TVar` with the function f.
220 ///
221 /// ```
222 /// # use fast_stm::*;
223 ///
224 ///
225 /// let var = TVar::new(21);
226 /// atomically(|trans|
227 /// var.modify(trans, |x| x*2)
228 /// );
229 ///
230 /// assert_eq!(var.read_atomic(), 42);
231 /// ```
232 pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmClosureResult<()>
233 where
234 F: FnOnce(T) -> T,
235 {
236 let old = self.read(transaction)?;
237 self.write(transaction, f(old))
238 }
239
240 /// Replaces the value of a `TVar` with a new one, returning
241 /// the old one.
242 ///
243 /// ```
244 /// # use fast_stm::*;
245 ///
246 /// let var = TVar::new(0);
247 /// let x = atomically(|trans|
248 /// var.replace(trans, 42)
249 /// );
250 ///
251 /// assert_eq!(x, 0);
252 /// assert_eq!(var.read_atomic(), 42);
253 /// ```
254 pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<T> {
255 let old = self.read(transaction)?;
256 self.write(transaction, value)?;
257 Ok(old)
258 }
259
260 /// Check if two `TVar`s refer to the same position.
261 pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
262 Arc::ptr_eq(&this.control_block, &other.control_block)
263 }
264
265 /// Access the control block of the var.
266 ///
267 /// Internal use only!
268 pub fn control_block(&self) -> &Arc<VarControlBlock> {
269 &self.control_block
270 }
271}
272
273/// Debug output a struct.
274///
275/// Note that this function does not print the state atomically.
276/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
277/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with
278/// atomically.
279///
280/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
281/// prints the state.
282impl<T> Debug for TVar<T>
283where
284 T: Any + Sync + Send + Clone,
285 T: Debug,
286{
287 #[inline(never)]
288 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
289 let x = self.read_atomic();
290 f.debug_struct("TVar").field("value", &x).finish()
291 }
292}
293
// A freshly created `TVar` must hand back its initial value when read
// atomically.
#[test]
fn test_read_atomic() {
    let initial = 42;
    let var = TVar::new(initial);

    assert_eq!(initial, var.read_atomic());
}
301
302// More tests are in lib.rs.