fast_stm/transaction/
mod.rs

1pub mod control_block;
2pub mod log_var;
3
4use std::any::Any;
5use std::cell::Cell;
6use std::collections::btree_map::Entry;
7use std::collections::BTreeMap;
8use std::mem;
9use std::sync::Arc;
10
11use crate::{TransactionClosureResult, TransactionError, TransactionResult};
12
13use self::control_block::ControlBlock;
14use self::log_var::LogVar;
15use super::result::{StmClosureResult, StmError};
16use super::tvar::{TVar, VarControlBlock};
17
thread_local! {
    /// Per-thread flag that is `true` while a transaction runs on this thread.
    static TRANSACTION_RUNNING: Cell<bool> = const { Cell::new(false) };
}

/// Marker whose lifetime spans exactly one running transaction.
///
/// Creating it raises the thread-local flag (panicking if the flag is already
/// raised, i.e. on a nested STM call). Dropping it lowers the flag again, so
/// the transaction is marked as finished on every exit path.
struct TransactionGuard;

impl TransactionGuard {
    /// Mark the current thread as running a transaction.
    ///
    /// # Panics
    ///
    /// Panics with `"STM: Nested Transaction"` if the flag is already set on
    /// this thread.
    pub fn new() -> TransactionGuard {
        // `replace` hands back the previous state while raising the flag.
        let nested = TRANSACTION_RUNNING.with(|running| running.replace(true));
        assert!(!nested, "STM: Nested Transaction");
        TransactionGuard
    }
}

impl Drop for TransactionGuard {
    /// Mark the current thread as no longer running a transaction.
    fn drop(&mut self) {
        TRANSACTION_RUNNING.with(|running| running.set(false));
    }
}
42
/// Decision returned by a control function after a transaction attempt failed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionControl {
    /// Run the transaction again.
    Retry,
    /// Stop retrying and give up on the transaction.
    Abort,
}
48
49/// Transaction tracks all the read and written variables.
50///
51/// It is used for checking vars, to ensure atomicity.
52pub struct Transaction {
53    /// Map of all vars that map the `VarControlBlock` of a var to a `LogVar`.
54    /// The `VarControlBlock` is unique because it uses it's address for comparing.
55    ///
56    /// The logs need to be accessed in a order to prevend dead-locks on locking.
57    vars: BTreeMap<Arc<VarControlBlock>, LogVar>,
58}
59
60impl Transaction {
61    /// Create a new log.
62    ///
63    /// Normally you don't need to call this directly.
64    /// Use `atomically` instead.
65    fn new() -> Transaction {
66        Transaction {
67            vars: BTreeMap::new(),
68        }
69    }
70
71    /// Run a function with a transaction.
72    ///
73    /// It is equivalent to `atomically`.
74    pub fn with<T, F>(f: F) -> T
75    where
76        F: Fn(&mut Transaction) -> StmClosureResult<T>,
77    {
78        match Transaction::with_control(|_| TransactionControl::Retry, f) {
79            Some(t) => t,
80            None => unreachable!(),
81        }
82    }
83
84    /// Run a function with a transaction.
85    ///
86    /// `with_control` takes another control function, that
87    /// can steer the control flow and possible terminate early.
88    ///
89    /// `control` can react to counters, timeouts or external inputs.
90    ///
91    /// It allows the user to fall back to another strategy, like a global lock
92    /// in the case of too much contention.
93    ///
94    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
95    /// called and `control` does not abort.
96    /// If you need a timeout, another thread should signal this through a [`TVar`].
97    pub fn with_control<T, F, C>(mut control: C, f: F) -> Option<T>
98    where
99        F: Fn(&mut Transaction) -> StmClosureResult<T>,
100        C: FnMut(StmError) -> TransactionControl,
101    {
102        let _guard = TransactionGuard::new();
103
104        // create a log guard for initializing and cleaning up
105        // the log
106        let mut transaction = Transaction::new();
107
108        // loop until success
109        loop {
110            // run the computation
111            match f(&mut transaction) {
112                // on success exit loop
113                Ok(t) => {
114                    if transaction.commit() {
115                        return Some(t);
116                    }
117                }
118
119                Err(e) => {
120                    // Check if the user wants to abort the transaction.
121                    if let TransactionControl::Abort = control(e) {
122                        return None;
123                    }
124
125                    // on retry wait for changes
126                    if let StmError::Retry = e {
127                        transaction.wait_for_change();
128                    }
129                }
130            }
131
132            // clear log before retrying computation
133            transaction.clear();
134        }
135    }
136
137    /// Run a function with a transaction.
138    ///
139    /// The transaction will be retried until:
140    /// - it is validated, or
141    /// - it is explicitly aborted from the function, using the `TODO` function.
142    pub fn with_err<T, F, E>(f: F) -> Result<T, E>
143    where
144        F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
145    {
146        let _guard = TransactionGuard::new();
147
148        // create a log guard for initializing and cleaning up
149        // the log
150        let mut transaction = Transaction::new();
151
152        // loop until success
153        loop {
154            // run the computation
155            match f(&mut transaction) {
156                // on success exit loop
157                Ok(t) => {
158                    if transaction.commit() {
159                        return Ok(t);
160                    }
161                }
162                // on error,
163                Err(e) => match e {
164                    // abort and return the error
165                    TransactionError::Abort(err) => return Err(err),
166                    // retry
167                    TransactionError::Stm(_) => {
168                        transaction.wait_for_change();
169                    }
170                },
171            }
172
173            // clear log before retrying computation
174            transaction.clear();
175        }
176    }
177
178    /// Run a function with a transaction.
179    ///
180    /// `with_control` takes another control function, that
181    /// can steer the control flow and possible terminate early.
182    ///
183    /// `control` can react to counters, timeouts or external inputs.
184    ///
185    /// It allows the user to fall back to another strategy, like a global lock
186    /// in the case of too much contention.
187    ///
188    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
189    /// called and `control` does not abort.
190    /// If you need a timeout, another thread should signal this through a [`TVar`].
191    pub fn with_control_and_err<T, F, C, E>(mut control: C, f: F) -> TransactionResult<T, E>
192    where
193        F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
194        C: FnMut(StmError) -> TransactionControl,
195    {
196        let _guard = TransactionGuard::new();
197
198        // create a log guard for initializing and cleaning up
199        // the log
200        let mut transaction = Transaction::new();
201
202        // loop until success
203        loop {
204            // run the computation
205            match f(&mut transaction) {
206                // on success exit loop
207                Ok(t) => {
208                    if transaction.commit() {
209                        return TransactionResult::Validated(t);
210                    }
211                }
212
213                Err(e) => {
214                    match e {
215                        TransactionError::Abort(err) => {
216                            return TransactionResult::Cancelled(err);
217                        }
218                        TransactionError::Stm(err) => {
219                            // Check if the user wants to abort the transaction.
220                            if let TransactionControl::Abort = control(err) {
221                                return TransactionResult::Abandoned;
222                            }
223
224                            // on retry wait for changes
225                            if let StmError::Retry = err {
226                                transaction.wait_for_change();
227                            }
228                        }
229                    }
230                }
231            }
232
233            // clear log before retrying computation
234            transaction.clear();
235        }
236    }
237
238    #[allow(clippy::needless_pass_by_value)]
239    /// Perform a downcast on a var.
240    fn downcast<T: Any + Clone>(var: Arc<dyn Any>) -> T {
241        match var.downcast_ref::<T>() {
242            Some(s) => s.clone(),
243            None => unreachable!("TVar has wrong type"),
244        }
245    }
246
247    /// Read a variable and return the value.
248    ///
249    /// The returned value is not always consistent with the current value of the var,
250    /// but may be an outdated or or not yet commited value.
251    ///
252    /// The used code should be capable of handling inconsistent states
253    /// without running into infinite loops.
254    /// Just the commit of wrong values is prevented by STM.
255    pub fn read<T: Send + Sync + Any + Clone>(&mut self, var: &TVar<T>) -> StmClosureResult<T> {
256        let ctrl = var.control_block().clone();
257        // Check if the same var was written before.
258        let value = match self.vars.entry(ctrl) {
259            // If the variable has been accessed before, then load that value.
260            Entry::Occupied(mut entry) => entry.get_mut().read(),
261
262            // Else load the variable statically.
263            Entry::Vacant(entry) => {
264                // Read the value from the var.
265                let value = var.read_ref_atomic();
266
267                // Store in in an entry.
268                entry.insert(LogVar::Read(value.clone()));
269                value
270            }
271        };
272
273        // For now always succeeds, but that may change later.
274        Ok(Transaction::downcast(value))
275    }
276
277    /// Write a variable.
278    ///
279    /// The write is not immediately visible to other threads,
280    /// but atomically commited at the end of the computation.
281    pub fn write<T: Any + Send + Sync + Clone>(
282        &mut self,
283        var: &TVar<T>,
284        value: T,
285    ) -> StmClosureResult<()> {
286        // box the value
287        let boxed = Arc::new(value);
288
289        // new control block
290        let ctrl = var.control_block().clone();
291        // update or create new entry
292        match self.vars.entry(ctrl) {
293            Entry::Occupied(mut entry) => entry.get_mut().write(boxed),
294            Entry::Vacant(entry) => {
295                entry.insert(LogVar::Write(boxed));
296            }
297        }
298
299        // For now always succeeds, but that may change later.
300        Ok(())
301    }
302
303    /// Combine two calculations. When one blocks with `retry`,
304    /// run the other, but don't commit the changes in the first.
305    ///
306    /// If both block, `Transaction::or` still waits for `TVar`s in both functions.
307    /// Use `Transaction::or` instead of handling errors directly with the `Result::or`.
308    /// The later does not handle all the blocking correctly.
309    pub fn or<T, F1, F2>(&mut self, first: F1, second: F2) -> StmClosureResult<T>
310    where
311        F1: Fn(&mut Transaction) -> StmClosureResult<T>,
312        F2: Fn(&mut Transaction) -> StmClosureResult<T>,
313    {
314        // Create a backup of the log.
315        let mut copy = Transaction {
316            vars: self.vars.clone(),
317        };
318
319        // Run the first computation.
320        let f = first(self);
321
322        match f {
323            // Run other on manual retry call.
324            Err(StmError::Retry) => {
325                // swap, so that self is the current run
326                mem::swap(self, &mut copy);
327
328                // Run other action.
329                let s = second(self);
330
331                // If both called retry then exit.
332                match s {
333                    Err(StmError::Failure) => Err(StmError::Failure),
334                    s => {
335                        self.combine(copy);
336                        s
337                    }
338                }
339            }
340
341            // Return success and failure directly
342            x => x,
343        }
344    }
345
346    /// Combine two logs into a single log, to allow waiting for all reads.
347    fn combine(&mut self, other: Transaction) {
348        // combine reads
349        for (var, value) in other.vars {
350            // only insert new values
351            if let Some(value) = value.obsolete() {
352                self.vars.entry(var).or_insert(value);
353            }
354        }
355    }
356
357    /// Clear the log's data.
358    ///
359    /// This should be used before redoing a computation, but
360    /// nowhere else.
361    fn clear(&mut self) {
362        self.vars.clear();
363    }
364
365    /// Wait for any variable to change,
366    /// because the change may lead to a new calculation result.
367    fn wait_for_change(&mut self) {
368        // Create control block for waiting.
369        let ctrl = Arc::new(ControlBlock::new());
370
371        #[allow(clippy::mutable_key_type)]
372        let vars = std::mem::take(&mut self.vars);
373        let mut reads = Vec::with_capacity(self.vars.len());
374
375        let blocking = vars
376            .into_iter()
377            .filter_map(|(a, b)| b.into_read_value().map(|b| (a, b)))
378            // Check for consistency.
379            .all(|(var, value)| {
380                var.wait(&ctrl);
381                let x = {
382                    // Take read lock and read value.
383                    let guard = var.value.read();
384                    Arc::ptr_eq(&value, &guard)
385                };
386                reads.push(var);
387                x
388            });
389
390        // If no var has changed, then block.
391        if blocking {
392            // Propably wait until one var has changed.
393            ctrl.wait();
394        }
395
396        // Let others know that ctrl is dead.
397        // It does not matter, if we set too many
398        // to dead since it may slightly reduce performance
399        // but not break the semantics.
400        for var in &reads {
401            var.set_dead();
402        }
403    }
404
405    /// Write the log back to the variables.
406    ///
407    /// Return true for success and false, if a read var has changed
408    fn commit(&mut self) -> bool {
409        // Use two phase locking for safely writing data back to the vars.
410
411        // First phase: acquire locks.
412        // Check for consistency of all the reads and perform
413        // an early return if something is not consistent.
414
415        // Created arrays for storing the locks
416        // vector of locks.
417        let mut read_vec = Vec::with_capacity(self.vars.len());
418
419        // vector of tuple (value, lock)
420        let mut write_vec = Vec::with_capacity(self.vars.len());
421
422        // vector of written variables
423        let mut written = Vec::with_capacity(self.vars.len());
424
425        for (var, value) in &self.vars {
426            // lock the variable and read the value
427
428            match *value {
429                // We need to take a write lock.
430                LogVar::Write(ref w) | LogVar::ReadObsoleteWrite(_, ref w) => {
431                    // take write lock
432                    let lock = var.value.write();
433                    // add all data to the vector
434                    write_vec.push((w, lock));
435                    written.push(var);
436                }
437
438                // We need to check for consistency and
439                // take a write lock.
440                LogVar::ReadWrite(ref original, ref w) => {
441                    // take write lock
442                    let lock = var.value.write();
443
444                    if !Arc::ptr_eq(&lock, original) {
445                        return false;
446                    }
447                    // add all data to the vector
448                    write_vec.push((w, lock));
449                    written.push(var);
450                }
451                // Nothing to do. ReadObsolete is only needed for blocking, not
452                // for consistency checks.
453                LogVar::ReadObsolete(_) => {}
454                // Take read lock and check for consistency.
455                LogVar::Read(ref original) => {
456                    // Take a read lock.
457                    let lock = var.value.read();
458
459                    if !Arc::ptr_eq(&lock, original) {
460                        return false;
461                    }
462
463                    read_vec.push(lock);
464                }
465            }
466        }
467
468        // Second phase: write back and release
469
470        // Release the reads first.
471        // This allows other threads to continue quickly.
472        drop(read_vec);
473
474        for (value, mut lock) in write_vec {
475            // Commit value.
476            *lock = value.clone();
477        }
478
479        for var in written {
480            // Unblock all threads waiting for it.
481            var.wake_all();
482        }
483
484        // Commit succeded.
485        true
486    }
487}
488
#[cfg(test)]
mod test {
    use super::*;

    /// A var can be read through a fresh log.
    #[test]
    fn read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2, 3, 4]);

        // The variable can be read.
        assert_eq!(&*log.read(&var).unwrap(), &[1, 2, 3, 4]);
    }

    /// A write is visible to later reads of the same log, but not outside of it.
    #[test]
    fn write_read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2]);

        log.write(&var, vec![1, 2, 3, 4]).unwrap();

        // Consecutive reads get the updated version.
        assert_eq!(log.read(&var).unwrap(), [1, 2, 3, 4]);

        // The original value is still preserved.
        assert_eq!(var.read_atomic(), [1, 2]);
    }

    /// A transaction without any var access returns the closure's value.
    #[test]
    fn transaction_simple() {
        let x = Transaction::with(|_| Ok(42));
        assert_eq!(x, 42);
    }

    /// A transaction can read a var.
    #[test]
    fn transaction_read() {
        let read = TVar::new(42);

        let x = Transaction::with(|trans| read.read(trans));

        assert_eq!(x, 42);
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction still tries to run a single time and should successfully
    /// commit in this test.
    #[test]
    fn transaction_with_control_abort_on_single_run() {
        let read = TVar::new(42);

        let x = Transaction::with_control(|_| TransactionControl::Abort, |tx| read.read(tx));

        assert_eq!(x, Some(42));
    }

    /// Run a transaction with a control function, that always aborts.
    /// The closure always asks for a retry, which would loop forever.
    /// The control function will abort this loop.
    #[test]
    fn transaction_with_control_abort_on_retry() {
        let x: Option<i32> =
            Transaction::with_control(|_| TransactionControl::Abort, |_| Err(StmError::Retry));

        assert_eq!(x, None);
    }

    /// A committed write is visible outside of the transaction.
    #[test]
    fn transaction_write() {
        let write = TVar::new(42);

        Transaction::with(|trans| write.write(trans, 0));

        assert_eq!(write.read_atomic(), 0);
    }

    /// A value read from one var can be written to another one.
    #[test]
    fn transaction_copy() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        Transaction::with(|trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(write.read_atomic(), 42);
    }

    /// Same copy as in `transaction_copy`, but run through `with_control`.
    /// The closure never fails, so the always-aborting control function is never
    /// consulted and the write is committed.
    #[test]
    fn transaction_control_stuff() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        let outcome = Transaction::with_control(
            |_| TransactionControl::Abort,
            |trans| {
                let r = read.read(trans)?;
                write.write(trans, r)
            },
        );

        assert_eq!(outcome, Some(()));
        assert_eq!(write.read_atomic(), 42);
    }

    /// Test if nested transactions are correctly detected.
    #[test]
    #[should_panic]
    fn transaction_nested_fail() {
        Transaction::with(|_| {
            Transaction::with(|_| Ok(42));
            Ok(1)
        });
    }
}