// poc_kokkos_rs/routines/dispatch.rs

//! kernel dispatch code
//!
//! This module contains all code used to dispatch computational kernels
//! onto specified devices. Note that the documentation is feature-specific when the
//! items are, i.e. documentation is altered by enabled features.
//!
//! The methods described in this module are not meant to be used directly, they are only
//! building blocks for the parallel statements.

10#[cfg(any(doc, feature = "rayon", feature = "gpu"))]
11use crate::functor::ForKernelType;
12
13#[cfg(feature = "rayon")]
14use rayon::prelude::*;
15
16use std::{fmt::Display, ops::Range};
17
18use super::parameters::{ExecutionPolicy, RangePolicy};
19use crate::functor::{KernelArgs, SerialForKernelType};
20
// enums

/// Enum used to classify possible dispatch errors.
///
/// In all variants, the internal value is a description of the error.
#[derive(Debug)]
pub enum DispatchError {
    /// Error occurred during serial dispatch.
    Serial(&'static str),
    /// Error occurred during parallel CPU dispatch.
    CPU(&'static str),
    /// Error occurred during GPU dispatch.
    GPU(&'static str),
}

impl Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // all variants share the same "error during <target> dispatch: <desc>"
        // template; factor it out of the match arms
        let (target, desc) = match self {
            DispatchError::Serial(desc) => ("serial", desc),
            DispatchError::CPU(desc) => ("cpu", desc),
            DispatchError::GPU(desc) => ("gpu", desc),
        };
        write!(f, "error during {target} dispatch: {desc}")
    }
}

impl std::error::Error for DispatchError {
    // the default `source` implementation (returning `None`) is correct here:
    // no variant wraps an underlying error. Override it if that ever changes,
    // e.g. when an error originates from an std call.
}
52
// dispatch routines

// internal routines

57/// Builds a N-depth nested loop executing a kernel using the N resulting indices.
58/// Technically, this should be replaced by a tiling function, for both serial and parallel
59/// implementations.
60fn recursive_loop<const N: usize>(ranges: &[Range<usize>; N], mut kernel: SerialForKernelType<N>) {
61    // handles recursions
62    fn inner<const N: usize>(
63        current_depth: usize,
64        ranges: &[Range<usize>; N],
65        kernel: &mut SerialForKernelType<N>,
66        indices: &mut [usize; N],
67    ) {
68        if current_depth == N {
69            // all loops unraveled
70            // call the kernel
71            kernel(KernelArgs::IndexND(*indices))
72        } else {
73            // loop on next dimension; update indices
74            // can we avoid a clone by passing a slice starting one element
75            // after the unraveled range ?
76            ranges[current_depth].clone().for_each(|i_current| {
77                indices[current_depth] = i_current;
78                inner(current_depth + 1, ranges, kernel, indices);
79            });
80        }
81    }
82
83    let mut indices = [0; N];
84    inner(0, ranges, &mut kernel, &mut indices);
85}
86
// serial dispatch

89/// CPU dispatch routine of `for` statements. Does not depend on enabled feature(s).
90///
91/// The dispatch function execute the kernel accordingly to the directives contained in the
92/// execution policy. The kernel signature does not vary according to enabled features as this
93/// is the invariant fallback dispatch routine.
94pub fn serial<const N: usize>(
95    execp: ExecutionPolicy<N>,
96    kernel: SerialForKernelType<N>,
97) -> Result<(), DispatchError> {
98    match execp.range {
99        RangePolicy::RangePolicy(range) => {
100            // serial, 1D range
101            if N != 1 {
102                return Err(DispatchError::Serial(
103                    "Dispatch uses N>1 for a 1D RangePolicy",
104                ));
105            }
106            range.into_iter().map(KernelArgs::Index1D).for_each(kernel)
107        }
108        RangePolicy::MDRangePolicy(ranges) => {
109            // Kokkos does tiling to handle a MDRanges, in the case of serial
110            // execution, we simply do the nested loop
111            recursive_loop(&ranges, kernel) // macros would pbly be more efficient
112        }
113        RangePolicy::TeamPolicy {
114            league_size: _, // number of teams; akin to # of work items/batches
115            team_size: _,   // number of threads per team; ignored in serial dispatch
116            vector_size: _, // possible third dim parallelism; ignored in serial dispatch?
117        } => {
118            // interpret # of teams as # of work items (chunks);
119            // necessary because serial dispatch is the fallback implementation
120            // we ignore team size & vector size? since there's no parallelism here
121
122            // is it even possible to use chunks? It would require either:
123            //  - awareness of used external data
124            //  - owning the used data; maybe in the TeamPolicy struct
125            // 2nd option is the more plausible but it creates issues when accessing
126            // multiple views for example; It also seems incompatible with the paradigm
127
128            // -> build a team handle & let the user write its kernel using it
129            todo!()
130        }
131        RangePolicy::PerTeam => {
132            // used inside a team dispatch
133            // executes the kernel once per team
134            todo!()
135        }
136        RangePolicy::PerThread => {
137            // used inside a team dispatch
138            // executes the kernel once per threads of the team
139            todo!()
140        }
141        RangePolicy::TeamThreadRange => {
142            // same as RangePolicy but inside a team
143            todo!()
144        }
145        RangePolicy::TeamThreadMDRange => {
146            // same as MDRangePolicy but inside a team
147            todo!()
148        }
149        RangePolicy::TeamVectorRange => todo!(),
150        RangePolicy::TeamVectorMDRange => todo!(),
151        RangePolicy::ThreadVectorRange => todo!(),
152        RangePolicy::ThreadVectorMDRange => todo!(),
153    };
154    Ok(())
155}
156
157cfg_if::cfg_if! {
158    if #[cfg(feature = "threads")] {
159        /// CPU dispatch routine of `for` statements. Implementation depends on enabled feature(s).
160        ///
161        /// The dispatch function execute the kernel accordingly to the directives contained in the
162        /// execution policy. The kernel signature varies according to enabled features.
163        ///
164        /// ### Possible Kernel Signatures
165        ///
166        /// - `rayon` feature enabled: [`ForKernelType`]
167        /// - `threads` feature enabled: `Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>`
168        /// - no feature enabled: fall back to [`SerialForKernelType`]
169        ///
170        /// The `threads` implementation cannot currently use the generic [`ForKernelType`] because
171        /// of the Clone requirement.
172        ///
173        /// **Current version**: `threads`
174        pub fn cpu<'a, const N: usize>(
175            execp: ExecutionPolicy<N>,
176            kernel: Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>, // cannot be replaced by functor type bc of Clone
177        ) -> Result<(), DispatchError> {
178            match execp.range {
179                RangePolicy::RangePolicy(range) => {
180                    // serial, 1D range
181                    if N != 1 {
182                        return Err(DispatchError::Serial(
183                            "Dispatch uses N>1 for a 1D RangePolicy",
184                        ));
185                    }
186                    // compute chunk_size so that there is 1 chunk per thread
187                    let chunk_size = range.len() / num_cpus::get() + 1;
188                    let indices = range.collect::<Vec<usize>>();
189                    // use scope to avoid 'static lifetime reqs
190                    std::thread::scope(|s| {
191                        let handles: Vec<_> = indices.chunks(chunk_size).map(|chunk| {
192                            s.spawn(|| chunk.iter().map(|idx_ref| KernelArgs::Index1D(*idx_ref)).for_each(kernel.clone()))
193                        }).collect();
194
195                        for handle in handles {
196                            handle.join().unwrap();
197                        }
198                    });
199                }
200                RangePolicy::MDRangePolicy(_) => {
201                    // Kokkos does tiling to handle a MDRanges
202                    unimplemented!()
203                }
204                RangePolicy::TeamPolicy {
205                    league_size: _, // number of teams; akin to # of work items/batches
206                    team_size: _,   // number of threads per team; ignored in serial dispatch
207                    vector_size: _, // possible third dim parallelism; ignored in serial dispatch?
208                } => {
209                    // interpret # of teams as # of work items (chunks);
210                    // necessary because serial dispatch is the fallback implementation
211                    // we ignore team size & vector size? since there's no parallelism here
212
213                    // is it even possible to use chunks? It would require either:
214                    //  - awareness of used external data
215                    //  - owning the used data; maybe in the TeamPolicy struct
216                    // 2nd option is the more plausible but it creates issues when accessing
217                    // multiple views for example; It also seems incompatible with the paradigm
218
219                    // -> build a team handle & let the user write its kernel using it
220                    todo!()
221                }
222                RangePolicy::PerTeam => {
223                    // used inside a team dispatch
224                    // executes the kernel once per team
225                    todo!()
226                }
227                RangePolicy::PerThread => {
228                    // used inside a team dispatch
229                    // executes the kernel once per threads of the team
230                    todo!()
231                }
232                _ => todo!(),
233            };
234            Ok(())
235        }
236    } else if #[cfg(feature = "rayon")] {
237        /// CPU dispatch routine of `for` statements. Implementation depends on enabled feature(s).
238        ///
239        /// The dispatch function execute the kernel accordingly to the directives contained in the
240        /// execution policy. The kernel signature varies according to enabled features.
241        ///
242        /// ### Possible Kernel Signatures
243        ///
244        /// - `rayon` feature enabled: [`ForKernelType`]
245        /// - `threads` feature enabled: `Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>`
246        /// - no feature enabled: fall back to [`SerialForKernelType`]
247        ///
248        /// The `threads` implementation cannot currently use the generic [`ForKernelType`] because
249        /// of the Clone requirement.
250        ///
251        /// **Current version**: `rayon`
252        pub fn cpu<const N: usize>(
253            execp: ExecutionPolicy<N>,
254            kernel: ForKernelType<N>,
255        ) -> Result<(), DispatchError> {
256            match execp.range {
257                RangePolicy::RangePolicy(range) => {
258                    // serial, 1D range
259                    if N != 1 {
260                        return Err(DispatchError::Serial(
261                            "Dispatch uses N>1 for a 1D RangePolicy",
262                        ));
263                    }
264                    // making indices N-sized arrays is necessary, even with the assertion...
265                    range
266                        .into_par_iter()
267                        .map(KernelArgs::Index1D)
268                        .for_each(kernel)
269                }
270                RangePolicy::MDRangePolicy(_) => {
271                    // Kokkos does tiling to handle a MDRanges
272                    unimplemented!()
273                }
274                RangePolicy::TeamPolicy {
275                    league_size: _, // number of teams; akin to # of work items/batches
276                    team_size: _,   // number of threads per team; ignored in serial dispatch
277                    vector_size: _, // possible third dim parallelism; ignored in serial dispatch?
278                } => {
279                    // interpret # of teams as # of work items (chunks);
280                    // necessary because serial dispatch is the fallback implementation
281                    // we ignore team size & vector size? since there's no parallelism here
282
283                    // is it even possible to use chunks? It would require either:
284                    //  - awareness of used external data
285                    //  - owning the used data; maybe in the TeamPolicy struct
286                    // 2nd option is the more plausible but it creates issues when accessing
287                    // multiple views for example; It also seems incompatible with the paradigm
288
289                    // -> build a team handle & let the user write its kernel using it
290                    todo!()
291                }
292                RangePolicy::PerTeam => {
293                    // used inside a team dispatch
294                    // executes the kernel once per team
295                    todo!()
296                }
297                RangePolicy::PerThread => {
298                    // used inside a team dispatch
299                    // executes the kernel once per threads of the team
300                    todo!()
301                }
302                _ => todo!(),
303            };
304            Ok(())
305        }
306    } else {
307        /// CPU dispatch routine of `for` statements. Implementation depends on enabled feature(s).
308        ///
309        /// The dispatch function execute the kernel accordingly to the directives contained in the
310        /// execution policy. The kernel signature varies according to enabled features.
311        ///
312        /// ### Possible Kernel Signatures
313        ///
314        /// - `rayon` feature enabled: [`ForKernelType`]
315        /// - `threads` feature enabled: `Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>`
316        /// - no feature enabled: fall back to [`SerialForKernelType`]
317        ///
318        /// The `threads` implementation cannot currently use the generic [`ForKernelType`] because
319        /// of the Clone requirement.
320        ///
321        /// **Current version**: no feature
322        pub fn cpu<const N: usize>(
323            execp: ExecutionPolicy<N>,
324            kernel: SerialForKernelType<N>,
325        ) -> Result<(), DispatchError> {
326            serial(execp, kernel)
327        }
328    }
329}
330
cfg_if::cfg_if! {
    if #[cfg(feature = "gpu")] {
        /// GPU Dispatch routine of `for` statements. UNIMPLEMENTED
        ///
        /// Placeholder: the signature matches the feature-specific kernel type so
        /// call sites compile, but any call panics via `unimplemented!`.
        pub fn gpu<'a, const N: usize>(
            _execp: ExecutionPolicy<N>,
            _kernel: ForKernelType<N>,
        ) -> Result<(), DispatchError> {
            unimplemented!()
        }
    } else {
        /// GPU Dispatch routine of `for` statements. UNIMPLEMENTED
        ///
        /// Without the `gpu` feature this falls back to the serial dispatch routine,
        /// so code written against the GPU API still runs (serially).
        pub fn gpu<const N: usize>(
            execp: ExecutionPolicy<N>,
            kernel: SerialForKernelType<N>,
        ) -> Result<(), DispatchError> {
            serial(execp, kernel)
        }
    }
}
350
// ~~~~~~
// Tests

// fix: gate the module with #[cfg(test)] so test-only code and its imports
// are not compiled into (and do not emit warnings in) regular builds
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        routines::parameters::{ExecutionSpace, Schedule},
        view::{parameters::Layout, ViewOwned},
    };

    /// Serial dispatch over a 1D range should visit every index exactly once.
    #[test]
    fn simple_range() {
        // `mat` only needs a `mut` binding when no parallel feature is enabled;
        // this cfg_if avoids "unused mut" warnings under parallel features
        cfg_if::cfg_if! {
            if #[cfg(any(feature = "threads", feature = "rayon", feature = "gpu"))] {
                let mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            } else {
                let mut mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            }
        }
        let ref_mat = ViewOwned::new_from_data(vec![1.0; 15], Layout::Right, [15]);
        let rangep = RangePolicy::RangePolicy(0..15);
        let execp = ExecutionPolicy {
            space: ExecutionSpace::DeviceCPU,
            range: rangep,
            schedule: Schedule::default(),
        };

        // very messy way to write a kernel but it should work for now
        let kernel = Box::new(|arg: KernelArgs<1>| match arg {
            KernelArgs::Index1D(i) => mat.set([i], 1.0),
            KernelArgs::IndexND(_) => unimplemented!(),
            KernelArgs::Handle => unimplemented!(),
        });

        serial(execp, kernel).unwrap();

        assert_eq!(mat.raw_val().unwrap(), ref_mat.raw_val().unwrap());
    }

    /// Serial dispatch over a 2D MDRange should visit every (i, j) pair exactly once.
    #[test]
    fn simple_mdrange() {
        // see simple_range for the rationale behind this cfg_if
        cfg_if::cfg_if! {
            if #[cfg(any(feature = "threads", feature = "rayon", feature = "gpu"))] {
                let mat = ViewOwned::new_from_data(vec![0.0; 150], Layout::Right, [10, 15]);
            } else {
                let mut mat = ViewOwned::new_from_data(vec![0.0; 150], Layout::Right, [10, 15]);
            }
        }
        let ref_mat = ViewOwned::new_from_data(vec![1.0; 150], Layout::Right, [10, 15]);
        let rangep = RangePolicy::MDRangePolicy([0..10, 0..15]);
        let execp = ExecutionPolicy {
            space: ExecutionSpace::DeviceCPU,
            range: rangep,
            schedule: Schedule::default(),
        };

        // very messy way to write a kernel but it should work for now
        let kernel = Box::new(|arg: KernelArgs<2>| match arg {
            KernelArgs::Index1D(_) => unimplemented!(),
            KernelArgs::IndexND([i, j]) => mat.set([i, j], 1.0),
            KernelArgs::Handle => unimplemented!(),
        });

        serial(execp, kernel).unwrap();

        assert_eq!(mat.raw_val().unwrap(), ref_mat.raw_val().unwrap());
    }

    /// A 1-dimensional MDRange is a degenerate but legal case: it should behave
    /// like a plain 1D range, delivering `IndexND` arguments.
    #[test]
    fn dim1_mdrange() {
        // see simple_range for the rationale behind this cfg_if
        cfg_if::cfg_if! {
            if #[cfg(any(feature = "threads", feature = "rayon", feature = "gpu"))] {
                let mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            } else {
                let mut mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            }
        }
        let ref_mat = ViewOwned::new_from_data(vec![1.0; 15], Layout::Right, [15]);
        #[allow(clippy::single_range_in_vec_init)]
        let rangep = RangePolicy::MDRangePolicy([0..15]);
        let execp = ExecutionPolicy {
            space: ExecutionSpace::DeviceCPU,
            range: rangep,
            schedule: Schedule::default(),
        };

        // very messy way to write a kernel but it should work for now
        let kernel = Box::new(|arg: KernelArgs<1>| match arg {
            KernelArgs::Index1D(_) => unimplemented!(),
            KernelArgs::IndexND(idx) => mat.set(idx, 1.0),
            KernelArgs::Handle => unimplemented!(),
        });

        serial(execp, kernel).unwrap();
        assert_eq!(mat.raw_val().unwrap(), ref_mat.raw_val().unwrap());
    }
}
460}