fast_stm/tvar.rs
1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9#[cfg(feature = "wait-on-retry")]
10use parking_lot::Mutex;
11use parking_lot::RwLock;
12use std::any::Any;
13use std::cmp;
14use std::fmt::{self, Debug};
15use std::marker::PhantomData;
16#[cfg(feature = "wait-on-retry")]
17use std::sync::atomic::{self, AtomicUsize};
18use std::sync::Arc;
19#[cfg(feature = "wait-on-retry")]
20use std::sync::Weak;
21
22use super::result::StmClosureResult;
23#[cfg(feature = "wait-on-retry")]
24use super::transaction::control_block::ControlBlock;
25use super::Transaction;
26
27/// `VarControlBlock` contains all the useful data for a `Var` while beeing the same type.
28///
29/// The control block is accessed from other threads directly whereas `Var`
30/// is just a typesafe wrapper around it.
31pub struct VarControlBlock {
32 /// `waiting_threads` is a list of all waiting threads protected by a mutex.
33 #[cfg(feature = "wait-on-retry")]
34 waiting_threads: Mutex<Vec<Weak<ControlBlock>>>,
35
36 /// `dead_threads` is a counter for all dead threads.
37 ///
38 /// When there are many dead threads waiting for a change, but
39 /// nobody changes the value, then an automatic collection is
40 /// performed.
41 #[cfg(feature = "wait-on-retry")]
42 dead_threads: AtomicUsize,
43
44 /// The inner value of the Var.
45 ///
46 /// It can be shared through a Arc without copying it too often.
47 ///
48 /// The Arc is also used by the threads to detect changes.
49 /// The value in it should not be changed or locked because
50 /// that may cause multiple threads to block unforeseen as well as
51 /// causing deadlocks.
52 ///
53 /// The shared reference is protected by a `RWLock` so that multiple
54 /// threads can safely block it. This ensures consistency, without
55 /// preventing other threads from accessing the values.
56 ///
57 /// Starvation may occur, if one thread wants to write-lock but others
58 /// keep holding read-locks.
59 pub value: RwLock<Arc<dyn Any + Send + Sync>>,
60}
61
62impl VarControlBlock {
63 #[cfg(feature = "wait-on-retry")]
64 /// create a new empty `VarControlBlock`
65 pub fn new<T>(val: T) -> Arc<VarControlBlock>
66 where
67 T: Any + Sync + Send,
68 {
69 let ctrl = VarControlBlock {
70 waiting_threads: Mutex::new(Vec::new()),
71 dead_threads: AtomicUsize::new(0),
72 value: RwLock::new(Arc::new(val)),
73 };
74 Arc::new(ctrl)
75 }
76
77 #[cfg(not(feature = "wait-on-retry"))]
78 /// create a new empty `VarControlBlock`
79 pub fn new<T>(val: T) -> Arc<VarControlBlock>
80 where
81 T: Any + Sync + Send,
82 {
83 let ctrl = VarControlBlock {
84 value: RwLock::new(Arc::new(val)),
85 };
86 Arc::new(ctrl)
87 }
88
89 #[cfg(feature = "wait-on-retry")]
90 /// Wake all threads that are waiting for this block.
91 pub fn wake_all(&self) {
92 // Atomically take all waiting threads from the value.
93 let threads = {
94 let mut guard = self.waiting_threads.lock();
95 let inner: &mut Vec<_> = &mut guard;
96 std::mem::take(inner)
97 };
98
99 // Take all, that are still alive.
100 let threads = threads.iter().filter_map(Weak::upgrade);
101
102 // Release all the semaphores to start the thread.
103 for thread in threads {
104 // Inform thread that this var has changed.
105 thread.set_changed();
106 }
107 }
108
109 #[cfg(feature = "wait-on-retry")]
110 /// Add another thread, that waits for mutations of `self`.
111 pub fn wait(&self, thread: &Arc<ControlBlock>) {
112 let mut guard = self.waiting_threads.lock();
113
114 guard.push(Arc::downgrade(thread));
115 }
116
117 #[cfg(feature = "wait-on-retry")]
118 /// Mark another `StmControlBlock` as dead.
119 ///
120 /// If the count of dead control blocks is too high,
121 /// perform a cleanup.
122 /// This prevents masses of old `StmControlBlock` to
123 /// pile up when a variable is often read but rarely written.
124 pub fn set_dead(&self) {
125 // Increase by one.
126 let deads = self.dead_threads.fetch_add(1, atomic::Ordering::Relaxed);
127
128 // If there are too many then cleanup.
129
130 // There is a potential data race that may occure when
131 // one thread reads the number and then operates on
132 // outdated data, but no serious mistakes may happen.
133 if deads >= 64 {
134 let mut guard = self.waiting_threads.lock();
135 self.dead_threads.store(0, atomic::Ordering::SeqCst);
136
137 // Remove all dead ones. Possibly free up the memory.
138 guard.retain(|t| t.upgrade().is_some());
139 }
140 }
141
142 fn get_address(&self) -> usize {
143 std::ptr::from_ref::<VarControlBlock>(self) as usize
144 }
145}
146
147// Implement some operators so that VarControlBlocks can be sorted.
148
149impl PartialEq for VarControlBlock {
150 fn eq(&self, other: &Self) -> bool {
151 self.get_address() == other.get_address()
152 }
153}
154
155impl Eq for VarControlBlock {}
156
157impl Ord for VarControlBlock {
158 fn cmp(&self, other: &Self) -> cmp::Ordering {
159 self.get_address().cmp(&other.get_address())
160 }
161}
162
163impl PartialOrd for VarControlBlock {
164 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
165 Some(self.cmp(other))
166 }
167}
168
169/// A variable that can be used in a STM-Block
170#[derive(Clone)]
171pub struct TVar<T> {
172 /// The control block is the inner of the variable.
173 ///
174 /// The rest of `TVar` is just the typesafe interface.
175 control_block: Arc<VarControlBlock>,
176
177 /// This marker is needed so that the variable can be used in a typesafe
178 /// manner.
179 _marker: PhantomData<T>,
180}
181
182impl<T> TVar<T>
183where
184 T: Any + Sync + Send + Clone,
185{
186 /// Create a new `TVar`.
187 pub fn new(val: T) -> TVar<T> {
188 TVar {
189 control_block: VarControlBlock::new(val),
190 _marker: PhantomData,
191 }
192 }
193
194 #[allow(clippy::missing_panics_doc)]
195 /// `read_atomic` reads a value atomically, without starting a transaction.
196 ///
197 /// It is semantically equivalent to
198 ///
199 /// ```
200 /// # use fast_stm::*;
201 ///
202 /// let var = TVar::new(0);
203 /// atomically(|trans| var.read(trans));
204 /// ```
205 ///
206 /// but more efficient.
207 ///
208 /// `read_atomic` returns a clone of the value.
209 pub fn read_atomic(&self) -> T {
210 let val = self.read_ref_atomic();
211
212 (&*val as &dyn Any)
213 .downcast_ref::<T>()
214 .expect("wrong type in Var<T>")
215 .clone()
216 }
217
218 /// Read a value atomically but return a reference.
219 ///
220 /// This is mostly used internally, but can be useful in
221 /// some cases, because `read_atomic` clones the
222 /// inner value, which may be expensive.
223 pub fn read_ref_atomic(&self) -> Arc<dyn Any + Send + Sync> {
224 self.control_block.value.read().clone()
225 }
226
227 /// The normal way to access a var.
228 ///
229 /// It is equivalent to `transaction.read(&var)`, but more
230 /// convenient.
231 pub fn read(&self, transaction: &mut Transaction) -> StmClosureResult<T> {
232 transaction.read(self)
233 }
234
235 /// The normal way to write a var.
236 ///
237 /// It is equivalent to `transaction.write(&var, value)`, but more
238 /// convenient.
239 pub fn write(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<()> {
240 transaction.write(self, value)
241 }
242
243 /// Modify the content of a `TVar` with the function f.
244 ///
245 /// ```
246 /// # use fast_stm::*;
247 ///
248 ///
249 /// let var = TVar::new(21);
250 /// atomically(|trans|
251 /// var.modify(trans, |x| x*2)
252 /// );
253 ///
254 /// assert_eq!(var.read_atomic(), 42);
255 /// ```
256 pub fn modify<F>(&self, transaction: &mut Transaction, f: F) -> StmClosureResult<()>
257 where
258 F: FnOnce(T) -> T,
259 {
260 let old = self.read(transaction)?;
261 self.write(transaction, f(old))
262 }
263
264 /// Replaces the value of a `TVar` with a new one, returning
265 /// the old one.
266 ///
267 /// ```
268 /// # use fast_stm::*;
269 ///
270 /// let var = TVar::new(0);
271 /// let x = atomically(|trans|
272 /// var.replace(trans, 42)
273 /// );
274 ///
275 /// assert_eq!(x, 0);
276 /// assert_eq!(var.read_atomic(), 42);
277 /// ```
278 pub fn replace(&self, transaction: &mut Transaction, value: T) -> StmClosureResult<T> {
279 let old = self.read(transaction)?;
280 self.write(transaction, value)?;
281 Ok(old)
282 }
283
284 /// Check if two `TVar`s refer to the same position.
285 pub fn ref_eq(this: &TVar<T>, other: &TVar<T>) -> bool {
286 Arc::ptr_eq(&this.control_block, &other.control_block)
287 }
288
289 /// Access the control block of the var.
290 ///
291 /// Internal use only!
292 pub fn control_block(&self) -> &Arc<VarControlBlock> {
293 &self.control_block
294 }
295}
296
297/// Debug output a struct.
298///
299/// Note that this function does not print the state atomically.
300/// If another thread modifies the datastructure at the same time, it may print an inconsistent state.
301/// If you need an accurate view, that reflects current thread-local state, you can implement it easily yourself with
302/// atomically.
303///
304/// Running `atomically` inside a running transaction panics. Therefore `fmt` uses
305/// prints the state.
306impl<T> Debug for TVar<T>
307where
308 T: Any + Sync + Send + Clone,
309 T: Debug,
310{
311 #[inline(never)]
312 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
313 let x = self.read_atomic();
314 f.debug_struct("TVar").field("value", &x).finish()
315 }
316}
317
#[test]
// Smoke test: a freshly created TVar yields its initial value when read
// outside of a transaction.
fn test_read_atomic() {
    let initial = 42;
    let var = TVar::new(initial);

    assert_eq!(initial, var.read_atomic());
}
325
326// More tests are in lib.rs.