Skip to main content

fast_stm/transaction/
mod.rs

1#[cfg(feature = "wait-on-retry")]
2pub mod control_block;
3pub mod log_var;
4
5cfg_if::cfg_if! {
6    if #[cfg(feature = "hash-registers")] {
7        use std::collections::hash_map::Entry;
8
9        use rustc_hash::FxHashMap;
10    } else {
11        use std::collections::{btree_map::Entry, BTreeMap};
12    }
13}
14
15use std::any::Any;
16use std::cell::Cell;
17use std::mem;
18use std::sync::Arc;
19
20use crate::result::{StmClosureResult, StmError};
21use crate::tvar::{TVar, VarControlBlock};
22use crate::{TransactionClosureResult, TransactionError, TransactionResult};
23
24#[cfg(feature = "wait-on-retry")]
25use control_block::ControlBlock;
26use log_var::LogVar;
27
28thread_local!(static TRANSACTION_RUNNING: Cell<bool> = const { Cell::new(false) });
29
30/// `TransactionGuard` checks against nested STM calls.
31///
32/// Use guard, so that it correctly marks the Transaction as finished.
33struct TransactionGuard;
34
35impl TransactionGuard {
36    pub fn new() -> TransactionGuard {
37        TRANSACTION_RUNNING.with(|t| {
38            assert!(!t.get(), "STM: Nested Transaction");
39            t.set(true);
40        });
41        TransactionGuard
42    }
43}
44
45impl Drop for TransactionGuard {
46    fn drop(&mut self) {
47        TRANSACTION_RUNNING.with(|t| {
48            t.set(false);
49        });
50    }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum TransactionControl {
55    Retry,
56    Abort,
57}
58
59#[cfg(feature = "profiling")]
60#[derive(Debug, Default)]
61pub struct TransactionTallies {
62    pub n_attempts: std::sync::atomic::AtomicUsize,
63    pub n_retry: std::sync::atomic::AtomicUsize,
64    pub n_error: std::sync::atomic::AtomicUsize,
65    pub n_read: std::sync::atomic::AtomicUsize,
66    pub n_redundant_read: std::sync::atomic::AtomicUsize,
67    pub n_read_after_write: std::sync::atomic::AtomicUsize,
68    pub n_write: std::sync::atomic::AtomicUsize,
69}
70
71#[cfg(feature = "profiling")]
72impl std::ops::AddAssign for TransactionTallies {
73    fn add_assign(&mut self, rhs: Self) {
74        self.n_attempts.fetch_add(
75            rhs.n_attempts.load(std::sync::atomic::Ordering::Relaxed),
76            std::sync::atomic::Ordering::Relaxed,
77        );
78        self.n_retry.fetch_add(
79            rhs.n_retry.load(std::sync::atomic::Ordering::Relaxed),
80            std::sync::atomic::Ordering::Relaxed,
81        );
82        self.n_error.fetch_add(
83            rhs.n_error.load(std::sync::atomic::Ordering::Relaxed),
84            std::sync::atomic::Ordering::Relaxed,
85        );
86        self.n_read.fetch_add(
87            rhs.n_read.load(std::sync::atomic::Ordering::Relaxed),
88            std::sync::atomic::Ordering::Relaxed,
89        );
90        self.n_redundant_read.fetch_add(
91            rhs.n_redundant_read
92                .load(std::sync::atomic::Ordering::Relaxed),
93            std::sync::atomic::Ordering::Relaxed,
94        );
95        self.n_read_after_write.fetch_add(
96            rhs.n_read_after_write
97                .load(std::sync::atomic::Ordering::Relaxed),
98            std::sync::atomic::Ordering::Relaxed,
99        );
100        self.n_write.fetch_add(
101            rhs.n_write.load(std::sync::atomic::Ordering::Relaxed),
102            std::sync::atomic::Ordering::Relaxed,
103        );
104    }
105}
106
107#[cfg(feature = "profiling")]
108impl std::iter::Sum for TransactionTallies {
109    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
110        iter.fold(Self::default(), |mut acc, t| {
111            acc += t;
112            acc
113        })
114    }
115}
116
117// -- Transactions
118
119#[cfg(not(feature = "hash-registers"))]
120pub(crate) type RegisterType = BTreeMap<Arc<VarControlBlock>, LogVar>;
121#[cfg(feature = "hash-registers")]
122pub(crate) type RegisterType = FxHashMap<*const VarControlBlock, LogVar>;
123
124/// Transaction tracks all the read and written variables.
125///
126/// It is used for checking vars, to ensure atomicity.
127pub struct Transaction {
128    /// Map of all vars that map the `VarControlBlock` of a var to a `LogVar`.
129    /// The `VarControlBlock` is unique because it uses it's address for comparing.
130    ///
131    /// The logs need to be accessed in a order to prevend dead-locks on locking.
132    vars: RegisterType,
133    #[cfg(feature = "profiling")]
134    tallies: TransactionTallies,
135}
136
137impl Default for Transaction {
138    fn default() -> Self {
139        Self {
140            vars: RegisterType::default(),
141            #[cfg(feature = "profiling")]
142            tallies: TransactionTallies::default(),
143        }
144    }
145}
146
147/// Public API
148impl Transaction {
149    /// Run a function with a transaction.
150    ///
151    /// It is equivalent to `atomically`.
152    pub fn with<T, F>(f: F) -> T
153    where
154        F: Fn(&mut Transaction) -> StmClosureResult<T>,
155    {
156        match Transaction::with_control(|_| TransactionControl::Retry, f) {
157            Some(t) => t,
158            None => unreachable!(),
159        }
160    }
161
162    /// Run a function with a transaction.
163    ///
164    /// `with_control` takes another control function, that
165    /// can steer the control flow and possible terminate early.
166    ///
167    /// `control` can react to counters, timeouts or external inputs.
168    ///
169    /// It allows the user to fall back to another strategy, like a global lock
170    /// in the case of too much contention.
171    ///
172    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
173    /// called and `control` does not abort.
174    /// If you need a timeout, another thread should signal this through a [`TVar`].
175    pub fn with_control<T, F, C>(mut control: C, f: F) -> Option<T>
176    where
177        F: Fn(&mut Transaction) -> StmClosureResult<T>,
178        C: FnMut(StmError) -> TransactionControl,
179    {
180        let _guard = TransactionGuard::new();
181
182        // create a log guard for initializing and cleaning up
183        // the log
184        let mut transaction = Transaction::default();
185
186        // loop until success
187        loop {
188            // run the computation
189            match f(&mut transaction) {
190                // on success exit loop
191                Ok(t) => {
192                    if transaction.commit() {
193                        return Some(t);
194                    }
195                }
196
197                Err(e) => {
198                    // Check if the user wants to abort the transaction.
199                    if let TransactionControl::Abort = control(e) {
200                        return None;
201                    }
202
203                    // on retry wait for changes
204                    #[cfg(feature = "wait-on-retry")]
205                    if let StmError::Retry = e {
206                        transaction.wait_for_change();
207                    }
208                }
209            }
210
211            // clear log before retrying computation
212            transaction.clear();
213        }
214    }
215
216    /// Run a function with a transaction.
217    ///
218    /// The transaction will be retried until:
219    /// - it is validated, or
220    /// - it is explicitly aborted from the function, using the `TODO` function.
221    pub fn with_err<T, F, E>(f: F) -> Result<T, E>
222    where
223        F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
224    {
225        let _guard = TransactionGuard::new();
226
227        // create a log guard for initializing and cleaning up
228        // the log
229        let mut transaction = Transaction::default();
230
231        // loop until success
232        loop {
233            // run the computation
234            match f(&mut transaction) {
235                // on success exit loop
236                Ok(t) => {
237                    if transaction.commit() {
238                        return Ok(t);
239                    }
240                }
241                // on error,
242                Err(e) => match e {
243                    // abort and return the error
244                    TransactionError::Abort(err) => return Err(err),
245                    // retry
246                    TransactionError::Stm(_) => {
247                        #[cfg(feature = "wait-on-retry")]
248                        transaction.wait_for_change();
249                    }
250                },
251            }
252
253            // clear log before retrying computation
254            transaction.clear();
255        }
256    }
257
258    /// Run a function with a transaction.
259    ///
260    /// `with_control` takes another control function, that
261    /// can steer the control flow and possible terminate early.
262    ///
263    /// `control` can react to counters, timeouts or external inputs.
264    ///
265    /// It allows the user to fall back to another strategy, like a global lock
266    /// in the case of too much contention.
267    ///
268    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
269    /// called and `control` does not abort.
270    /// If you need a timeout, another thread should signal this through a [`TVar`].
271    pub fn with_control_and_err<T, F, C, E>(mut control: C, f: F) -> TransactionResult<T, E>
272    where
273        F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
274        C: FnMut(StmError) -> TransactionControl,
275    {
276        let _guard = TransactionGuard::new();
277
278        // create a log guard for initializing and cleaning up
279        // the log
280        let mut transaction = Transaction::default();
281
282        // loop until success
283        loop {
284            // run the computation
285            match f(&mut transaction) {
286                // on success exit loop
287                Ok(t) => {
288                    if transaction.commit() {
289                        return TransactionResult::Validated(t);
290                    }
291                }
292
293                Err(e) => {
294                    match e {
295                        TransactionError::Abort(err) => {
296                            return TransactionResult::Cancelled(err);
297                        }
298                        TransactionError::Stm(err) => {
299                            // Check if the user wants to abort the transaction.
300                            if let TransactionControl::Abort = control(err) {
301                                return TransactionResult::Abandoned;
302                            }
303
304                            // on retry wait for changes
305                            #[cfg(feature = "wait-on-retry")]
306                            if let StmError::Retry = err {
307                                transaction.wait_for_change();
308                            }
309                        }
310                    }
311                }
312            }
313
314            // clear log before retrying computation
315            transaction.clear();
316        }
317    }
318}
319
320#[cfg(feature = "profiling")]
321/// Public profiling API
322impl Transaction {
323    /// Run a function with a transaction.
324    ///
325    /// It is equivalent to `atomically`.
326    pub fn profile_with<T, F>(f: F) -> (T, TransactionTallies)
327    where
328        F: Fn(&mut Transaction) -> StmClosureResult<T>,
329    {
330        match Transaction::profile_with_control(|_| TransactionControl::Retry, f) {
331            (Some(t), tallies) => (t, tallies),
332            (None, _) => unreachable!(),
333        }
334    }
335
336    /// Run a function with a transaction.
337    ///
338    /// `with_control` takes another control function, that
339    /// can steer the control flow and possible terminate early.
340    ///
341    /// `control` can react to counters, timeouts or external inputs.
342    ///
343    /// It allows the user to fall back to another strategy, like a global lock
344    /// in the case of too much contention.
345    ///
346    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
347    /// called and `control` does not abort.
348    /// If you need a timeout, another thread should signal this through a [`TVar`].
349    pub fn profile_with_control<T, F, C>(mut control: C, f: F) -> (Option<T>, TransactionTallies)
350    where
351        F: Fn(&mut Transaction) -> StmClosureResult<T>,
352        C: FnMut(StmError) -> TransactionControl,
353    {
354        let _guard = TransactionGuard::new();
355
356        // create a log guard for initializing and cleaning up
357        // the log
358        let mut transaction = Transaction::default();
359
360        // loop until success
361        loop {
362            transaction
363                .tallies
364                .n_attempts
365                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
366            // run the computation
367            match f(&mut transaction) {
368                // on success exit loop
369                Ok(t) => {
370                    if transaction.commit() {
371                        return (Some(t), transaction.tallies);
372                    }
373                }
374
375                Err(e) => {
376                    // Check if the user wants to abort the transaction.
377                    match e {
378                        StmError::Failure => {
379                            transaction
380                                .tallies
381                                .n_error
382                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
383                        }
384                        StmError::Retry => {
385                            transaction
386                                .tallies
387                                .n_retry
388                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
389                        }
390                    }
391
392                    if let TransactionControl::Abort = control(e) {
393                        return (None, transaction.tallies);
394                    }
395
396                    // on retry wait for changes
397                    #[cfg(feature = "wait-on-retry")]
398                    if let StmError::Retry = e {
399                        transaction.wait_for_change();
400                    }
401                }
402            }
403
404            // clear log before retrying computation
405            transaction.clear();
406        }
407    }
408
409    /// Run a function with a transaction.
410    ///
411    /// The transaction will be retried until:
412    /// - it is validated, or
413    /// - it is explicitly aborted from the function, using the `TODO` function.
414    pub fn profile_with_err<T, F, E>(f: F) -> (Result<T, E>, TransactionTallies)
415    where
416        F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
417    {
418        let _guard = TransactionGuard::new();
419
420        // create a log guard for initializing and cleaning up
421        // the log
422        let mut transaction = Transaction::default();
423
424        // loop until success
425        loop {
426            transaction
427                .tallies
428                .n_attempts
429                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
430            // run the computation
431            match f(&mut transaction) {
432                // on success exit loop
433                Ok(t) => {
434                    if transaction.commit() {
435                        return (Ok(t), transaction.tallies);
436                    }
437                }
438                // on error,
439                Err(e) => match e {
440                    // abort and return the error
441                    TransactionError::Abort(err) => return (Err(err), transaction.tallies),
442                    // retry
443                    TransactionError::Stm(err) => {
444                        match err {
445                            StmError::Failure => {
446                                transaction
447                                    .tallies
448                                    .n_error
449                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
450                            }
451                            StmError::Retry => {
452                                transaction
453                                    .tallies
454                                    .n_retry
455                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
456                            }
457                        }
458                        #[cfg(feature = "wait-on-retry")]
459                        transaction.wait_for_change();
460                    }
461                },
462            }
463
464            // clear log before retrying computation
465            transaction.clear();
466        }
467    }
468
469    /// Run a function with a transaction.
470    ///
471    /// `with_control` takes another control function, that
472    /// can steer the control flow and possible terminate early.
473    ///
474    /// `control` can react to counters, timeouts or external inputs.
475    ///
476    /// It allows the user to fall back to another strategy, like a global lock
477    /// in the case of too much contention.
478    ///
479    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
480    /// called and `control` does not abort.
481    /// If you need a timeout, another thread should signal this through a [`TVar`].
482    pub fn profile_with_control_and_err<T, F, C, E>(
483        mut control: C,
484        f: F,
485    ) -> (TransactionResult<T, E>, TransactionTallies)
486    where
487        F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
488        C: FnMut(StmError) -> TransactionControl,
489    {
490        let _guard = TransactionGuard::new();
491
492        // create a log guard for initializing and cleaning up
493        // the log
494        let mut transaction = Transaction::default();
495
496        // loop until success
497        loop {
498            transaction
499                .tallies
500                .n_attempts
501                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
502            // run the computation
503            match f(&mut transaction) {
504                // on success exit loop
505                Ok(t) => {
506                    if transaction.commit() {
507                        return (TransactionResult::Validated(t), transaction.tallies);
508                    }
509                }
510
511                Err(e) => {
512                    match e {
513                        TransactionError::Abort(err) => {
514                            return (TransactionResult::Cancelled(err), transaction.tallies);
515                        }
516                        TransactionError::Stm(err) => {
517                            match err {
518                                StmError::Failure => {
519                                    transaction
520                                        .tallies
521                                        .n_error
522                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
523                                }
524                                StmError::Retry => {
525                                    transaction
526                                        .tallies
527                                        .n_retry
528                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
529                                }
530                            }
531
532                            // Check if the user wants to abort the transaction.
533                            if let TransactionControl::Abort = control(err) {
534                                return (TransactionResult::Abandoned, transaction.tallies);
535                            }
536
537                            // on retry wait for changes
538                            #[cfg(feature = "wait-on-retry")]
539                            if let StmError::Retry = err {
540                                transaction.wait_for_change();
541                            }
542                        }
543                    }
544                }
545            }
546
547            // clear log before retrying computation
548            transaction.clear();
549        }
550    }
551}
552
553/// In-closure routines
554impl Transaction {
555    /// Read a variable and return the value.
556    ///
557    /// The returned value is not always consistent with the current value of the var,
558    /// but may be an outdated or or not yet commited value.
559    ///
560    /// The used code should be capable of handling inconsistent states
561    /// without running into infinite loops.
562    /// Just the commit of wrong values is prevented by STM.
563    pub fn read<T: Send + Sync + Any + Clone>(&mut self, var: &TVar<T>) -> StmClosureResult<T> {
564        #[cfg(feature = "profiling")]
565        self.tallies
566            .n_read
567            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
568        let ctrl = var.control_block().clone();
569        // Check if the same var was written before.
570        #[cfg(not(feature = "hash-registers"))]
571        let key = ctrl;
572        #[cfg(feature = "hash-registers")]
573        let key = Arc::as_ptr(&ctrl);
574        let value = match self.vars.entry(key) {
575            // If the variable has been accessed before, then load that value.
576            #[cfg(feature = "early-conflict-detection")]
577            Entry::Occupied(mut entry) => {
578                let log = entry.get_mut();
579                // if we previously read the var, check for value change
580                if let LogVar::Read(v) = log {
581                    #[cfg(feature = "profiling")]
582                    self.tallies
583                        .n_redundant_read
584                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
585                    let crt_v = var.read_ref_atomic();
586                    if !Arc::ptr_eq(v, &crt_v) {
587                        return Err(StmError::Failure);
588                    }
589                }
590                #[cfg(feature = "profiling")]
591                if let LogVar::Write(_) = log {
592                    self.tallies
593                        .n_read_after_write
594                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
595                }
596                log.read()
597            }
598            #[cfg(not(feature = "early-conflict-detection"))]
599            Entry::Occupied(mut entry) => {
600                #[cfg(feature = "profiling")]
601                {
602                    let log = entry.get();
603                    if let LogVar::Read(_) = log {
604                        self.tallies
605                            .n_redundant_read
606                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
607                    } else if let LogVar::Write(_) = log {
608                        self.tallies
609                            .n_read_after_write
610                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
611                    }
612                }
613
614                entry.get_mut().read()
615            }
616
617            // Else load the variable statically.
618            Entry::Vacant(entry) => {
619                // Read the value from the var.
620                let value = var.read_ref_atomic();
621
622                // Store in in an entry.
623                entry.insert(LogVar::Read(value.clone()));
624                value
625            }
626        };
627
628        Ok(Transaction::downcast(value))
629    }
630
631    /// Write a variable.
632    ///
633    /// The write is not immediately visible to other threads,
634    /// but atomically commited at the end of the computation.
635    pub fn write<T: Any + Send + Sync + Clone>(
636        &mut self,
637        var: &TVar<T>,
638        value: T,
639    ) -> StmClosureResult<()> {
640        #[cfg(feature = "profiling")]
641        self.tallies
642            .n_write
643            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
644        // box the value
645        let boxed = Arc::new(value);
646
647        // new control block
648        let ctrl = var.control_block().clone();
649        // update or create new entry
650        #[cfg(not(feature = "hash-registers"))]
651        let key = ctrl;
652        #[cfg(feature = "hash-registers")]
653        let key = Arc::as_ptr(&ctrl);
654        match self.vars.entry(key) {
655            Entry::Occupied(mut entry) => entry.get_mut().write(boxed),
656            Entry::Vacant(entry) => {
657                entry.insert(LogVar::Write(boxed));
658            }
659        }
660
661        // For now always succeeds, but that may change later.
662        Ok(())
663    }
664
665    /// Modify a variable.
666    ///
667    /// The write is not immediately visible to other threads,
668    /// but atomically commited at the end of the computation.
669    ///
670    /// Prefer this method over calling `read` then `write` for performance.
671    pub fn modify<T: Any + Send + Sync + Clone, F>(
672        &mut self,
673        var: &TVar<T>,
674        f: F,
675    ) -> StmClosureResult<()>
676    where
677        F: FnOnce(T) -> T,
678    {
679        #[cfg(feature = "profiling")]
680        self.tallies
681            .n_write
682            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
683
684        // new control block
685        let ctrl = var.control_block().clone();
686        // update or create new entry
687        #[cfg(not(feature = "hash-registers"))]
688        let key = ctrl;
689        #[cfg(feature = "hash-registers")]
690        let key = Arc::as_ptr(&ctrl);
691        match self.vars.entry(key) {
692            // If the variable has been accessed before, then load that value.
693            #[cfg(feature = "early-conflict-detection")]
694            Entry::Occupied(mut entry) => {
695                let log = entry.get_mut();
696                // if we previously read the var, check for value change
697                if let LogVar::Read(v) = log {
698                    #[cfg(feature = "profiling")]
699                    self.tallies
700                        .n_redundant_read
701                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
702                    let crt_v = var.read_ref_atomic();
703                    if !Arc::ptr_eq(v, &crt_v) {
704                        return Err(StmError::Failure);
705                    }
706                }
707                #[cfg(feature = "profiling")]
708                if let LogVar::Write(_) = log {
709                    self.tallies
710                        .n_read_after_write
711                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
712                }
713                let value = Transaction::downcast(log.read());
714                let boxed = Arc::new(f(value));
715                entry.get_mut().write(boxed);
716            }
717            #[cfg(not(feature = "early-conflict-detection"))]
718            Entry::Occupied(mut entry) => {
719                #[cfg(feature = "profiling")]
720                {
721                    let log = entry.get();
722                    if let LogVar::Read(_) = log {
723                        self.tallies
724                            .n_redundant_read
725                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
726                    } else if let LogVar::Write(_) = log {
727                        self.tallies
728                            .n_read_after_write
729                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
730                    }
731                }
732
733                let value = Transaction::downcast(entry.get_mut().read());
734                let boxed = Arc::new(f(value));
735                entry.get_mut().write(boxed);
736            }
737            Entry::Vacant(entry) => {
738                // Read the value from the var.
739                let value = Transaction::downcast(var.read_ref_atomic());
740                let boxed = Arc::new(f(value));
741                entry.insert(LogVar::Write(boxed));
742            }
743        }
744
745        // For now always succeeds, but that may change later.
746        Ok(())
747    }
748
749    /// Replace a variable.
750    ///
751    /// The write is not immediately visible to other threads,
752    /// but atomically commited at the end of the computation.
753    ///
754    /// Prefer this method over calling `read` then `write` for performance.
755    pub fn replace<T: Any + Send + Sync + Clone>(
756        &mut self,
757        var: &TVar<T>,
758        value: T,
759    ) -> StmClosureResult<T> {
760        #[cfg(feature = "profiling")]
761        self.tallies
762            .n_write
763            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
764        // box the value
765        let boxed = Arc::new(value);
766
767        // new control block
768        let ctrl = var.control_block().clone();
769        // update or create new entry
770        #[cfg(not(feature = "hash-registers"))]
771        let key = ctrl;
772        #[cfg(feature = "hash-registers")]
773        let key = Arc::as_ptr(&ctrl);
774        let value = match self.vars.entry(key) {
775            // If the variable has been accessed before, then load that value.
776            #[cfg(feature = "early-conflict-detection")]
777            Entry::Occupied(mut entry) => {
778                let log = entry.get_mut();
779                // if we previously read the var, check for value change
780                if let LogVar::Read(v) = log {
781                    #[cfg(feature = "profiling")]
782                    self.tallies
783                        .n_redundant_read
784                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
785                    let crt_v = var.read_ref_atomic();
786                    if !Arc::ptr_eq(v, &crt_v) {
787                        return Err(StmError::Failure);
788                    }
789                }
790                #[cfg(feature = "profiling")]
791                if let LogVar::Write(_) = log {
792                    self.tallies
793                        .n_read_after_write
794                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
795                }
796                let value = log.read();
797                entry.get_mut().write(boxed);
798                value
799            }
800            #[cfg(not(feature = "early-conflict-detection"))]
801            Entry::Occupied(mut entry) => {
802                #[cfg(feature = "profiling")]
803                {
804                    let log = entry.get();
805                    if let LogVar::Read(_) = log {
806                        self.tallies
807                            .n_redundant_read
808                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
809                    } else if let LogVar::Write(_) = log {
810                        self.tallies
811                            .n_read_after_write
812                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
813                    }
814                }
815
816                let value = entry.get_mut().read();
817                entry.get_mut().write(boxed);
818                value
819            }
820            Entry::Vacant(entry) => {
821                // Read the value from the var.
822                let value = var.read_ref_atomic();
823                entry.insert(LogVar::Write(boxed));
824                value
825            }
826        };
827
828        // For now always succeeds, but that may change later.
829        Ok(Transaction::downcast(value))
830    }
831
832    /// Combine two calculations. When one blocks with `retry`,
833    /// run the other, but don't commit the changes in the first.
834    ///
835    /// If both block, `Transaction::or` still waits for `TVar`s in both functions.
836    /// Use `Transaction::or` instead of handling errors directly with the `Result::or`.
837    /// The later does not handle all the blocking correctly.
838    pub fn or<T, F1, F2>(&mut self, first: F1, second: F2) -> StmClosureResult<T>
839    where
840        F1: Fn(&mut Transaction) -> StmClosureResult<T>,
841        F2: Fn(&mut Transaction) -> StmClosureResult<T>,
842    {
843        // Create a backup of the log.
844        let mut copy = self.vars.clone();
845        // let mut copy = Transaction {
846        //     vars: self.vars.clone(),
847        // };
848
849        // Run the first computation.
850        let f = first(self);
851
852        match f {
853            // Run other on manual retry call.
854            Err(StmError::Retry) => {
855                // swap, so that self is the current run
856                mem::swap(&mut self.vars, &mut copy);
857
858                // Run other action.
859                let s = second(self);
860
861                // If both called retry then exit.
862                match s {
863                    Err(StmError::Failure) => Err(StmError::Failure),
864                    s => {
865                        self.combine(copy);
866                        s
867                    }
868                }
869            }
870
871            // Return success and failure directly
872            x => x,
873        }
874    }
875}
876
877/// Internal routines
878impl Transaction {
879    #[allow(clippy::needless_pass_by_value)]
880    /// Perform a downcast on a var.
881    fn downcast<T: Any + Clone>(var: Arc<dyn Any>) -> T {
882        match var.downcast_ref::<T>() {
883            Some(s) => s.clone(),
884            None => unreachable!("TVar has wrong type"),
885        }
886    }
887
888    /// Combine two logs into a single log, to allow waiting for all reads.
889    fn combine(&mut self, vars: RegisterType) {
890        // combine reads
891        for (var, value) in vars {
892            // only insert new values
893            if let Some(value) = value.obsolete() {
894                self.vars.entry(var).or_insert(value);
895            }
896        }
897    }
898
899    /// Clear the log's data.
900    ///
901    /// This should be used before redoing a computation, but
902    /// nowhere else.
903    fn clear(&mut self) {
904        self.vars.clear();
905    }
906
907    /// Wait for any variable to change,
908    /// because the change may lead to a new calculation result.
909    #[cfg(feature = "wait-on-retry")]
910    fn wait_for_change(&mut self) {
911        // Create control block for waiting.
912        let ctrl = Arc::new(ControlBlock::new());
913
914        #[allow(clippy::mutable_key_type)]
915        let vars = std::mem::take(&mut self.vars);
916        let mut reads = Vec::with_capacity(self.vars.len());
917
918        let blocking = vars
919            .into_iter()
920            .filter_map(|(a, b)| b.into_read_value().map(|b| (a, b)))
921            // Check for consistency.
922            .all(|(var, value)| {
923                #[cfg(feature = "hash-registers")]
924                let var = unsafe { var.as_ref() }.expect("E: unreachabel");
925                var.wait(&ctrl);
926                let x = {
927                    // Take read lock and read value.
928                    let guard = var.value.read();
929                    Arc::ptr_eq(&value, &guard)
930                };
931                reads.push(var);
932                x
933            });
934
935        // If no var has changed, then block.
936        if blocking {
937            // Propably wait until one var has changed.
938            ctrl.wait();
939        }
940
941        // Let others know that ctrl is dead.
942        // It does not matter, if we set too many
943        // to dead since it may slightly reduce performance
944        // but not break the semantics.
945        for var in &reads {
946            var.set_dead();
947        }
948    }
949
950    /// Write the log back to the variables.
951    ///
952    /// Return true for success and false, if a read var has changed
953    pub(crate) fn commit(&mut self) -> bool {
954        // Use two phase locking for safely writing data back to the vars.
955
956        // First phase: acquire locks.
957        // Check for consistency of all the reads and perform
958        // an early return if something is not consistent.
959
960        // Created arrays for storing the locks
961        // vector of locks.
962        let mut read_vec = Vec::with_capacity(self.vars.len());
963
964        // vector of tuple (value, lock)
965        let mut write_vec = Vec::with_capacity(self.vars.len());
966
967        // vector of written variables
968        let mut written = Vec::with_capacity(self.vars.len());
969
970        #[cfg(feature = "hash-registers")]
971        let records = {
972            let mut recs: Vec<_> = self.vars.iter().collect();
973            recs.sort_by(|(k1, _), (k2, _)| k1.cmp(&k2));
974            recs
975        };
976        #[cfg(not(feature = "hash-registers"))]
977        let records = &self.vars;
978
979        for (var, value) in records {
980            // lock the variable and read the value
981            #[cfg(feature = "hash-registers")]
982            let var = unsafe { var.as_ref() }.expect("E: unreachabel");
983
984            match *value {
985                // We need to take a write lock.
986                LogVar::Write(ref w) | LogVar::ReadObsoleteWrite(_, ref w) => {
987                    // take write lock
988                    let lock = var.value.write();
989                    // add all data to the vector
990                    write_vec.push((w, lock));
991                    written.push(var);
992                }
993
994                // We need to check for consistency and
995                // take a write lock.
996                LogVar::ReadWrite(ref original, ref w) => {
997                    // take write lock
998                    let lock = var.value.write();
999
1000                    if !Arc::ptr_eq(&lock, original) {
1001                        return false;
1002                    }
1003                    // add all data to the vector
1004                    write_vec.push((w, lock));
1005                    written.push(var);
1006                }
1007                // Nothing to do. ReadObsolete is only needed for blocking, not
1008                // for consistency checks.
1009                LogVar::ReadObsolete(_) => {}
1010                // Take read lock and check for consistency.
1011                LogVar::Read(ref original) => {
1012                    // Take a read lock.
1013                    let lock = var.value.read();
1014
1015                    if !Arc::ptr_eq(&lock, original) {
1016                        return false;
1017                    }
1018
1019                    read_vec.push(lock);
1020                }
1021            }
1022        }
1023
1024        // Second phase: write back and release
1025
1026        // Release the reads first.
1027        // This allows other threads to continue quickly.
1028        drop(read_vec);
1029
1030        for (value, mut lock) in write_vec {
1031            // Commit value.
1032            *lock = value.clone();
1033        }
1034
1035        #[cfg(feature = "wait-on-retry")]
1036        for var in written {
1037            // Unblock all threads waiting for it.
1038            var.wake_all();
1039        }
1040
1041        // Commit succeded.
1042        true
1043    }
1044}
1045
1046#[cfg(test)]
1047mod test {
1048    use super::*;
1049    #[test]
1050    fn read() {
1051        let mut log = Transaction::default();
1052        let var = TVar::new(vec![1, 2, 3, 4]);
1053
1054        // The variable can be read.
1055        assert_eq!(&*log.read(&var).unwrap(), &[1, 2, 3, 4]);
1056    }
1057
1058    #[test]
1059    fn write_read() {
1060        let mut log = Transaction::default();
1061        let var = TVar::new(vec![1, 2]);
1062
1063        log.write(&var, vec![1, 2, 3, 4]).unwrap();
1064
1065        // Consecutive reads get the updated version.
1066        assert_eq!(log.read(&var).unwrap(), [1, 2, 3, 4]);
1067
1068        // The original value is still preserved.
1069        assert_eq!(var.read_atomic(), [1, 2]);
1070    }
1071
1072    #[test]
1073    fn transaction_simple() {
1074        let x = Transaction::with(|_| Ok(42));
1075        assert_eq!(x, 42);
1076    }
1077
1078    #[test]
1079    fn transaction_read() {
1080        let read = TVar::new(42);
1081
1082        let x = Transaction::with(|trans| read.read(trans));
1083
1084        assert_eq!(x, 42);
1085    }
1086
1087    /// Run a transaction with a control function, that always aborts.
1088    /// The transaction still tries to run a single time and should successfully
1089    /// commit in this test.
1090    #[test]
1091    fn transaction_with_control_abort_on_single_run() {
1092        let read = TVar::new(42);
1093
1094        let x = Transaction::with_control(|_| TransactionControl::Abort, |tx| read.read(tx));
1095
1096        assert_eq!(x, Some(42));
1097    }
1098
1099    /// Run a transaction with a control function, that always aborts.
1100    /// The transaction retries infinitely often. The control function will abort this loop.
1101    #[test]
1102    fn transaction_with_control_abort_on_retry() {
1103        let x: Option<i32> =
1104            Transaction::with_control(|_| TransactionControl::Abort, |_| Err(StmError::Retry));
1105
1106        assert_eq!(x, None);
1107    }
1108
1109    #[test]
1110    fn transaction_write() {
1111        let write = TVar::new(42);
1112
1113        Transaction::with(|trans| write.write(trans, 0));
1114
1115        assert_eq!(write.read_atomic(), 0);
1116    }
1117
1118    #[test]
1119    fn transaction_copy() {
1120        let read = TVar::new(42);
1121        let write = TVar::new(0);
1122
1123        Transaction::with(|trans| {
1124            let r = read.read(trans)?;
1125            write.write(trans, r)
1126        });
1127
1128        assert_eq!(write.read_atomic(), 42);
1129    }
1130
1131    // Dat name. seriously?
1132    #[test]
1133    fn transaction_control_stuff() {
1134        let read = TVar::new(42);
1135        let write = TVar::new(0);
1136
1137        Transaction::with(|trans| {
1138            let r = read.read(trans)?;
1139            write.write(trans, r)
1140        });
1141
1142        assert_eq!(write.read_atomic(), 42);
1143    }
1144
1145    /// Test if nested transactions are correctly detected.
1146    #[test]
1147    #[should_panic]
1148    fn transaction_nested_fail() {
1149        Transaction::with(|_| {
1150            Transaction::with(|_| Ok(42));
1151            Ok(1)
1152        });
1153    }
1154}