poc_kokkos_rs/routines/dispatch.rs
//! kernel dispatch code
//!
//! This module contains all code used to dispatch computational kernels
//! onto specified devices. Note that the documentation is feature-specific when the
//! items are, i.e. the documentation changes according to the enabled features.
//!
//! The methods described in this module are not meant to be used directly; they are
//! building blocks for the parallel statements.

#[cfg(any(doc, feature = "rayon", feature = "gpu"))]
use crate::functor::ForKernelType;

#[cfg(feature = "rayon")]
use rayon::prelude::*;

use std::{fmt::Display, ops::Range};

use super::parameters::{ExecutionPolicy, RangePolicy};
use crate::functor::{KernelArgs, SerialForKernelType};

// enums

/// Enum used to classify possible dispatch errors.
///
/// In all variants, the internal value is a description of the error.
#[derive(Debug)]
pub enum DispatchError {
    /// Error occurred during serial dispatch.
    Serial(&'static str),
    /// Error occurred during parallel CPU dispatch.
    CPU(&'static str),
    /// Error occurred during GPU dispatch.
    GPU(&'static str),
}

impl Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DispatchError::Serial(desc) => write!(f, "error during serial dispatch: {desc}"),
            DispatchError::CPU(desc) => write!(f, "error during cpu dispatch: {desc}"),
            DispatchError::GPU(desc) => write!(f, "error during gpu dispatch: {desc}"),
        }
    }
}

impl std::error::Error for DispatchError {
    // may be useful in case of an error coming from an std call
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}

// dispatch routines

// internal routines

/// Builds an N-depth nested loop executing a kernel using the N resulting indices.
/// Technically, this should be replaced by a tiling function, for both serial and parallel
/// implementations.
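///
/// For example, with `N = 2` and `ranges = [0..2, 0..2]`, the kernel is called with
/// `KernelArgs::IndexND([0, 0])`, `[0, 1]`, `[1, 0]`, then `[1, 1]`, i.e. the last
/// dimension varies fastest.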
fn recursive_loop<const N: usize>(ranges: &[Range<usize>; N], mut kernel: SerialForKernelType<N>) {
    // handles recursions
    fn inner<const N: usize>(
        current_depth: usize,
        ranges: &[Range<usize>; N],
        kernel: &mut SerialForKernelType<N>,
        indices: &mut [usize; N],
    ) {
        if current_depth == N {
            // all loops unraveled
            // call the kernel
            kernel(KernelArgs::IndexND(*indices))
        } else {
            // loop on the next dimension; update indices
            // can we avoid a clone by passing a slice starting one element
            // after the unraveled range?
            ranges[current_depth].clone().for_each(|i_current| {
                indices[current_depth] = i_current;
                inner(current_depth + 1, ranges, kernel, indices);
            });
        }
    }

    let mut indices = [0; N];
    inner(0, ranges, &mut kernel, &mut indices);
}

// serial dispatch

/// CPU dispatch routine of `for` statements. Does not depend on enabled feature(s).
///
/// The dispatch function executes the kernel according to the directives contained in the
/// execution policy. The kernel signature does not vary according to enabled features as this
/// is the invariant fallback dispatch routine.
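///
/// ### Example
///
/// A minimal sketch of direct usage with no parallel feature enabled, mirroring the
/// `simple_range` test at the bottom of this file; in practice this routine is only
/// reached through the parallel statements. Crate and module paths are assumed here,
/// hence the `ignore` attribute.
///
/// ```rust,ignore
/// use poc_kokkos_rs::{
///     functor::KernelArgs,
///     routines::{
///         dispatch::serial,
///         parameters::{ExecutionPolicy, ExecutionSpace, RangePolicy, Schedule},
///     },
///     view::{parameters::Layout, ViewOwned},
/// };
///
/// let mut mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
/// let execp = ExecutionPolicy {
///     space: ExecutionSpace::DeviceCPU,
///     range: RangePolicy::RangePolicy(0..15),
///     schedule: Schedule::default(),
/// };
/// // 1D kernel: set each element of the view to 1.0
/// let kernel = Box::new(|arg: KernelArgs<1>| match arg {
///     KernelArgs::Index1D(i) => mat.set([i], 1.0),
///     _ => unimplemented!(),
/// });
/// serial(execp, kernel).unwrap();
/// ```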
pub fn serial<const N: usize>(
    execp: ExecutionPolicy<N>,
    kernel: SerialForKernelType<N>,
) -> Result<(), DispatchError> {
    match execp.range {
        RangePolicy::RangePolicy(range) => {
            // serial, 1D range
            if N != 1 {
                return Err(DispatchError::Serial(
                    "Dispatch uses N>1 for a 1D RangePolicy",
                ));
            }
            range.into_iter().map(KernelArgs::Index1D).for_each(kernel)
        }
        RangePolicy::MDRangePolicy(ranges) => {
            // Kokkos uses tiling to handle MDRanges; in the case of serial
            // execution, we simply do the nested loop
            recursive_loop(&ranges, kernel) // macros would probably be more efficient
        }
        RangePolicy::TeamPolicy {
            league_size: _, // number of teams; akin to # of work items/batches
            team_size: _,   // number of threads per team; ignored in serial dispatch
            vector_size: _, // possible third dim parallelism; ignored in serial dispatch?
        } => {
            // interpret # of teams as # of work items (chunks);
            // necessary because serial dispatch is the fallback implementation
            // we ignore team size & vector size since there's no parallelism here

            // is it even possible to use chunks? It would require either:
            // - awareness of used external data
            // - owning the used data; maybe in the TeamPolicy struct
            // 2nd option is the more plausible but it creates issues when accessing
            // multiple views for example; it also seems incompatible with the paradigm

            // -> build a team handle & let the user write their kernel using it
            todo!()
        }
        RangePolicy::PerTeam => {
            // used inside a team dispatch
            // executes the kernel once per team
            todo!()
        }
        RangePolicy::PerThread => {
            // used inside a team dispatch
            // executes the kernel once per thread of the team
            todo!()
        }
        RangePolicy::TeamThreadRange => {
            // same as RangePolicy but inside a team
            todo!()
        }
        RangePolicy::TeamThreadMDRange => {
            // same as MDRangePolicy but inside a team
            todo!()
        }
        RangePolicy::TeamVectorRange => todo!(),
        RangePolicy::TeamVectorMDRange => todo!(),
        RangePolicy::ThreadVectorRange => todo!(),
        RangePolicy::ThreadVectorMDRange => todo!(),
    };
    Ok(())
}

cfg_if::cfg_if! {
    if #[cfg(feature = "threads")] {
        /// CPU dispatch routine of `for` statements. Implementation depends on enabled feature(s).
        ///
        /// The dispatch function executes the kernel according to the directives contained in the
        /// execution policy. The kernel signature varies according to enabled features.
        ///
        /// ### Possible Kernel Signatures
        ///
        /// - `rayon` feature enabled: [`ForKernelType`]
        /// - `threads` feature enabled: `Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>`
        /// - no feature enabled: fall back to [`SerialForKernelType`]
        ///
        /// The `threads` implementation cannot currently use the generic [`ForKernelType`] because
        /// of the `Clone` requirement.
        ///
        /// **Current version**: `threads`
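        ///
        /// For a 1D `RangePolicy`, the range is split into roughly one chunk per logical core
        /// and each chunk is handed to a scoped thread. As an illustration (hypothetical
        /// machine), 15 indices on 4 cores yields `chunk_size = 15 / 4 + 1 = 4`, i.e. chunks
        /// of sizes 4, 4, 4 and 3.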
        pub fn cpu<'a, const N: usize>(
            execp: ExecutionPolicy<N>,
            kernel: Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>, // cannot be replaced by the functor type because of the Clone bound
        ) -> Result<(), DispatchError> {
            match execp.range {
                RangePolicy::RangePolicy(range) => {
                    // parallel, 1D range
                    if N != 1 {
                        return Err(DispatchError::CPU(
                            "Dispatch uses N>1 for a 1D RangePolicy",
                        ));
                    }
                    // compute chunk_size so that there is roughly one chunk per logical core
                    let chunk_size = range.len() / num_cpus::get() + 1;
                    let indices = range.collect::<Vec<usize>>();
                    // use a scope to avoid 'static lifetime requirements
                    std::thread::scope(|s| {
                        let handles: Vec<_> = indices
                            .chunks(chunk_size)
                            .map(|chunk| {
                                s.spawn(|| {
                                    chunk
                                        .iter()
                                        .map(|idx_ref| KernelArgs::Index1D(*idx_ref))
                                        .for_each(kernel.clone())
                                })
                            })
                            .collect();

                        for handle in handles {
                            handle.join().unwrap();
                        }
                    });
                }
                RangePolicy::MDRangePolicy(_) => {
                    // Kokkos uses tiling to handle MDRanges
                    unimplemented!()
                }
                RangePolicy::TeamPolicy {
                    league_size: _, // number of teams; akin to # of work items/batches
                    team_size: _,   // number of threads per team; ignored in serial dispatch
                    vector_size: _, // possible third dim parallelism; ignored in serial dispatch?
                } => {
                    // interpret # of teams as # of work items (chunks);
                    // necessary because serial dispatch is the fallback implementation
                    // we ignore team size & vector size since there's no parallelism here

                    // is it even possible to use chunks? It would require either:
                    // - awareness of used external data
                    // - owning the used data; maybe in the TeamPolicy struct
                    // 2nd option is the more plausible but it creates issues when accessing
                    // multiple views for example; it also seems incompatible with the paradigm

                    // -> build a team handle & let the user write their kernel using it
                    todo!()
                }
                RangePolicy::PerTeam => {
                    // used inside a team dispatch
                    // executes the kernel once per team
                    todo!()
                }
                RangePolicy::PerThread => {
                    // used inside a team dispatch
                    // executes the kernel once per thread of the team
                    todo!()
                }
                _ => todo!(),
            };
            Ok(())
        }
    } else if #[cfg(feature = "rayon")] {
        /// CPU dispatch routine of `for` statements. Implementation depends on enabled feature(s).
        ///
        /// The dispatch function executes the kernel according to the directives contained in the
        /// execution policy. The kernel signature varies according to enabled features.
        ///
        /// ### Possible Kernel Signatures
        ///
        /// - `rayon` feature enabled: [`ForKernelType`]
        /// - `threads` feature enabled: `Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>`
        /// - no feature enabled: fall back to [`SerialForKernelType`]
        ///
        /// The `threads` implementation cannot currently use the generic [`ForKernelType`] because
        /// of the `Clone` requirement.
        ///
        /// **Current version**: `rayon`
        pub fn cpu<const N: usize>(
            execp: ExecutionPolicy<N>,
            kernel: ForKernelType<N>,
        ) -> Result<(), DispatchError> {
            match execp.range {
                RangePolicy::RangePolicy(range) => {
                    // parallel, 1D range
                    if N != 1 {
                        return Err(DispatchError::CPU(
                            "Dispatch uses N>1 for a 1D RangePolicy",
                        ));
                    }
                    // making indices N-sized arrays is necessary, even with the assertion...
                    range
                        .into_par_iter()
                        .map(KernelArgs::Index1D)
                        .for_each(kernel)
                }
                RangePolicy::MDRangePolicy(_) => {
                    // Kokkos uses tiling to handle MDRanges
                    unimplemented!()
                }
                RangePolicy::TeamPolicy {
                    league_size: _, // number of teams; akin to # of work items/batches
                    team_size: _,   // number of threads per team; ignored in serial dispatch
                    vector_size: _, // possible third dim parallelism; ignored in serial dispatch?
                } => {
                    // interpret # of teams as # of work items (chunks);
                    // necessary because serial dispatch is the fallback implementation
                    // we ignore team size & vector size since there's no parallelism here

                    // is it even possible to use chunks? It would require either:
                    // - awareness of used external data
                    // - owning the used data; maybe in the TeamPolicy struct
                    // 2nd option is the more plausible but it creates issues when accessing
                    // multiple views for example; it also seems incompatible with the paradigm

                    // -> build a team handle & let the user write their kernel using it
                    todo!()
                }
                RangePolicy::PerTeam => {
                    // used inside a team dispatch
                    // executes the kernel once per team
                    todo!()
                }
                RangePolicy::PerThread => {
                    // used inside a team dispatch
                    // executes the kernel once per thread of the team
                    todo!()
                }
                _ => todo!(),
            };
            Ok(())
        }
    } else {
        /// CPU dispatch routine of `for` statements. Implementation depends on enabled feature(s).
        ///
        /// The dispatch function executes the kernel according to the directives contained in the
        /// execution policy. The kernel signature varies according to enabled features.
        ///
        /// ### Possible Kernel Signatures
        ///
        /// - `rayon` feature enabled: [`ForKernelType`]
        /// - `threads` feature enabled: `Box<impl Fn(KernelArgs<N>) + Send + Sync + 'a + Clone>`
        /// - no feature enabled: fall back to [`SerialForKernelType`]
        ///
        /// The `threads` implementation cannot currently use the generic [`ForKernelType`] because
        /// of the `Clone` requirement.
        ///
        /// **Current version**: no feature
        pub fn cpu<const N: usize>(
            execp: ExecutionPolicy<N>,
            kernel: SerialForKernelType<N>,
        ) -> Result<(), DispatchError> {
            serial(execp, kernel)
        }
    }
}

cfg_if::cfg_if! {
    if #[cfg(feature = "gpu")] {
        /// GPU dispatch routine of `for` statements. UNIMPLEMENTED
        pub fn gpu<'a, const N: usize>(
            _execp: ExecutionPolicy<N>,
            _kernel: ForKernelType<N>,
        ) -> Result<(), DispatchError> {
            unimplemented!()
        }
    } else {
        /// GPU dispatch routine of `for` statements. UNIMPLEMENTED; currently falls back
        /// to the serial dispatch routine.
        pub fn gpu<const N: usize>(
            execp: ExecutionPolicy<N>,
            kernel: SerialForKernelType<N>,
        ) -> Result<(), DispatchError> {
            serial(execp, kernel)
        }
    }
}

// ~~~~~~
// Tests

mod tests {
    #[test]
    fn simple_range() {
        use super::*;
        use crate::{
            routines::parameters::{ExecutionSpace, Schedule},
            view::{parameters::Layout, ViewOwned},
        };
        // fixes warnings when testing using a parallel feature
        cfg_if::cfg_if! {
            if #[cfg(any(feature = "threads", feature = "rayon", feature = "gpu"))] {
                let mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            } else {
                let mut mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            }
        }
        let ref_mat = ViewOwned::new_from_data(vec![1.0; 15], Layout::Right, [15]);
        let rangep = RangePolicy::RangePolicy(0..15);
        let execp = ExecutionPolicy {
            space: ExecutionSpace::DeviceCPU,
            range: rangep,
            schedule: Schedule::default(),
        };

        // very messy way to write a kernel but it should work for now
        let kernel = Box::new(|arg: KernelArgs<1>| match arg {
            KernelArgs::Index1D(i) => mat.set([i], 1.0),
            KernelArgs::IndexND(_) => unimplemented!(),
            KernelArgs::Handle => unimplemented!(),
        });

        serial(execp, kernel).unwrap();

        assert_eq!(mat.raw_val().unwrap(), ref_mat.raw_val().unwrap());
    }

    #[test]
    fn simple_mdrange() {
        use super::*;
        use crate::{
            routines::parameters::{ExecutionSpace, Schedule},
            view::{parameters::Layout, ViewOwned},
        };
        // fixes warnings when testing using a parallel feature
        cfg_if::cfg_if! {
            if #[cfg(any(feature = "threads", feature = "rayon", feature = "gpu"))] {
                let mat = ViewOwned::new_from_data(vec![0.0; 150], Layout::Right, [10, 15]);
            } else {
                let mut mat = ViewOwned::new_from_data(vec![0.0; 150], Layout::Right, [10, 15]);
            }
        }
        let ref_mat = ViewOwned::new_from_data(vec![1.0; 150], Layout::Right, [10, 15]);
        let rangep = RangePolicy::MDRangePolicy([0..10, 0..15]);
        let execp = ExecutionPolicy {
            space: ExecutionSpace::DeviceCPU,
            range: rangep,
            schedule: Schedule::default(),
        };

        // very messy way to write a kernel but it should work for now
        let kernel = Box::new(|arg: KernelArgs<2>| match arg {
            KernelArgs::Index1D(_) => unimplemented!(),
            KernelArgs::IndexND([i, j]) => mat.set([i, j], 1.0),
            KernelArgs::Handle => unimplemented!(),
        });

        serial(execp, kernel).unwrap();

        assert_eq!(mat.raw_val().unwrap(), ref_mat.raw_val().unwrap());
    }

    #[test]
    fn dim1_mdrange() {
        use super::*;
        use crate::{
            routines::parameters::{ExecutionSpace, Schedule},
            view::{parameters::Layout, ViewOwned},
        };

        // fixes warnings when testing using a parallel feature
        cfg_if::cfg_if! {
            if #[cfg(any(feature = "threads", feature = "rayon", feature = "gpu"))] {
                let mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            } else {
                let mut mat = ViewOwned::new_from_data(vec![0.0; 15], Layout::Right, [15]);
            }
        }
        let ref_mat = ViewOwned::new_from_data(vec![1.0; 15], Layout::Right, [15]);
        #[allow(clippy::single_range_in_vec_init)]
        let rangep = RangePolicy::MDRangePolicy([0..15]);
        let execp = ExecutionPolicy {
            space: ExecutionSpace::DeviceCPU,
            range: rangep,
            schedule: Schedule::default(),
        };

        // very messy way to write a kernel but it should work for now
        let kernel = Box::new(|arg: KernelArgs<1>| match arg {
            KernelArgs::Index1D(_) => unimplemented!(),
            KernelArgs::IndexND(idx) => mat.set(idx, 1.0),
            KernelArgs::Handle => unimplemented!(),
        });

        serial(execp, kernel).unwrap();
        assert_eq!(mat.raw_val().unwrap(), ref_mat.raw_val().unwrap());
    }
}
460}