fast_stm/transaction/mod.rs
1pub mod control_block;
2pub mod log_var;
3
4use std::any::Any;
5use std::cell::Cell;
6use std::collections::btree_map::Entry;
7use std::collections::BTreeMap;
8use std::mem;
9use std::sync::Arc;
10
11use crate::{TransactionClosureResult, TransactionError, TransactionResult};
12
13use self::control_block::ControlBlock;
14use self::log_var::LogVar;
15use super::result::{StmClosureResult, StmError};
16use super::tvar::{TVar, VarControlBlock};
17
thread_local! {
    /// Per-thread flag: `true` while a transaction is running on this thread.
    static TRANSACTION_RUNNING: Cell<bool> = const { Cell::new(false) };
}

/// `TransactionGuard` protects against nested STM calls.
///
/// Creating the guard marks the current thread as running a transaction;
/// dropping it marks the transaction as finished again. Using a guard
/// ensures the flag is reset on every exit path of the owning scope.
struct TransactionGuard;

impl TransactionGuard {
    /// Mark the current thread as running a transaction.
    ///
    /// # Panics
    ///
    /// Panics with `"STM: Nested Transaction"` if the thread-local flag is
    /// already set, i.e. a guard is still alive on this thread.
    pub fn new() -> Self {
        TRANSACTION_RUNNING.with(|running| {
            assert!(!running.get(), "STM: Nested Transaction");
            running.set(true);
        });
        Self
    }
}

impl Drop for TransactionGuard {
    /// Reset the thread-local flag so a new transaction may start.
    fn drop(&mut self) {
        TRANSACTION_RUNNING.with(|running| running.set(false));
    }
}
42
/// Decision returned by the control function of a transaction after a
/// failed run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionControl {
    /// Keep going: the transaction loop runs the computation again.
    Retry,
    /// Give up: the transaction loop returns without a result.
    Abort,
}
48
49/// Transaction tracks all the read and written variables.
50///
51/// It is used for checking vars, to ensure atomicity.
52pub struct Transaction {
53 /// Map of all vars that map the `VarControlBlock` of a var to a `LogVar`.
54 /// The `VarControlBlock` is unique because it uses it's address for comparing.
55 ///
56 /// The logs need to be accessed in a order to prevend dead-locks on locking.
57 vars: BTreeMap<Arc<VarControlBlock>, LogVar>,
58}
59
60impl Transaction {
61 /// Create a new log.
62 ///
63 /// Normally you don't need to call this directly.
64 /// Use `atomically` instead.
65 fn new() -> Transaction {
66 Transaction {
67 vars: BTreeMap::new(),
68 }
69 }
70
71 /// Run a function with a transaction.
72 ///
73 /// It is equivalent to `atomically`.
74 pub fn with<T, F>(f: F) -> T
75 where
76 F: Fn(&mut Transaction) -> StmClosureResult<T>,
77 {
78 match Transaction::with_control(|_| TransactionControl::Retry, f) {
79 Some(t) => t,
80 None => unreachable!(),
81 }
82 }
83
84 /// Run a function with a transaction.
85 ///
86 /// `with_control` takes another control function, that
87 /// can steer the control flow and possible terminate early.
88 ///
89 /// `control` can react to counters, timeouts or external inputs.
90 ///
91 /// It allows the user to fall back to another strategy, like a global lock
92 /// in the case of too much contention.
93 ///
94 /// Please not, that the transaction may still infinitely wait for changes when `retry` is
95 /// called and `control` does not abort.
96 /// If you need a timeout, another thread should signal this through a [`TVar`].
97 pub fn with_control<T, F, C>(mut control: C, f: F) -> Option<T>
98 where
99 F: Fn(&mut Transaction) -> StmClosureResult<T>,
100 C: FnMut(StmError) -> TransactionControl,
101 {
102 let _guard = TransactionGuard::new();
103
104 // create a log guard for initializing and cleaning up
105 // the log
106 let mut transaction = Transaction::new();
107
108 // loop until success
109 loop {
110 // run the computation
111 match f(&mut transaction) {
112 // on success exit loop
113 Ok(t) => {
114 if transaction.commit() {
115 return Some(t);
116 }
117 }
118
119 Err(e) => {
120 // Check if the user wants to abort the transaction.
121 if let TransactionControl::Abort = control(e) {
122 return None;
123 }
124
125 // on retry wait for changes
126 if let StmError::Retry = e {
127 transaction.wait_for_change();
128 }
129 }
130 }
131
132 // clear log before retrying computation
133 transaction.clear();
134 }
135 }
136
137 /// Run a function with a transaction.
138 ///
139 /// The transaction will be retried until:
140 /// - it is validated, or
141 /// - it is explicitly aborted from the function, using the `TODO` function.
142 pub fn with_err<T, F, E>(f: F) -> Result<T, E>
143 where
144 F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
145 {
146 let _guard = TransactionGuard::new();
147
148 // create a log guard for initializing and cleaning up
149 // the log
150 let mut transaction = Transaction::new();
151
152 // loop until success
153 loop {
154 // run the computation
155 match f(&mut transaction) {
156 // on success exit loop
157 Ok(t) => {
158 if transaction.commit() {
159 return Ok(t);
160 }
161 }
162 // on error,
163 Err(e) => match e {
164 // abort and return the error
165 TransactionError::Abort(err) => return Err(err),
166 // retry
167 TransactionError::Stm(_) => {
168 transaction.wait_for_change();
169 }
170 },
171 }
172
173 // clear log before retrying computation
174 transaction.clear();
175 }
176 }
177
178 /// Run a function with a transaction.
179 ///
180 /// `with_control` takes another control function, that
181 /// can steer the control flow and possible terminate early.
182 ///
183 /// `control` can react to counters, timeouts or external inputs.
184 ///
185 /// It allows the user to fall back to another strategy, like a global lock
186 /// in the case of too much contention.
187 ///
188 /// Please not, that the transaction may still infinitely wait for changes when `retry` is
189 /// called and `control` does not abort.
190 /// If you need a timeout, another thread should signal this through a [`TVar`].
191 pub fn with_control_and_err<T, F, C, E>(mut control: C, f: F) -> TransactionResult<T, E>
192 where
193 F: Fn(&mut Transaction) -> TransactionClosureResult<T, E>,
194 C: FnMut(StmError) -> TransactionControl,
195 {
196 let _guard = TransactionGuard::new();
197
198 // create a log guard for initializing and cleaning up
199 // the log
200 let mut transaction = Transaction::new();
201
202 // loop until success
203 loop {
204 // run the computation
205 match f(&mut transaction) {
206 // on success exit loop
207 Ok(t) => {
208 if transaction.commit() {
209 return TransactionResult::Validated(t);
210 }
211 }
212
213 Err(e) => {
214 match e {
215 TransactionError::Abort(err) => {
216 return TransactionResult::Cancelled(err);
217 }
218 TransactionError::Stm(err) => {
219 // Check if the user wants to abort the transaction.
220 if let TransactionControl::Abort = control(err) {
221 return TransactionResult::Abandoned;
222 }
223
224 // on retry wait for changes
225 if let StmError::Retry = err {
226 transaction.wait_for_change();
227 }
228 }
229 }
230 }
231 }
232
233 // clear log before retrying computation
234 transaction.clear();
235 }
236 }
237
238 #[allow(clippy::needless_pass_by_value)]
239 /// Perform a downcast on a var.
240 fn downcast<T: Any + Clone>(var: Arc<dyn Any>) -> T {
241 match var.downcast_ref::<T>() {
242 Some(s) => s.clone(),
243 None => unreachable!("TVar has wrong type"),
244 }
245 }
246
247 /// Read a variable and return the value.
248 ///
249 /// The returned value is not always consistent with the current value of the var,
250 /// but may be an outdated or or not yet commited value.
251 ///
252 /// The used code should be capable of handling inconsistent states
253 /// without running into infinite loops.
254 /// Just the commit of wrong values is prevented by STM.
255 pub fn read<T: Send + Sync + Any + Clone>(&mut self, var: &TVar<T>) -> StmClosureResult<T> {
256 let ctrl = var.control_block().clone();
257 // Check if the same var was written before.
258 let value = match self.vars.entry(ctrl) {
259 // If the variable has been accessed before, then load that value.
260 Entry::Occupied(mut entry) => entry.get_mut().read(),
261
262 // Else load the variable statically.
263 Entry::Vacant(entry) => {
264 // Read the value from the var.
265 let value = var.read_ref_atomic();
266
267 // Store in in an entry.
268 entry.insert(LogVar::Read(value.clone()));
269 value
270 }
271 };
272
273 // For now always succeeds, but that may change later.
274 Ok(Transaction::downcast(value))
275 }
276
277 /// Write a variable.
278 ///
279 /// The write is not immediately visible to other threads,
280 /// but atomically commited at the end of the computation.
281 pub fn write<T: Any + Send + Sync + Clone>(
282 &mut self,
283 var: &TVar<T>,
284 value: T,
285 ) -> StmClosureResult<()> {
286 // box the value
287 let boxed = Arc::new(value);
288
289 // new control block
290 let ctrl = var.control_block().clone();
291 // update or create new entry
292 match self.vars.entry(ctrl) {
293 Entry::Occupied(mut entry) => entry.get_mut().write(boxed),
294 Entry::Vacant(entry) => {
295 entry.insert(LogVar::Write(boxed));
296 }
297 }
298
299 // For now always succeeds, but that may change later.
300 Ok(())
301 }
302
303 /// Combine two calculations. When one blocks with `retry`,
304 /// run the other, but don't commit the changes in the first.
305 ///
306 /// If both block, `Transaction::or` still waits for `TVar`s in both functions.
307 /// Use `Transaction::or` instead of handling errors directly with the `Result::or`.
308 /// The later does not handle all the blocking correctly.
309 pub fn or<T, F1, F2>(&mut self, first: F1, second: F2) -> StmClosureResult<T>
310 where
311 F1: Fn(&mut Transaction) -> StmClosureResult<T>,
312 F2: Fn(&mut Transaction) -> StmClosureResult<T>,
313 {
314 // Create a backup of the log.
315 let mut copy = Transaction {
316 vars: self.vars.clone(),
317 };
318
319 // Run the first computation.
320 let f = first(self);
321
322 match f {
323 // Run other on manual retry call.
324 Err(StmError::Retry) => {
325 // swap, so that self is the current run
326 mem::swap(self, &mut copy);
327
328 // Run other action.
329 let s = second(self);
330
331 // If both called retry then exit.
332 match s {
333 Err(StmError::Failure) => Err(StmError::Failure),
334 s => {
335 self.combine(copy);
336 s
337 }
338 }
339 }
340
341 // Return success and failure directly
342 x => x,
343 }
344 }
345
346 /// Combine two logs into a single log, to allow waiting for all reads.
347 fn combine(&mut self, other: Transaction) {
348 // combine reads
349 for (var, value) in other.vars {
350 // only insert new values
351 if let Some(value) = value.obsolete() {
352 self.vars.entry(var).or_insert(value);
353 }
354 }
355 }
356
357 /// Clear the log's data.
358 ///
359 /// This should be used before redoing a computation, but
360 /// nowhere else.
361 fn clear(&mut self) {
362 self.vars.clear();
363 }
364
365 /// Wait for any variable to change,
366 /// because the change may lead to a new calculation result.
367 fn wait_for_change(&mut self) {
368 // Create control block for waiting.
369 let ctrl = Arc::new(ControlBlock::new());
370
371 #[allow(clippy::mutable_key_type)]
372 let vars = std::mem::take(&mut self.vars);
373 let mut reads = Vec::with_capacity(self.vars.len());
374
375 let blocking = vars
376 .into_iter()
377 .filter_map(|(a, b)| b.into_read_value().map(|b| (a, b)))
378 // Check for consistency.
379 .all(|(var, value)| {
380 var.wait(&ctrl);
381 let x = {
382 // Take read lock and read value.
383 let guard = var.value.read();
384 Arc::ptr_eq(&value, &guard)
385 };
386 reads.push(var);
387 x
388 });
389
390 // If no var has changed, then block.
391 if blocking {
392 // Propably wait until one var has changed.
393 ctrl.wait();
394 }
395
396 // Let others know that ctrl is dead.
397 // It does not matter, if we set too many
398 // to dead since it may slightly reduce performance
399 // but not break the semantics.
400 for var in &reads {
401 var.set_dead();
402 }
403 }
404
405 /// Write the log back to the variables.
406 ///
407 /// Return true for success and false, if a read var has changed
408 fn commit(&mut self) -> bool {
409 // Use two phase locking for safely writing data back to the vars.
410
411 // First phase: acquire locks.
412 // Check for consistency of all the reads and perform
413 // an early return if something is not consistent.
414
415 // Created arrays for storing the locks
416 // vector of locks.
417 let mut read_vec = Vec::with_capacity(self.vars.len());
418
419 // vector of tuple (value, lock)
420 let mut write_vec = Vec::with_capacity(self.vars.len());
421
422 // vector of written variables
423 let mut written = Vec::with_capacity(self.vars.len());
424
425 for (var, value) in &self.vars {
426 // lock the variable and read the value
427
428 match *value {
429 // We need to take a write lock.
430 LogVar::Write(ref w) | LogVar::ReadObsoleteWrite(_, ref w) => {
431 // take write lock
432 let lock = var.value.write();
433 // add all data to the vector
434 write_vec.push((w, lock));
435 written.push(var);
436 }
437
438 // We need to check for consistency and
439 // take a write lock.
440 LogVar::ReadWrite(ref original, ref w) => {
441 // take write lock
442 let lock = var.value.write();
443
444 if !Arc::ptr_eq(&lock, original) {
445 return false;
446 }
447 // add all data to the vector
448 write_vec.push((w, lock));
449 written.push(var);
450 }
451 // Nothing to do. ReadObsolete is only needed for blocking, not
452 // for consistency checks.
453 LogVar::ReadObsolete(_) => {}
454 // Take read lock and check for consistency.
455 LogVar::Read(ref original) => {
456 // Take a read lock.
457 let lock = var.value.read();
458
459 if !Arc::ptr_eq(&lock, original) {
460 return false;
461 }
462
463 read_vec.push(lock);
464 }
465 }
466 }
467
468 // Second phase: write back and release
469
470 // Release the reads first.
471 // This allows other threads to continue quickly.
472 drop(read_vec);
473
474 for (value, mut lock) in write_vec {
475 // Commit value.
476 *lock = value.clone();
477 }
478
479 for var in written {
480 // Unblock all threads waiting for it.
481 var.wake_all();
482 }
483
484 // Commit succeded.
485 true
486 }
487}
488
#[cfg(test)]
mod test {
    use super::*;

    /// A fresh log can read the current value of a var.
    #[test]
    fn read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2, 3, 4]);

        // The variable can be read.
        assert_eq!(&*log.read(&var).unwrap(), &[1, 2, 3, 4]);
    }

    /// A write is visible to later reads through the same log, but not
    /// to the var itself before a commit.
    #[test]
    fn write_read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2]);

        log.write(&var, vec![1, 2, 3, 4]).unwrap();

        // Consecutive reads get the updated version.
        assert_eq!(log.read(&var).unwrap(), [1, 2, 3, 4]);

        // The original value is still preserved.
        assert_eq!(var.read_atomic(), [1, 2]);
    }

    /// A transaction that touches no var returns the closure's value.
    #[test]
    fn transaction_simple() {
        let x = Transaction::with(|_| Ok(42));
        assert_eq!(x, 42);
    }

    /// A transaction can read a var.
    #[test]
    fn transaction_read() {
        let read = TVar::new(42);

        let x = Transaction::with(|trans| read.read(trans));

        assert_eq!(x, 42);
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction still tries to run a single time and should successfully
    /// commit in this test.
    #[test]
    fn transaction_with_control_abort_on_single_run() {
        let read = TVar::new(42);

        let x = Transaction::with_control(|_| TransactionControl::Abort, |tx| read.read(tx));

        assert_eq!(x, Some(42));
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction retries infinitely often. The control function will abort this loop.
    #[test]
    fn transaction_with_control_abort_on_retry() {
        let x: Option<i32> =
            Transaction::with_control(|_| TransactionControl::Abort, |_| Err(StmError::Retry));

        assert_eq!(x, None);
    }

    /// A committed write is visible on the var afterwards.
    #[test]
    fn transaction_write() {
        let write = TVar::new(42);

        Transaction::with(|trans| write.write(trans, 0));

        assert_eq!(write.read_atomic(), 0);
    }

    /// Copy the value of one var into another within a single transaction.
    #[test]
    fn transaction_copy() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        Transaction::with(|trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(write.read_atomic(), 42);
    }

    /// The control function is consulted after every failed run and decides
    /// when the transaction gives up.
    #[test]
    fn transaction_control_stuff() {
        let mut calls = 0;

        let x: Option<i32> = Transaction::with_control(
            |_| {
                calls += 1;
                if calls < 3 {
                    TransactionControl::Retry
                } else {
                    TransactionControl::Abort
                }
            },
            // `Failure` is rerun without waiting for a var to change, so the
            // loop only ends through the control function.
            |_| Err(StmError::Failure),
        );

        assert_eq!(x, None);
        assert_eq!(calls, 3);
    }

    /// Test if nested transactions are correctly detected.
    #[test]
    #[should_panic(expected = "STM: Nested Transaction")]
    fn transaction_nested_fail() {
        Transaction::with(|_| {
            Transaction::with(|_| Ok(42));
            Ok(1)
        });
    }
}