When Ferrous Metals Corrode, pt. XVIII

Intro

This post summarizes chapter 19, "Concurrency". The concurrency chapter has this nice bit at the beginning: "Experience inculcates a healthy skepticism, if not outright cynicism, toward all multithreaded code". Yes, yes it does.

All the safety features Rust brings with it should come to good use here.

Fork-Join Parallelism

Fork-join uses threads to perform individual tasks, then blocks until all threads can be joined again. The premise is that those tasks are independent, with essentially no data flow between threads. Since there are no shared resources, nothing can become a bottleneck.

spawn and join

Spawning a thread is done with std::thread::spawn. As an example, let's process a number of files in parallel:


use std::{thread, io};

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
    // Divide the work into several chunks.
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);

    // Fork: Spawn a thread to handle each chunk.
    // thread_handles stores handles for threads so we can join them later
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(
            // actually spawning the thread with a move closure
            // moving the worklist is cheap, only the vec (3 words) is moved,
            // not the actual worklist data
            thread::spawn(move || process_files(worklist))
        );
    }

    // Join: Wait for all threads to finish.
    for handle in thread_handles {
        handle.join().unwrap()?;
    }

    Ok(())
}
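
The helpers split_vec_into_chunks and process_files aren't shown above. A minimal sketch of the former, under the assumption that roughly equal-sized chunks are fine:

// Hypothetical helper, not spelled out above: split `v` into at most
// `nchunks` roughly equal-sized vectors.
fn split_vec_into_chunks<T>(mut v: Vec<T>, nchunks: usize) -> Vec<Vec<T>> {
    let nchunks = nchunks.max(1);
    // Round the chunk size up so we never produce more than `nchunks` chunks.
    let chunk_size = (v.len() + nchunks - 1) / nchunks;
    let mut chunks = Vec::new();
    while !v.is_empty() {
        // Keep the first `chunk_size` elements in `v`, take the rest as `tail`.
        let tail = v.split_off(v.len().min(chunk_size));
        chunks.push(v);
        v = tail;
    }
    chunks
}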

Error Handling Across Threads

Above we had threads joined with:

handle.join().unwrap()?;

join() returns a Result, which is Err if the thread panicked. Note that a panic in a child thread is isolated; it doesn't bring down the main thread. Instead of unwrapping we could also do more elaborate error handling here, e.g. logging and maybe retrying the task.

An Ok result contains the nested Result returned by process_files() itself. This is a good example of the benefits of having a common data / error handling code path.
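
A sketch of what more elaborate handling could look like (the logging is just illustrative):

for handle in thread_handles {
    match handle.join() {
        // the child thread panicked; the panic stays contained here
        Err(_panic) => eprintln!("worker thread panicked"),
        // the thread ran, but process_files() hit an I/O error
        Ok(Err(io_err)) => return Err(io_err),
        // all files in this chunk were processed fine
        Ok(Ok(())) => {}
    }
}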

Sharing Immutable Data Across Threads

When sharing references into a thread we run into lifetime problems: we don't know how long the thread will run. The main thread could, for example, hit an I/O error and exit early, dropping the referenced value while the child thread still uses it. That's why we used the move closure above.

If we want to share some common data across threads we'd need to clone it for each thread, but this can get expensive.

The way to deal with this is Arc, atomic reference counting.

Example snippet:

use std::sync::Arc;

fn process_files_in_parallel(filenames: Vec<String>,
                             glossary: Arc<GigabyteMap>)
    -> io::Result<()>
{
    ...
    // glossary now is a smart pointer with data living on the heap
    // note data in an Arc is immutable
    for worklist in worklists {
        // This call to .clone() only clones the Arc and bumps the
        // reference count (which are cheap operations)
        // 
        // It does not clone the underlying data in GigabyteMap 
        let glossary_for_child = glossary.clone();
        thread_handles.push(
            thread::spawn(move || process_files(worklist, &glossary_for_child))
        );
    }
    ...
}

Rayon

The rayon crate helps with fork-join parallelism. It provides two ways of running code in parallel:

use rayon::prelude::*;

// "do 2 things in parallel"
let (v1, v2) = rayon::join(fn1, fn2);

// "do N things in parallel"
giant_vector.par_iter().for_each(|value| {
    do_thing_with_value(value);
});

The latter spreads the per-element tasks out over a thread pool; conceptually, all elements are processed in parallel.

Revisiting the example from above:

use rayon::prelude::*;

fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
    -> io::Result<()>
{
    // iterate over file names
    filenames.par_iter()
        // pass in filename via closure. Note how rayon supports
        // passing in refs too i.e. the glossary ref
        .map(|filename| process_file(filename, glossary))
        // combine results. This here keeps the first error; could also accumulate
        // all errors instead
        .reduce_with(|r1, r2| {
            if r1.is_err() { r1 } else { r2 }
        })
        // reduce_with only returns None if filenames were empty
        .unwrap_or(Ok(()))
}

Tasks will be spread out over the pool's worker threads (by default, one per core) via work stealing.

Revisiting the Mandelbrot Set

The chapter gives a quick example of rewriting the Mandelbrot program with rayon.
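
A sketch along the lines of the book's version, rendering one row of the image per task (pixels, bounds, upper_left, lower_right, pixel_to_point and render are the Mandelbrot program's existing data and helpers, assumed here):

use rayon::prelude::*;

pixels
    .par_chunks_mut(bounds.0) // one band (row) per task
    .enumerate()
    .for_each(|(row, band)| {
        // compute the complex-plane corners for just this band
        let band_upper_left =
            pixel_to_point(bounds, (0, row), upper_left, lower_right);
        let band_lower_right =
            pixel_to_point(bounds, (bounds.0, row + 1), upper_left, lower_right);
        render(band, (bounds.0, 1), band_upper_left, band_lower_right);
    });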

Channels

Channels are typed queues that move data between threads. They're not part of the core language but are implemented in std::sync::mpsc.

Sending Values

Example snippet:

use std::{fs, thread};
use std::sync::mpsc;

// sender and receiver are typed, type is inferred
// from below
let (sender, receiver) = mpsc::channel();

// spawn a thread, move sender into closure
let handle = thread::spawn(move || {
    for filename in documents {
        let text = fs::read_to_string(filename)?;
        // send String into chan, check if there was an error
        if sender.send(text).is_err() {
            // other end hung up on us, quietly exit
            break;
        }
    }
    Ok(())
});

Note that each send moves only three machine words for the text var (the String's pointer, capacity and length); the actual text data stays put on the heap.

Sender::send and Receiver::recv both return Results, but those are Err only if the other end of the channel has been dropped. Dropping your end of a channel is the idiomatic way of signalling end of transmission.

Alternatively, instead of passing plain text through the channel we could have arranged for Results to be passed, or created a second channel for passing errors.
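
A sketch of the first variant, reusing the documents list from above; the channel now simply carries io::Result<String>:

use std::{fs, io, thread};
use std::sync::mpsc;

// successes *and* errors travel through the channel
let (sender, receiver) = mpsc::channel::<io::Result<String>>();

thread::spawn(move || {
    for filename in documents {
        // forward the whole Result; stop quietly if the receiver hung up
        if sender.send(fs::read_to_string(filename)).is_err() {
            break;
        }
    }
});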

Receiving Values

Continuing the example above, here's a snippet driving the receiver:

while let Ok(text) = receiver.recv() {
    do_something_with(text);
}

Or, as Receivers are iterable, reading text could just be:

for text in receiver {
    do_something_with(text);
}

In both cases the receiver blocks until data is available, then runs the body of the loop. The loop only ends once the sender is dropped.

Channel Features and Performance

mpsc stands for multi-producer, single-consumer. Any Sender from mpsc can be cloned, and each clone can send data into the channel.

Interestingly, a channel is first created with a lightweight "one-shot" implementation and switches implementations once a second value is sent; if the Sender is cloned, it switches yet again, to an implementation that is safe for multiple senders.

Backpressure: the channel() function creates an unbounded channel. To create a bounded channel, i.e. one with a defined maximum queue length, use the sync_channel() function. There, sender.send() will block once the maximum is reached.
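
For example, a queue bounded at 1000 entries (the number is arbitrary):

use std::sync::mpsc;

// At most 1000 values may be queued. Once the queue is full,
// sender.send() blocks until the receiver catches up.
let (sender, receiver) = mpsc::sync_channel::<String>(1000);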

There's also an external crate crossbeam-channel that supports multi-producer, multi-consumer channels.

Thread Safety: Send and Sync

Rust has two traits that mark values as safe to pass between threads: Send and Sync.

Send

safe to pass by value (i.e. moved) to another thread

Sync

safe to pass by shared ref to another thread (a type T is Sync exactly when &T is Send)

Most types are both Send and Sync; a user-defined struct or enum will be Send/Sync if its fields are Send/Sync.

Some exceptions:

mpsc::Receiver

Send, but not Sync. Only one thread may use it at a time (but can be moved between threads)

Cell

also Send, but not Sync

std::rc::Rc<T>

the ref-counting smart pointer is neither Send nor Sync
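
The compiler enforces this; e.g. trying to move an Rc into a spawned thread fails, since thread::spawn requires a Send closure:

use std::rc::Rc;
use std::thread;

let rc = Rc::new("hello".to_string());

// This does not compile: the closure captures `rc`, and
// `Rc<String>` cannot be sent between threads safely.
// thread::spawn(move || println!("{}", rc));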

Shared Mutable State

Mutex<T>

How to set up a mutex:

use std::sync::Mutex;

/// All threads have shared access to this big context struct.
struct FernEmpireApp {
    ...
    waiting_list: Mutex<WaitingList>,
    ...
}


use std::sync::Arc;

// set up whole app on the heap via Arc
let app = Arc::new(FernEmpireApp {
    ...
    waiting_list: Mutex::new(vec![]),
    ...
});

Mutex locking/unlocking:

impl FernEmpireApp {
    /// Add a player to the waiting list for the next game.
    /// Start a new game immediately if enough players are waiting.
    fn join_waiting_list(&self, player: PlayerId) {
        // Lock the mutex and gain access to the data inside.
        // The scope of `guard` is a critical section.
        let mut guard = self.waiting_list.lock().unwrap();

        // Now do the game logic.
        guard.push(player);
        if guard.len() == GAME_SIZE {
            let players = guard.split_off(0);
            self.start_game(players);
        }
    }
}

Access to the data goes via the guard. Once the guard is dropped, the mutex is unlocked again. To drop it explicitly, call drop(guard).

mut and Mutex

Note how in the example above self was passed via shared ref – no mut required, even though the waiting_list gets modified. That's the point of the mutex: it hands out mutable access to exactly one thread at a time, while all threads keep shared access to the Mutex itself.

In other words, Mutex provides interior mutability similar to what RefCell did (though the latter doesn't support multithreading).

Why Mutexes Are Not Always a Good Idea

Rust's mutexes prevent data races, but there are other issues they can't solve, e.g.:

  • race conditions that aren't data races

  • they tend to encourage bad encapsulation

Deadlock

The borrow system doesn't prevent deadlocks. Channels can deadlock as well, but it's typically unlikely, e.g. in a pipeline design.

Poisoned Mutexes

If a thread panics while holding a mutex, Rust marks that mutex as poisoned; all further attempts to lock it will return an error. The rationale is that the panicking thread probably didn't finish its update cleanly, so other threads shouldn't just stumble in. It's possible to override this with PoisonError::into_inner().
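
A sketch of that escape hatch, assuming some Mutex<T> named mutex:

// take the guard even if a previous holder panicked; we're asserting
// that the data inside is still in a usable state
let guard = match mutex.lock() {
    Ok(guard) => guard,
    Err(poisoned) => poisoned.into_inner(),
};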

Multiconsumer Channels Using Mutexes

As an example, here's an mpsc receiver made thread-safe, i.e. one that can be shared among several threads. Comments inline.

use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};

/// A thread-safe wrapper around a `Receiver`.
// From outer to inner type:
//   Arc: heap-alloc, atomically refcounted container
//       Mutex: for controlling access
//           Receiver: channel recv
#[derive(Clone)]
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);

// implement the Iterator trait for SharedReceiver<T>
impl<T> Iterator for SharedReceiver<T> {
    type Item = T;

    /// Get the next item from the wrapped receiver.
    fn next(&mut self) -> Option<T> {
        // self.0 is the Arc, which auto-derefs down to the Mutex
        let guard = self.0.lock().unwrap();
        // return val from receiver as an Option
        guard.recv().ok()
    }
}

/// Create a new channel whose receiver can be shared across threads.
/// This returns a sender and a receiver, just like the stdlib's
/// `channel()`, and sometimes works as a drop-in replacement.
pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
    let (sender, receiver) = channel();
    (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
}
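
Usage then looks much like the stdlib's channel, except the receiver can be cloned and handed to several worker threads (a small sketch):

use std::thread;

// four workers drain the same channel; each sent value is
// delivered to exactly one of them
let (sender, receiver) = shared_channel::<u32>();
for _ in 0..4 {
    let rx = receiver.clone();
    thread::spawn(move || {
        for value in rx {
            println!("worker got {}", value);
        }
    });
}
for i in 0..100 {
    sender.send(i).unwrap();
}
drop(sender); // signals EOT, the worker loops end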

Read/Write Locks (RwLock<T>)

An RwLock allows either many readers or one exclusive writer, never both:

use std::sync::RwLock;

struct FernEmpireApp {
    ...
    config: RwLock<AppConfig>,
    ...
}
...

/// True if experimental fungus code should be used.
fn mushrooms_enabled(&self) -> bool {
    let config_guard = self.config.read().unwrap();
    config_guard.mushrooms_enabled
}

...

fn reload_config(&self) -> io::Result<()> {
    let new_config = AppConfig::load()?;
    let mut config_guard = self.config.write().unwrap();
    *config_guard = new_config;
    Ok(())
}

Condition Variables (Condvar)

Condvar has .wait() and .notify_all() methods; .wait() blocks until some other thread calls .notify_all().
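
A minimal sketch, following the usual pattern of pairing the Condvar with a Mutex-guarded flag:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_for_worker = Arc::clone(&pair);

// worker: flip the flag, then wake up all waiters
thread::spawn(move || {
    let (lock, cvar) = &*pair_for_worker;
    *lock.lock().unwrap() = true;
    cvar.notify_all();
});

// main thread: wait until the flag is set. wait() releases the lock
// while blocked and reacquires it before returning.
let (lock, cvar) = &*pair;
let mut ready = lock.lock().unwrap();
while !*ready {
    ready = cvar.wait(ready).unwrap();
}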

Atomics

There are various atomic integer types, an AtomicBool, and an atomic pointer type – these can be safely read and written from multiple threads without locking. The operations are exposed as loads, stores, exchanges and arithmetic operations.

Example, safely add 1 to an atomic value:

use std::sync::atomic::{AtomicIsize, Ordering};

let atom = AtomicIsize::new(0);
atom.fetch_add(1, Ordering::SeqCst);

The SeqCst argument specifies a memory ordering, somewhat like RDBMS transaction isolation levels. SeqCst is the strictest ordering, "sequential consistency". Unlike SQL's SERIALIZABLE, though, the performance overhead is often quite low.

Example usage, implement a cancel flag for some long-running operation:

use std::sync::Arc;
use std::sync::atomic::AtomicBool;

let cancel_flag = Arc::new(AtomicBool::new(false));
let worker_cancel_flag = cancel_flag.clone();


// worker code ...

use std::thread;
use std::sync::atomic::Ordering;

let worker_handle = thread::spawn(move || {
    for pixel in animation.pixels_mut() {
        render(pixel); // ray-tracing - this takes a few microseconds
        // setting cancel_flag=true in the main thread would cause us to
        // return None in the worker
        if worker_cancel_flag.load(Ordering::SeqCst) {
            return None;
        }
    }
    Some(animation)
});
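
On the main thread's side, cancelling is then just a store followed by a join:

// request cancellation, then collect what the worker produced
cancel_flag.store(true, Ordering::SeqCst);
let animation = worker_handle.join().unwrap(); // None if we cancelled in time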

One benefit of atomics is their low overhead. They're also a form of interior mutability, like Mutex or RwLock: their methods take self as a shared ref.

Global Variables

Rust doesn't like global mutable state much; for safety, a static variable must be both Sync and non-mut.

One way to implement things like a global counter is an atomic integer:

use std::sync::atomic::AtomicUsize;

static PACKETS_SERVED: AtomicUsize = AtomicUsize::new(0);

...

use std::sync::atomic::Ordering;

PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);

Static initializers can only call const functions, which the compiler can evaluate at compile time. The atomic constructors are all const.

To create a const fn, prefix it with const fn .... Const fns are restricted in what they can do: they can't take types as generic arguments, allocate memory, or operate on raw pointers. Arithmetic, assignments and calls to other const fns are all fine, though. Unfortunately, Mutex::new() is not a const fn. One way around this is the lazy_static crate; we used it in pt. XVI to create a static regex that gets compiled on first use.
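
For instance, a toy const fn (not from the book) that can appear in a static initializer:

// a const fn may be evaluated at compile time
const fn kibibytes(n: usize) -> usize {
    n * 1024
}

static BUFFER_SIZE: usize = kibibytes(64);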

Example of creating a global mutex-controlled string:

use lazy_static::lazy_static;

use std::sync::Mutex;

lazy_static! {
    static ref HOSTNAME: Mutex<String> = Mutex::new(String::new());
}
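
Using it then means locking first, like with any other mutex (the hostname value is just a placeholder):

// mutate the global through the mutex guard
HOSTNAME.lock().unwrap().push_str("fern.example.com");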

The same works for other types like hash maps, lists, etc.

Coda

The safety features of Rust become extra-important when writing concurrent code. This is where all the fuss about ownership and mutability pays off.

I'm still a big fan of Erlang's message-passing actor model for its simplicity, but performance can suffer there. Rust provides a more diverse toolset that's more complicated to drive but potentially avoids some performance bottlenecks, all without compromising on safety. Arguing with the compiler is vastly preferable to debugging data races, imho.