1#[cfg(feature = "wait-on-retry")]
2pub mod control_block;
3pub mod log_var;
4
5cfg_if::cfg_if! {
6 if #[cfg(feature = "hash-registers")] {
7 use std::collections::hash_map::Entry;
8
9 use rustc_hash::FxHashMap;
10 } else {
11 use std::collections::{btree_map::Entry, BTreeMap};
12 }
13}
14
15use std::any::Any;
16use std::cell::Cell;
17use std::mem;
18use std::sync::Arc;
19
20use crate::result::{StmClosureResult, StmError};
21use crate::tvar::{TVar, VarControlBlock};
22use crate::{TransactionClosureResult, TransactionError, TransactionResult};
23
24#[cfg(feature = "wait-on-retry")]
25use control_block::ControlBlock;
26use log_var::LogVar;
27
thread_local! {
    /// Per-thread flag: `true` while a [`TransactionGuard`] is alive on this thread.
    static TRANSACTION_RUNNING: Cell<bool> = const { Cell::new(false) };
}

/// Marker value that flags the current thread as "inside a transaction" for as
/// long as it is alive; dropping it clears the flag again.
struct TransactionGuard;

impl TransactionGuard {
    /// Raises the per-thread "transaction running" flag.
    ///
    /// # Panics
    ///
    /// Panics with `"STM: Nested Transaction"` when the flag is already raised,
    /// i.e. when another guard is still alive on this thread.
    pub fn new() -> TransactionGuard {
        // `replace` raises the flag and reports what it was before in one step.
        let already_running = TRANSACTION_RUNNING.with(|flag| flag.replace(true));
        assert!(!already_running, "STM: Nested Transaction");
        TransactionGuard
    }
}

impl Drop for TransactionGuard {
    /// Lowers the per-thread flag so that a new transaction may start.
    fn drop(&mut self) {
        TRANSACTION_RUNNING.with(|flag| flag.set(false));
    }
}
52
/// Decision returned by the control callback of the `*_control*` runners after
/// the transaction closure reported an STM error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionControl {
    /// Keep going: the log is cleared and the closure is run again.
    Retry,
    /// Give up: the runner returns its "no result" value instead of looping.
    Abort,
}
58
/// Counters gathered by the `profile_*` runners over the whole lifetime of one
/// transaction (they are not reset between attempts).
#[cfg(feature = "profiling")]
#[derive(Default, Debug)]
pub struct TransactionTallies {
    /// Number of times the transaction closure was started.
    pub n_attempts: std::sync::atomic::AtomicUsize,
    /// Number of attempts that ended with `StmError::Retry`.
    pub n_retry: std::sync::atomic::AtomicUsize,
    /// Number of attempts that ended with `StmError::Failure`.
    pub n_error: std::sync::atomic::AtomicUsize,
    /// Number of calls to `Transaction::read`.
    pub n_read: std::sync::atomic::AtomicUsize,
    /// Number of accesses that found the variable already logged as a plain read.
    pub n_redundant_read: std::sync::atomic::AtomicUsize,
    /// Number of accesses that found the variable already logged as a plain write.
    pub n_read_after_write: std::sync::atomic::AtomicUsize,
    /// Number of calls to `Transaction::write`, `modify` or `replace`.
    pub n_write: std::sync::atomic::AtomicUsize,
}
70
#[cfg(feature = "profiling")]
impl std::ops::AddAssign for TransactionTallies {
    /// Adds every counter of `rhs` onto the matching counter of `self`.
    fn add_assign(&mut self, rhs: Self) {
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Adds the current value of `src` onto `dst` (relaxed load, relaxed add).
        fn absorb(dst: &AtomicUsize, src: &AtomicUsize) {
            dst.fetch_add(src.load(Ordering::Relaxed), Ordering::Relaxed);
        }

        absorb(&self.n_attempts, &rhs.n_attempts);
        absorb(&self.n_retry, &rhs.n_retry);
        absorb(&self.n_error, &rhs.n_error);
        absorb(&self.n_read, &rhs.n_read);
        absorb(&self.n_redundant_read, &rhs.n_redundant_read);
        absorb(&self.n_read_after_write, &rhs.n_read_after_write);
        absorb(&self.n_write, &rhs.n_write);
    }
}
106
#[cfg(feature = "profiling")]
impl std::iter::Sum for TransactionTallies {
    /// Adds up all tallies produced by `iter`, starting from all-zero counters.
    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
        let mut total = Self::default();
        for tallies in iter {
            total += tallies;
        }
        total
    }
}
116
117#[cfg(not(feature = "hash-registers"))]
120pub(crate) type RegisterType = BTreeMap<Arc<VarControlBlock>, LogVar>;
121#[cfg(feature = "hash-registers")]
122pub(crate) type RegisterType = FxHashMap<*const VarControlBlock, LogVar>;
123
124pub struct Transaction {
128 vars: RegisterType,
133 #[cfg(feature = "profiling")]
134 tallies: TransactionTallies,
135}
136
137impl Default for Transaction {
138 fn default() -> Self {
139 Self {
140 vars: RegisterType::default(),
141 #[cfg(feature = "profiling")]
142 tallies: TransactionTallies::default(),
143 }
144 }
145}
146
147impl Transaction {
149 pub fn with<T, F>(f: F) -> T
153 where
154 F: Fn(&mut Transaction) -> StmClosureResult<T>,
155 {
156 match Transaction::with_control(|_| TransactionControl::Retry, f) {
157 Some(t) => t,
158 None => unreachable!(),
159 }
160 }
161
162 pub fn with_control<T, F, C>(mut control: C, f: F) -> Option<T>
176 where
177 F: Fn(&mut Transaction) -> StmClosureResult<T>,
178 C: FnMut(StmError) -> TransactionControl,
179 {
180 let _guard = TransactionGuard::new();
181
182 let mut transaction = Transaction::default();
185
186 loop {
188 match f(&mut transaction) {
190 Ok(t) => {
192 if transaction.commit() {
193 return Some(t);
194 }
195 }
196
197 Err(e) => {
198 if let TransactionControl::Abort = control(e) {
200 return None;
201 }
202
203 #[cfg(feature = "wait-on-retry")]
205 if let StmError::Retry = e {
206 transaction.wait_for_change();
207 }
208 }
209 }
210
211 transaction.clear();
213 }
214 }
215
216 pub fn with_err<T, F, E>(f: F) -> Result<T, E>
222 where
223 F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
224 {
225 let _guard = TransactionGuard::new();
226
227 let mut transaction = Transaction::default();
230
231 loop {
233 match f(&mut transaction) {
235 Ok(t) => {
237 if transaction.commit() {
238 return Ok(t);
239 }
240 }
241 Err(e) => match e {
243 TransactionError::Abort(err) => return Err(err),
245 TransactionError::Stm(_) => {
247 #[cfg(feature = "wait-on-retry")]
248 transaction.wait_for_change();
249 }
250 },
251 }
252
253 transaction.clear();
255 }
256 }
257
258 pub fn with_control_and_err<T, F, C, E>(mut control: C, f: F) -> TransactionResult<T, E>
272 where
273 F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
274 C: FnMut(StmError) -> TransactionControl,
275 {
276 let _guard = TransactionGuard::new();
277
278 let mut transaction = Transaction::default();
281
282 loop {
284 match f(&mut transaction) {
286 Ok(t) => {
288 if transaction.commit() {
289 return TransactionResult::Validated(t);
290 }
291 }
292
293 Err(e) => {
294 match e {
295 TransactionError::Abort(err) => {
296 return TransactionResult::Cancelled(err);
297 }
298 TransactionError::Stm(err) => {
299 if let TransactionControl::Abort = control(err) {
301 return TransactionResult::Abandoned;
302 }
303
304 #[cfg(feature = "wait-on-retry")]
306 if let StmError::Retry = err {
307 transaction.wait_for_change();
308 }
309 }
310 }
311 }
312 }
313
314 transaction.clear();
316 }
317 }
318}
319
320#[cfg(feature = "profiling")]
321impl Transaction {
323 pub fn profile_with<T, F>(f: F) -> (T, TransactionTallies)
327 where
328 F: Fn(&mut Transaction) -> StmClosureResult<T>,
329 {
330 match Transaction::profile_with_control(|_| TransactionControl::Retry, f) {
331 (Some(t), tallies) => (t, tallies),
332 (None, _) => unreachable!(),
333 }
334 }
335
336 pub fn profile_with_control<T, F, C>(mut control: C, f: F) -> (Option<T>, TransactionTallies)
350 where
351 F: Fn(&mut Transaction) -> StmClosureResult<T>,
352 C: FnMut(StmError) -> TransactionControl,
353 {
354 let _guard = TransactionGuard::new();
355
356 let mut transaction = Transaction::default();
359
360 loop {
362 transaction
363 .tallies
364 .n_attempts
365 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
366 match f(&mut transaction) {
368 Ok(t) => {
370 if transaction.commit() {
371 return (Some(t), transaction.tallies);
372 }
373 }
374
375 Err(e) => {
376 match e {
378 StmError::Failure => {
379 transaction
380 .tallies
381 .n_error
382 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
383 }
384 StmError::Retry => {
385 transaction
386 .tallies
387 .n_retry
388 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
389 }
390 }
391
392 if let TransactionControl::Abort = control(e) {
393 return (None, transaction.tallies);
394 }
395
396 #[cfg(feature = "wait-on-retry")]
398 if let StmError::Retry = e {
399 transaction.wait_for_change();
400 }
401 }
402 }
403
404 transaction.clear();
406 }
407 }
408
409 pub fn profile_with_err<T, F, E>(f: F) -> (Result<T, E>, TransactionTallies)
415 where
416 F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
417 {
418 let _guard = TransactionGuard::new();
419
420 let mut transaction = Transaction::default();
423
424 loop {
426 transaction
427 .tallies
428 .n_attempts
429 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
430 match f(&mut transaction) {
432 Ok(t) => {
434 if transaction.commit() {
435 return (Ok(t), transaction.tallies);
436 }
437 }
438 Err(e) => match e {
440 TransactionError::Abort(err) => return (Err(err), transaction.tallies),
442 TransactionError::Stm(err) => {
444 match err {
445 StmError::Failure => {
446 transaction
447 .tallies
448 .n_error
449 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
450 }
451 StmError::Retry => {
452 transaction
453 .tallies
454 .n_retry
455 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
456 }
457 }
458 #[cfg(feature = "wait-on-retry")]
459 transaction.wait_for_change();
460 }
461 },
462 }
463
464 transaction.clear();
466 }
467 }
468
469 pub fn profile_with_control_and_err<T, F, C, E>(
483 mut control: C,
484 f: F,
485 ) -> (TransactionResult<T, E>, TransactionTallies)
486 where
487 F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
488 C: FnMut(StmError) -> TransactionControl,
489 {
490 let _guard = TransactionGuard::new();
491
492 let mut transaction = Transaction::default();
495
496 loop {
498 transaction
499 .tallies
500 .n_attempts
501 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
502 match f(&mut transaction) {
504 Ok(t) => {
506 if transaction.commit() {
507 return (TransactionResult::Validated(t), transaction.tallies);
508 }
509 }
510
511 Err(e) => {
512 match e {
513 TransactionError::Abort(err) => {
514 return (TransactionResult::Cancelled(err), transaction.tallies);
515 }
516 TransactionError::Stm(err) => {
517 match err {
518 StmError::Failure => {
519 transaction
520 .tallies
521 .n_error
522 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
523 }
524 StmError::Retry => {
525 transaction
526 .tallies
527 .n_retry
528 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
529 }
530 }
531
532 if let TransactionControl::Abort = control(err) {
534 return (TransactionResult::Abandoned, transaction.tallies);
535 }
536
537 #[cfg(feature = "wait-on-retry")]
539 if let StmError::Retry = err {
540 transaction.wait_for_change();
541 }
542 }
543 }
544 }
545 }
546
547 transaction.clear();
549 }
550 }
551}
552
553impl Transaction {
555 pub fn read<T: Send + Sync + Any + Clone>(&mut self, var: &TVar<T>) -> StmClosureResult<T> {
564 #[cfg(feature = "profiling")]
565 self.tallies
566 .n_read
567 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
568 let ctrl = var.control_block().clone();
569 #[cfg(not(feature = "hash-registers"))]
571 let key = ctrl;
572 #[cfg(feature = "hash-registers")]
573 let key = Arc::as_ptr(&ctrl);
574 let value = match self.vars.entry(key) {
575 #[cfg(feature = "early-conflict-detection")]
577 Entry::Occupied(mut entry) => {
578 let log = entry.get_mut();
579 if let LogVar::Read(v) = log {
581 #[cfg(feature = "profiling")]
582 self.tallies
583 .n_redundant_read
584 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
585 let crt_v = var.read_ref_atomic();
586 if !Arc::ptr_eq(v, &crt_v) {
587 return Err(StmError::Failure);
588 }
589 }
590 #[cfg(feature = "profiling")]
591 if let LogVar::Write(_) = log {
592 self.tallies
593 .n_read_after_write
594 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
595 }
596 log.read()
597 }
598 #[cfg(not(feature = "early-conflict-detection"))]
599 Entry::Occupied(mut entry) => {
600 #[cfg(feature = "profiling")]
601 {
602 let log = entry.get();
603 if let LogVar::Read(_) = log {
604 self.tallies
605 .n_redundant_read
606 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
607 } else if let LogVar::Write(_) = log {
608 self.tallies
609 .n_read_after_write
610 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
611 }
612 }
613
614 entry.get_mut().read()
615 }
616
617 Entry::Vacant(entry) => {
619 let value = var.read_ref_atomic();
621
622 entry.insert(LogVar::Read(value.clone()));
624 value
625 }
626 };
627
628 Ok(Transaction::downcast(value))
629 }
630
631 pub fn write<T: Any + Send + Sync + Clone>(
636 &mut self,
637 var: &TVar<T>,
638 value: T,
639 ) -> StmClosureResult<()> {
640 #[cfg(feature = "profiling")]
641 self.tallies
642 .n_write
643 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
644 let boxed = Arc::new(value);
646
647 let ctrl = var.control_block().clone();
649 #[cfg(not(feature = "hash-registers"))]
651 let key = ctrl;
652 #[cfg(feature = "hash-registers")]
653 let key = Arc::as_ptr(&ctrl);
654 match self.vars.entry(key) {
655 Entry::Occupied(mut entry) => entry.get_mut().write(boxed),
656 Entry::Vacant(entry) => {
657 entry.insert(LogVar::Write(boxed));
658 }
659 }
660
661 Ok(())
663 }
664
665 pub fn modify<T: Any + Send + Sync + Clone, F>(
672 &mut self,
673 var: &TVar<T>,
674 f: F,
675 ) -> StmClosureResult<()>
676 where
677 F: FnOnce(T) -> T,
678 {
679 #[cfg(feature = "profiling")]
680 self.tallies
681 .n_write
682 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
683
684 let ctrl = var.control_block().clone();
686 #[cfg(not(feature = "hash-registers"))]
688 let key = ctrl;
689 #[cfg(feature = "hash-registers")]
690 let key = Arc::as_ptr(&ctrl);
691 match self.vars.entry(key) {
692 #[cfg(feature = "early-conflict-detection")]
694 Entry::Occupied(mut entry) => {
695 let log = entry.get_mut();
696 if let LogVar::Read(v) = log {
698 #[cfg(feature = "profiling")]
699 self.tallies
700 .n_redundant_read
701 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
702 let crt_v = var.read_ref_atomic();
703 if !Arc::ptr_eq(v, &crt_v) {
704 return Err(StmError::Failure);
705 }
706 }
707 #[cfg(feature = "profiling")]
708 if let LogVar::Write(_) = log {
709 self.tallies
710 .n_read_after_write
711 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
712 }
713 let value = Transaction::downcast(log.read());
714 let boxed = Arc::new(f(value));
715 entry.get_mut().write(boxed);
716 }
717 #[cfg(not(feature = "early-conflict-detection"))]
718 Entry::Occupied(mut entry) => {
719 #[cfg(feature = "profiling")]
720 {
721 let log = entry.get();
722 if let LogVar::Read(_) = log {
723 self.tallies
724 .n_redundant_read
725 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
726 } else if let LogVar::Write(_) = log {
727 self.tallies
728 .n_read_after_write
729 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
730 }
731 }
732
733 let value = Transaction::downcast(entry.get_mut().read());
734 let boxed = Arc::new(f(value));
735 entry.get_mut().write(boxed);
736 }
737 Entry::Vacant(entry) => {
738 let value = Transaction::downcast(var.read_ref_atomic());
740 let boxed = Arc::new(f(value));
741 entry.insert(LogVar::Write(boxed));
742 }
743 }
744
745 Ok(())
747 }
748
749 pub fn replace<T: Any + Send + Sync + Clone>(
756 &mut self,
757 var: &TVar<T>,
758 value: T,
759 ) -> StmClosureResult<T> {
760 #[cfg(feature = "profiling")]
761 self.tallies
762 .n_write
763 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
764 let boxed = Arc::new(value);
766
767 let ctrl = var.control_block().clone();
769 #[cfg(not(feature = "hash-registers"))]
771 let key = ctrl;
772 #[cfg(feature = "hash-registers")]
773 let key = Arc::as_ptr(&ctrl);
774 let value = match self.vars.entry(key) {
775 #[cfg(feature = "early-conflict-detection")]
777 Entry::Occupied(mut entry) => {
778 let log = entry.get_mut();
779 if let LogVar::Read(v) = log {
781 #[cfg(feature = "profiling")]
782 self.tallies
783 .n_redundant_read
784 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
785 let crt_v = var.read_ref_atomic();
786 if !Arc::ptr_eq(v, &crt_v) {
787 return Err(StmError::Failure);
788 }
789 }
790 #[cfg(feature = "profiling")]
791 if let LogVar::Write(_) = log {
792 self.tallies
793 .n_read_after_write
794 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
795 }
796 let value = log.read();
797 entry.get_mut().write(boxed);
798 value
799 }
800 #[cfg(not(feature = "early-conflict-detection"))]
801 Entry::Occupied(mut entry) => {
802 #[cfg(feature = "profiling")]
803 {
804 let log = entry.get();
805 if let LogVar::Read(_) = log {
806 self.tallies
807 .n_redundant_read
808 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
809 } else if let LogVar::Write(_) = log {
810 self.tallies
811 .n_read_after_write
812 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
813 }
814 }
815
816 let value = entry.get_mut().read();
817 entry.get_mut().write(boxed);
818 value
819 }
820 Entry::Vacant(entry) => {
821 let value = var.read_ref_atomic();
823 entry.insert(LogVar::Write(boxed));
824 value
825 }
826 };
827
828 Ok(Transaction::downcast(value))
830 }
831
832 pub fn or<T, F1, F2>(&mut self, first: F1, second: F2) -> StmClosureResult<T>
839 where
840 F1: Fn(&mut Transaction) -> StmClosureResult<T>,
841 F2: Fn(&mut Transaction) -> StmClosureResult<T>,
842 {
843 let mut copy = self.vars.clone();
845 let f = first(self);
851
852 match f {
853 Err(StmError::Retry) => {
855 mem::swap(&mut self.vars, &mut copy);
857
858 let s = second(self);
860
861 match s {
863 Err(StmError::Failure) => Err(StmError::Failure),
864 s => {
865 self.combine(copy);
866 s
867 }
868 }
869 }
870
871 x => x,
873 }
874 }
875}
876
877impl Transaction {
879 #[allow(clippy::needless_pass_by_value)]
880 fn downcast<T: Any + Clone>(var: Arc<dyn Any>) -> T {
882 match var.downcast_ref::<T>() {
883 Some(s) => s.clone(),
884 None => unreachable!("TVar has wrong type"),
885 }
886 }
887
888 fn combine(&mut self, vars: RegisterType) {
890 for (var, value) in vars {
892 if let Some(value) = value.obsolete() {
894 self.vars.entry(var).or_insert(value);
895 }
896 }
897 }
898
899 fn clear(&mut self) {
904 self.vars.clear();
905 }
906
907 #[cfg(feature = "wait-on-retry")]
910 fn wait_for_change(&mut self) {
911 let ctrl = Arc::new(ControlBlock::new());
913
914 #[allow(clippy::mutable_key_type)]
915 let vars = std::mem::take(&mut self.vars);
916 let mut reads = Vec::with_capacity(self.vars.len());
917
918 let blocking = vars
919 .into_iter()
920 .filter_map(|(a, b)| b.into_read_value().map(|b| (a, b)))
921 .all(|(var, value)| {
923 #[cfg(feature = "hash-registers")]
924 let var = unsafe { var.as_ref() }.expect("E: unreachabel");
925 var.wait(&ctrl);
926 let x = {
927 let guard = var.value.read();
929 Arc::ptr_eq(&value, &guard)
930 };
931 reads.push(var);
932 x
933 });
934
935 if blocking {
937 ctrl.wait();
939 }
940
941 for var in &reads {
946 var.set_dead();
947 }
948 }
949
950 pub(crate) fn commit(&mut self) -> bool {
954 let mut read_vec = Vec::with_capacity(self.vars.len());
963
964 let mut write_vec = Vec::with_capacity(self.vars.len());
966
967 let mut written = Vec::with_capacity(self.vars.len());
969
970 #[cfg(feature = "hash-registers")]
971 let records = {
972 let mut recs: Vec<_> = self.vars.iter().collect();
973 recs.sort_by(|(k1, _), (k2, _)| k1.cmp(&k2));
974 recs
975 };
976 #[cfg(not(feature = "hash-registers"))]
977 let records = &self.vars;
978
979 for (var, value) in records {
980 #[cfg(feature = "hash-registers")]
982 let var = unsafe { var.as_ref() }.expect("E: unreachabel");
983
984 match *value {
985 LogVar::Write(ref w) | LogVar::ReadObsoleteWrite(_, ref w) => {
987 let lock = var.value.write();
989 write_vec.push((w, lock));
991 written.push(var);
992 }
993
994 LogVar::ReadWrite(ref original, ref w) => {
997 let lock = var.value.write();
999
1000 if !Arc::ptr_eq(&lock, original) {
1001 return false;
1002 }
1003 write_vec.push((w, lock));
1005 written.push(var);
1006 }
1007 LogVar::ReadObsolete(_) => {}
1010 LogVar::Read(ref original) => {
1012 let lock = var.value.read();
1014
1015 if !Arc::ptr_eq(&lock, original) {
1016 return false;
1017 }
1018
1019 read_vec.push(lock);
1020 }
1021 }
1022 }
1023
1024 drop(read_vec);
1029
1030 for (value, mut lock) in write_vec {
1031 *lock = value.clone();
1033 }
1034
1035 #[cfg(feature = "wait-on-retry")]
1036 for var in written {
1037 var.wake_all();
1039 }
1040
1041 true
1043 }
1044}
1045
#[cfg(test)]
mod test {
    use super::*;

    /// A plain read on a fresh log returns the variable's value.
    #[test]
    fn read() {
        let mut log = Transaction::default();
        let var = TVar::new(vec![1, 2, 3, 4]);

        assert_eq!(&*log.read(&var).unwrap(), &[1, 2, 3, 4]);
    }

    /// A read after a write sees the written value, while the variable itself
    /// stays untouched until a commit.
    #[test]
    fn write_read() {
        let mut log = Transaction::default();
        let var = TVar::new(vec![1, 2]);

        log.write(&var, vec![1, 2, 3, 4]).unwrap();

        // The log answers with the pending value.
        assert_eq!(log.read(&var).unwrap(), [1, 2, 3, 4]);

        // Nothing was committed, so the variable is unchanged.
        assert_eq!(var.read_atomic(), [1, 2]);
    }

    /// A transaction without any variable access returns the closure's value.
    #[test]
    fn transaction_simple() {
        let x = Transaction::with(|_| Ok(42));
        assert_eq!(x, 42);
    }

    /// Reading a variable through a full transaction.
    #[test]
    fn transaction_read() {
        let read = TVar::new(42);

        let x = Transaction::with(|trans| read.read(trans));

        assert_eq!(x, 42);
    }

    /// An aborting control callback is irrelevant when the first attempt
    /// succeeds.
    #[test]
    fn transaction_with_control_abort_on_single_run() {
        let read = TVar::new(42);

        let x = Transaction::with_control(|_| TransactionControl::Abort, |tx| read.read(tx));

        assert_eq!(x, Some(42));
    }

    /// An aborting control callback turns a retry request into `None`.
    #[test]
    fn transaction_with_control_abort_on_retry() {
        let x: Option<i32> =
            Transaction::with_control(|_| TransactionControl::Abort, |_| Err(StmError::Retry));

        assert_eq!(x, None);
    }

    /// A committed write becomes visible outside the transaction.
    #[test]
    fn transaction_write() {
        let write = TVar::new(42);

        Transaction::with(|trans| write.write(trans, 0));

        assert_eq!(write.read_atomic(), 0);
    }

    /// Copying the value of one variable into another within one transaction.
    #[test]
    fn transaction_copy() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        Transaction::with(|trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(write.read_atomic(), 42);
    }

    /// NOTE(review): the body is currently identical to `transaction_copy`.
    #[test]
    fn transaction_control_stuff() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        Transaction::with(|trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(write.read_atomic(), 42);
    }

    /// Starting a transaction inside another one on the same thread must
    /// panic with the dedicated message, not with some unrelated panic.
    #[test]
    #[should_panic(expected = "STM: Nested Transaction")]
    fn transaction_nested_fail() {
        Transaction::with(|_| {
            Transaction::with(|_| Ok(42));
            Ok(1)
        });
    }
}