实时音频编程:实践与技巧

Posted 芥末的无奈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时音频编程:实践与技巧相关的知识,希望对你有一定的参考价值。

简介

实时音频编程(一)中,我们总结了实时音频编程几条经验法则,先来回顾下它们,在实时线程中,你不能做:

  1. 不要申请或者释放内存
  2. 不要使用锁
  3. 不要进行文件读写,或者其他方式的 I/O(这包括任何 print 或者 NSLog,或者 GUI API)
  4. 不要调用那些可能造成阻塞的系统 api
  5. 不要运行那些执行时间不确定,或者最坏时间复杂度有激增的代码
  6. 不要调用任何有上述行为的代码
  7. 不要调用任何你不信任的代码

在可能的情况下,有几件事你应该做:

  1. 用最坏时间复杂度来衡量算法,选用最坏时间复杂度友好的算法
  2. 在许多音频采样中摊销计算,以平滑CPU的使用,而不是使用偶尔有长处理时间的 "突发 "算法。
  3. 在一个非实时线程中预先分配或预先计算数据
  4. 采用非共享的、仅在音频回调中使用的数据结构,这样你就不需要考虑共享、并发和锁的问题。

接来下,将对 2019 JUCE 开发者大会上分享的 real-time 101 演讲进行总结。Fabian Renn-Giles & Dave Rowland 分享了许多实时编程的实用技巧,我们将通过一系列的问题,来确定不同场景下的解决方案。总结如下图:

Q&A

接下来,你将通过回答一些问题来确定不同场景的解决方案,每种场景都会引入一种工具。

Question 1: 你是选择传递还是共享对象?

让我们进入第一个问题:你是选择传递还是共享对象。如果你选择传递(即复制)对象到另一个线程,那么两个线程之间可以通过 FIFO 来交互。

FIFO(First Input First Output),即先进先出队列。它非常适合用于线程之间的数据传递。在实时编程中,我们常使用 ringbuffer 来实现 FIFO。ringbuffer 有固定的大小,也就意味着没有内存申请,此外它还有很多不同的类型。

就拿最简单的情况来说,即单个生产者单个消费者,它的 wait-free 版本如下:

template <typename T> class fifo{
public:
    bool push(T && arg){
        auto pos = writepos.load();
        auto next = (pos + 1) % slots.size();
        
        if(next == readpos.load())
            return false;
        
        slots[pos] = std::move(arg);
        writepos.store(next);
        return true;
    }
    
    bool pop(T& result){
        auto pos = readpos.load();
        
        if(pos == writepos.load())
            return false;
        
        result = std::move(slots[pos]);
        readpos.store((pos + 1) % slots.size());
        return true;
    }

private:
    std::vector<T> slots = {};
    std::atomic<int> readpos = {0};
    std::atomic<int> writepos = {0};
};

FIFO 有非常非常多不同的版本,应用的场景也不同,那么要如何决定选用哪种类型的 FIFO 呢?你只需要问自己两个问题:

  1. 是否会有多个线程同时向 FIFO 中读/写数据?
  2. 如果写的时候 FIFO 是满的,读的时候是空的,会发生什么?

站在生产者的角度来看,它可以是单生产者写入 FIFO,也可以是多生产者多个线程同时写入;当 FIFO 满了后,继续写入可以是强制覆盖,也可以是告知生产者写入失败。

站在消费者的角度来看,它可以是单消费者读取 FIFO 数据,也可以是多个消费者多个线程同时读取;当 FIFO 为空后,继续读取数据可以是返回 null,也可是告知消费者读取失败。

举个几个例子来说明如何进行选择。

例子1,假设你正在写一个实时显示音频波形的是程序,音频线程从麦克风获取音频,并将音频数据发送给 UI 线程。这种情况下,因为只有音频线程在生产数据,只有 UI 线程在消费数据,因此肯定采用的是单生产者-单消费者;此外,实时线程在不断的生产数据,UI 线程通常只要考虑显示最新的音频数据,因此当 FIFO 满后,直接强制覆盖没有任何影响;但 FIFO 如果为空,有可能是音频线程出现了问题,因此最好还是告知 UI 线程读取数据失败。基于以上分析,应该选择单生产者、单消费者、队列满时写入覆盖、队列空时读取失败的 FIFO。

例子2,假设你正在给一个高精度感应器完成异常值检测的功能,该仪器有多个感应器,它们同时向 FIFO 写入数据值,在消费线程中从 FIFO 中读取数据并检测是否有异常值。因此我们可以采用 多生产者-单消费者模式。由于异常值非常重要,当 FIFO 满后,你不能简单地强制覆盖,这会导致你错过异常值,因此合适的做法应该是告知写入失败。另一方面,当 FIFO 为空时,可以认为此时没有异常值,因此返回 null/0 是合适的。基于以上分析,应该选择多生产者、单消费者、队列满时写入失败、队列空时读返回 null/0 的 FIFO。

根据上面两个问题的答案,排列组合可以得到共有 16 中 FIFO,它们对读写线程 wait-free 的支持程度是不同的,具体如下表:

比如例子1,单生产者、单消费者、队列满时写入覆盖、队列空时读取失败的 FIFO,读写两端都支持 wait-free;而例子2,多生产者、单消费者、队列满时写入失败、队列空时读返回 null/0 的 FIFO,只在读线程是 wait-free。farbot 提供了上述 16 种 FIFO。

FIFO的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 在非实时与实时线程之间传递数据

代价

  • 固定的 FIFO 大小
  • 当 FIFO 满后的行为(阻塞/丢弃/覆盖)
  • FIFO 读写的开销

示例

  • 打印日志、写入数据到磁盘(录音)、文件读写等

Question 2: 是否与实时线程交互?

当你选择线程之间共享数据,那么再问自己一个问题:我是否正在与实现线程交互?如果答案是否定的,那么使用锁就能解决你的问题。
举个例子,下面的代码中一个线程接受事件,同时记录事件信息,另一个线程将事件信息写入文件。事件信息使用 std::queue 来存放,两个线程共享这个对列,同时使用 std::mutex 来同步数据。

#include <queue>
#include <mutex>
#include <algorithm>

std::queue<string> string_queue; // 1 
std::mutex some_mutex; // 2

 
auto log_worker = [&] () {
    while(is_running()){
        std::lock_guard<std::mutex> guard(some_mutex);
        for(;!string_queue.empty();){
            print_log_to_file(string_queue.front());
            string_queue.pop();
        }
    }
};
 
auto event_worker = [&] () {
    while (is_running())
    {
        wait_for_event();
        
        // log event information
        std::lock_guard<std::mutex> guard(some_mutex);
        string_queue.push(get_event_info());
    }
};
 
auto log_future = std::async (std::launch::async, audio_worker);
auto event_future = std::async (std::launch::async, gui_worker);

log_future.wait();
event_future.wait();

Question 3: 共享数据是否足够小?

如果你与实时线程共享的数据足够小,那么使用 std::atomic。举个具体的例子,如下:

auto gain = 1.0f;

void processSensorData(float* sensorInOut, int n)
{
    // do some dsp
    ...
    
    for(int i = 0; i < n; ++i)
        sensorInOut *= gain;
}

// called on another thread
void setSensorGain(float newGain)
{
    gain = newGain;
}

上面的代码中 setSensorGain 对 gain 进行了修改,与 processSensorData 形成了数据争用(data race)。数据争用在 C/C++ 中是未定义行为,这导致你的代码行为令人难以捉摸,出现 bug 难以定位和排查。我们讨论其中一种可能,编译器可能假设在 processSensorData 中,gain 不会发生变化,因此代码可能被优化为:

void processSensorData(float* sensorInOut, int n)
{
    // do some dsp
    ...
    regrester auto gain_copy = gain;
    for(int i = 0; i < n; ++i)
        sensorInOut *= gain_copy;
}

这种情况不算太糟糕,只是新参数真正起作用的时间被延后了。但考虑另一种情况,代码如下:

auto gain = 1.0f;

void realtimeThreadEntry()
{
    while(rocketFlying())
    {
        processSensorData(sensorData, 512);
    }
}

void processSensorData(float* sensorInOut, int n)
{
    // do some dsp
    ...
    
    for(int i = 0; i < n; ++i)
        sensorInOut *= gain;
}

// called on another thread
void setSensorGain(float newGain)
{
    gain = newGain;
}

上述代码中,realtimeThreadEntry 为实时线程入口,内部不停地在循环处理数据,由于 processSensorData 代码量相当小,编译器可能进行 inline 优化:

void realtimeThreadEntry()
{
    while(rocketFlying())
    {
        // do some dsp
        ...
        
        for(int i = 0; i < n; ++i)
            sensorInOut *= gain;
    }
}

同样的,编译器可能假设 gain 不会发生变化,采用了之前的优化,于是代码变成了这样:

void realtimeThreadEntry()
{
    regrester auto gain_copy = gain;

    while(rocketFlying())
    {
        // do some dsp
        ...
    
        for(int i = 0; i < n; ++i)
            sensorInOut *= gain_copy;
    }
}

// called on another thread
void setSensorGain(float newGain)
{
    gain = newGain;
}

这样的优化导致 setSensorGain 无论如何都无法对 realtimeThreadEntry 起作用了。因为我们的代码存在数据争用,引入了未定义的行为,这就意味着任何事情都可能发生。我们要极力避免这种情况。

再来看一个例子,加深对于未定义行为的畏惧之心,假设你想要推翻费马大定理的证明,那么你可能写下面这样的代码,来遍历所有数字企图找到一个反例:

bool threadRunning;

// thread 1
bool proveFermatsLastTheorem(){
    threadRunning = true;
    for(int n = 3; threadRunning; ++n){
        if(pow(x, n) + pow(y, n) == pow(z, n)){
            return false;
        }
    }
    
    return true;
}

void testTheorem(){
    bool result;
    startThread([](){ result = proveFermatsLastTheorem() });
    Sleep(2000);
    threadRunning = false;
    cout << result << endl;
}

上述代码中,优化器可能认为 threadRunning 永远都是 true,为了减少每次都从内存中读取的耗时,它可能进行如下优化:

bool proveFermatsLastTheorem(){
    while(true){
        if(pow(x, n) + pow(y, n) == pow(z, n)){
            return false;
        }
        ++n;
    }

    return true;
}

接着,编译器进一步优化,将真正的循环条件提前:

bool proveFermatsLastTheorem(){
    while(pow(x, n) + pow(y, n) != pow(z, n)) ++n;
    
    return false;
    return true;
}

显然,最后一句 return 是多余的:

bool proveFermatsLastTheorem(){
    while(pow(x, n) + pow(y, n) != pow(z, n)) ++n;
    
    return false;
}

至此,整个函数无论如何只会返回 false,因此最终优化的结果可能是:

bool proveFermatsLastTheorem(){
    return false;
}

回到我们的问题中来,如果你与实时线程共享的数据足够小,那么使用 std::atomic。它能够解决上面两个例子的未定义行为,保证线程共享数据之间的同步性,同时阻止优化器过渡优化。

需要注意的是,在使用 std::atomic 时,我们需要检查是否是 look-free 实现:

static_assert(std::atomic<float>::is_always_lock_free)

如果 ::is_always_lock_free 返回 false,那么它本质上还是使用了锁来同步数据,这是在实时线程编程中要避免的。

总结下关于 std::atomic 使用

使用场景

  • 多个线程可能对数据进行修改

代价

  • 数据必须足够小,要满足 std::atomic<float>::is_always_lock_free == true
  • 只允许某些操作,例如 store, load, fetch_add,
    fetch_add 等等

示例

  • 线程之间共享小数据
  • 增量控制量、电平值、算法参数等等

Question 4: 获取共享资源是否允许失败?

当我们需要共享一个较大的数据,即 ::is_always_lock_free 返回 false 时,考虑是否可以允许获取资源失败。如果允许,那么使用 try_lock。举个例子,在音频线程中使用查表法,在非实时线程中更新表。

class WavetabelSynthesizer
{
public:
    void audioCallback()
    {
        if(std::unique_lock<mutex> tryLock(mutex, std::try_to_lock); tryLock.owns_lock())
        {
            // Do something with wavetable
        }else
        {
            // Do something else as wavetabel is not available
        }
    }
    
    void updateWavetable(/* args */)
    {
        auto newWavetable = std::make_unique<Wavetable>(/* args */);
        {
            std::lock_guard<std::mutex> lock(mutex);
            std::swap(wavetable, newWavetable);
        }
    }
private:
    std::mutex mutex;
    std::unique_ptr<Wavetable> wavetable;
}

上述代码中,我们使用 try_lock 来尝试获取 wavetable 的所有权,并对获取成功和失败两种情况分别进行不同的操作。

但有一个细节需要特别注意,代码中使用了 std::mutex ,它配合 std::unique_lock 进行 try_lock 操作。查看 c++ 文档,std::mutex::try_lock 是 wait-free 操作,这非常好,但如果获取锁成功,那么在 std::unique_lock 析构的时候会调用 std::mutex::unlock 去通知其他线程,而这会调用系统 api,这是我们需要避免的,因此这种情况下 std::mutex 是不合适的。

为此,我们可以使用 spin_lock,即自旋锁。下面给出一种最基本的 spin_lock 实现,它使用 std::atomic_flag 进行 lock, unlock 和 try_lock 的实现,没有任何系统 api 调用,不会造成 block。

class spin_lock
{
public:
    void lock() noexcept { while(flag.test_and_set()); }
    void unlock() noexcept { flag.clear(); }
    bool try_lock() noexcept { return !flag.test_and_set(); }
    
private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
}

try_lock 总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 允许获取资源失败

代价

  • 非实时线程获取资源时需要等待实时线程释放资源
  • 实时线程需要处理好资源获取失败的情况

示例

  • 将大型数据传递给实时线程供其使用
  • 音频采样数组、波形图数据、滤波器参数等

Question 5:实时线程会修改线程间共享的数据吗?

如果资源获取失败是不被允许的,即无法使用 try_lock,那么考虑一个问题:实时线程是否会对共享数据进行修改?如果答案是否定的,那么可以使用 CAS 循环(Compare and Set Loop)。一个经典的例子就是双二阶滤波器算法的实现:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
BiquadCoeffecients coeffs;

BiquadCoeffecients calculateLowPassCoeffecients(float freq);

void audioThread(const float* src, float* dst, size_t n)
{
    static float lv1, lv2;
    
    for(size_t i = 0; i < n; ++i)
    {
        auto input = src[i];
        auto output = (input * coeffs.b0) + lv1;
        
        lv1 = (input * coeffs.b1) - (output * coeffs.a1) + lv2;
        lv2 = (input * coeffs.b2) - (output * coeffs.a2);
    }
}

void updateFrequecyParameter(float newValue)
{
    coeffs = calculateLowPassCoeffecients(newValue);
}

其中 audioThread 运行在实时线程中进行音频计算;updateFrequecyParameter 被其他线程调用,负责更新参数。

上述代码并没有对coeffs 进行多线程保护,这会导致未定义行为。在这个例子中,可能会偶尔出现 glitch 或者奇怪的杂音。

为了避免这种情况,一种最简单的方式是使用 std::atomic

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::atomic<BiquadCoeffecients> coeffs;

void audioThread(const float* src, float* dst, size_t n)
{
    static float lv1, lv2;
    auto local_coeffs = coeffs.load();
    
    for(size_t i = 0; i < n; ++i)
    {
        auto input = src[i];
        auto output = (input * local_coeffs.b0) + lv1;
        
        lv1 = (input * local_coeffs.b1) - (output * local_coeffs.a1) + lv2;
        lv2 = (input * local_coeffs.b2) - (output * local_coeffs.a2);
    }
}

但很遗憾,由于BiquadCoeffecients 包含 5 个 float,它太大了,以至于 std::atomic<BiquadCoeffecients> 并不是 lock free 的实现。因此这种情况下,我们不能使用 std::atomic<BiquadCoeffecients>

在引出 CAS 方法前,我们先考虑一种使用 std::atomic<BiquadCoeffecients*> 的方法,对上述代码做简单的修改:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::atomic<BiquadCoeffecients*> coeffs;

void audioThread(const float* src, float* dst, size_t n)
{
    auto* coeffsCopy = coeffs.copy();
    processBiquad(src, dst, n, coeffsCopy);
}

void updateFrequecyParameter(float newValue)
{
    coeffs = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
}

使用 std::atomic 来管理指针,保证其更新是多线程安全的。这种方式可以工作,但它会造成内存泄露,因为你无法知道 coeffs 是否在被使用。也许我们可以通过设置 flag 来标识 coeffs 是否在实时线程中被使用:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
BiquadCoeffecients* coeffs;
std::atomic<bool> isInAudioThread{ false };

void audioThread(const float* src, float* dst, size_t n)
{
    isInAudioThread = true;
    auto* coeffsCopy = coeffs.copy();
    processBiquad(src, dst, n, coeffsCopy);
    isInAudioThread = false;
}

void updateFrequecyParameter(float newValue)
{
    auto* ptr = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
    
    while(isInAudioThread.load())
        ;
    
    std::swap(ptr, coeffs);
    delete ptr;
}

上面代码中,我们加入 isInAudioThread flag 变量,在进行音频线程处理时将其设置为 true,退出时为 flase;在非实时线程中,如果 isInAudioThread 为 true,则循环等待实时线程退出,然后通过 swap 对数据指针进行更新,最后释放旧数据。

但这仍然存在问题,在检查资源是否被使用(即 while 循环)和更新资源之间,实时线程重新占用了资源,即isInAudioThread 发生变化。因此检查与更新不具有原子性,导致了所谓的 ABA 问题:

void updateFrequecyParameter(float newValue)
{
    auto* ptr = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
    
    while(isInAudioThread.load())
        ;
    // isInAudioThread could be changed here
    std::swap(ptr, coeffs);
    delete ptr;
}

最终的解决方案是结合上面两种方法,具体看下面代码:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::unique_ptr<BiquadCoeffecients> storage { std::make_unique<BiquadCoeffecients>() };
std::atomic<BiquadCoeffecients*> biquadCoeffs;

void processAudio(float* buffer)
{
    auto* coeffs = biquadCoeffs.exchange(nullptr);
    
    processBiquad(*coeffs, buffer);
    
    biquadCoeffs = coeffs;
}

void changeBiquadParameters(BiquadCoeffecients newCoeffs)
{
    auto newBiquad = std::make_unique<BiquadCoeffecients>(newCoeffs);
    
    for(auto* expected = storage.get(); !biquadCoeffs.compare_exchange_strong(expected, newBiquad.get()); 
        expected = storage.get())
        ;
        
    storage = std::move(newBiquad);
}

我们使用 std::unique_ptr<BiquadCoeffecients> 存放参数数据资源,用 std::atomic<BiquadCoeffecients*> 指向当前可用资源。

在实时线程中,首先使用 exchange 返回当前可用资源指针的同时,将其置空,而置空则表示实时线程正在使用该资源。在实时线程完成处理后,biquadCoeffs = coeffs; 重新赋值资源指针。

在非实时线程中对参数进行更新,for 循环中关键代码 biquadCoeffs.compare_exchange_strong(expected, newBiquad.get()) 表示如果实时线程不再使用资源,那么就将资源指针更新;否则就自旋等待实时线程。最后通过 move 来更新资源。

这种方法无法处理实时线程更新数据的情况,下面代码为例,如果 b0 参数在实时线程进行了更新,那么非实时线程将无法得知这一更新,导致实时线程所有的参数更新就被丢失。

void processAudio(float* buffer)
{
    auto* coeffs = biquadCoeffs.exchange(nullptr);
    
    processBiquad(*coeffs, buffer);
    coeffs->b0 *= 2; // update data in real-time thread
    
    biquadCoeffs = coeffs;
}

void changeBiquadParameters(BiquadCoeffecients newCoeffs)
{
    auto newBiquad = std::make_unique<BiquadCoeffecients>(newCoeffs);
    
    for(auto* expected = storage.get(); !biquadCoeffs.compare_exchange_strong(expected, newBiquad.get()); 
        expected = storage.get())
        ;
        
    storage = std::move(newBiquad);
}

非实时线程修改数据的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 非实时线程可以修改数据
  • 实时线程总能获取到资源

代价

  • 实时线程不能修改数据
  • 非实时线程需等待实时线程释放资源
  • 在非实时线程上有复制的开销

示例

  • 非实时线程共享大型数据供实时线程使用
  • 音频采样数组、波形图数据、滤波器参数等

Question 6:非实时线程会修改线程间共享的数据吗?

如果实时线程需要更新数据,并且非实时线程不会修改数据,那么使用双缓冲策略。其中一个 buffer 用于实时线程,另一个用于非实时线程。它们满足:

  1. 两个 buffer 的数据都被预先正确的初始化
  2. 实时线程能够随时对数据进行更新
  3. 当非实时线程想要读取数据时,交换两个 buffer 的槽(slot)
  4. 实时线程能够继续更新数据,而非实时线程也能读取数据。

看下面这个例子:

using FrequencySpectrum = std::array<float, 512>;

std::array<FrequencySpectrum,2> mostRecentSpectrum;
// idx denotes current slot of realtime thread, idx xor 1 denotes slot of non-realtime-thread
std::atomic<int> idx = {0};

void processAudio(const float* buffer, size_t n)
{
    auto freqSpec = calcualteSpectrum(buffer, n);
    
    mostRecentSpectrum[idx.load()] = freqSpec;
}

void updateSpectrumUIbuttonClicked()
{
    auto i = idx.fetch_xor(1);
    displaySpectrum(mostRecentSpectrum[i]);
}

我们用 idx 来表示对两个 buffer 的引用。实时线程更新数据时,只需要简单的引用 idx.load() 即可;而非实时线程读取数据时,需要做 fetch_xor 操作,进行交换 slots。

很简单的策略,但非常遗憾,它是有缺陷的。。这里举例说明这种情况:

  1. 开始时,两个 buffer 数据为:A - B
  2. 接着,实时线程更新数据:C - B
  3. 非实时线程获取数据第一次,进行 slots 交换:B - C
  4. 非实时线程获取数据第二次,进行 slots 交换:C - B

可以看到,如果 updateSpectrumUIbuttonClicked 连续被触发两次,它将使用错误的旧数据。因此需要引入额外的 flag 来标识实时是否更新了数据,例如:

using FrequencySpectrum = std::array<float, 512>;

std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};
std::atomic<bool> hasNewData = false;

void processAudio(const float* buffer, size_t n)
{
    auto freqSpec = calcualteSpectrum(buffer, n);
    
    mostRecentSpectrum[idx.load()] = freqSpec;
    hasNewData = true;
}

void updateSpectrumUIbuttonClicked()
{
    if(hasNewData){
        auto i = idx.fetch_xor(1); // swap slot index
        displaySpectrum(mostRecentSpectrum[i]);
        
        // hasNewData could be changed here
        hasNewData = false;
    }else{
        auto i = idx.load() ^ 1;
        displaySpectrum(mostRecentSpectrum[i]);
    }

}

但引入两个 atomic 来解决一个问题,通常会导致 ABA 问题。在上面代码中,在 hasNewData = false 之前,实时线程可能对 hasNewData = true 的操作,而这个操作直接就被覆盖了,这就导致数据的丢失。

为了不引入 ABA 问题,我们采用一种曲线救国的办法:使用位运算。具体代码如下:

using FrequencySpectrum = std::array<float, 512>;

enum {BIT_IDX = (1 << 0), BIT_NEWDATA = (1 << 1)};

std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};


bool hasNewData(int i)
{
    return (i & BIT_NEWDATA) != 0;
}

int swapIndicesAndResetNewDataBit(int i)
{
    i = (i & BIT_IDX) ^ 1;
    return i;
}

void processAudio(const float* buffer, size_t n)
{
    auto freqSpec = calcualteSpectrum(buffer, n);
    
    auto i = idx.load() & BIT_IDX;
    mostRecentSpectrum[i] = freqSpec;
    idx.store((i & BIT_INX) | BIT_NEWDATA);
}

void updateSpectrumUIbuttonClicked()
{
    auto current = idx.load();
    
    if( hasNewData(current) )
    {
        current = swapIndicesAndResetNewDataBit(current);
        idx.store(current);
    }
    
    displaySpectrum(mostRecentSpectrum[(current & BIT_IDX) ^ 1]);
}

上述代码乍一看挺复杂,耐下心来看其实不复杂。它利用一个 std::atomic<int> 的位信息来表示多个状态,避免了 ABA 问题。从二进制的角度来看,状态有四种:

位信息状态
0x00无新数据;实时线程的 slot_index = 0
0x01无新数据;实时线程的 slot_index = 1
0x10有新数据;实时线程的 slot_index = 0
0x11有新数据;实时线程的 slot_index = 1

可以看到,高位用于表示有无新数据更新,低位用于表示实时线程使用的 slot index。当实时线程更新数据时,通过 | BIT_NEWDATA 来设置高位 bit 的值;当非实时线程读取数据时,它将交换 slot index 同时重置高位 bit。

这种方法避免了 ABA 问题,但很遗憾它是错误的,虽然已经接近正确答案了。你能看出哪里有问题吗?答案是 mostRecentSpectrum 发生了数据竞争,我们以下面线程执行顺序为例

Realtime ThreadNon-realtime Thread
void processAudio(...);
idx = 0x10
void processAudio(...)
{
i = idx & BIT_IDX = 0;
}
current = idx = 0x01
current = swapIndicesAndResetNewDataBit() = 0x01
(current & BIT_IDX) ^ 1 = 0

如果线程按照上述顺序执行,那么此时此刻,实时线程将往 0 号 buffer 写数据,同时非实时线程将从 0 号 buffer 读数据,这就引入了对于 0 号 buffer 的数据竞争问题。

OK 为了避免这种问题,需要再额外引入一个 bit 位,BUSY_BIT。当 BUSY_BIT 被设置位 1 时,表示实时线程正在在使用某个 slot index,非实时线程则需要等待 BUSY_BIT 为 0 才去重置 idx。具体看下面的代码:

using FrequencySpectrum = std::array<float, 512>;

enum {BIT_IDX = (1 << 0), BIT_NEWDATA = (1 << 1), BUSY_BIT = (1 << 2)};

std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};

bool hasNewData(int i)
{
   return (i & BIT_NEWDATA) != 0;
}

void processAudio(const float* buffer, size_t n)
{
   auto freqSpec = calcualteSpectrum(buffer, n);
   
   auto i = idx.fetch_or(BIT_BUSY) & BIT_IDX;
   mostRecentSpectrum[i] = freqSpec;
   idx.store((i & BIT_INX) | BIT_NEWDATA);
}

void updateSpectrumUIbuttonClicked()
{
   auto current = idx.load();
   
   if( hasNewData(current) )
   {
       int newValue;
       
       do{
           current &= ~BIT_BUSY;
           newValue = (current ^ BIT_IDX) & BIT_IDX;
       }while(!idx.compare_exchange_weak(current, newVaule))
       
       current = newValue;
   }
   
   displaySpectrum(mostRecentSpectrum[(current & BIT_IDX) ^ 1]);
}

好吧,这实在是有些复杂,好在 Fabian Renn-Giles & Dave Rowland 开源了 farbot 简化了这些复杂的操作。使用 farbot 后代码如下:

using FrequencySpectrum = std::array<float, 512>;
RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable> mostRecentSpectrum;

/* called on realtime thread */
void processAudio (const float* buffer, size_t n) {
    RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> freqSpec(mostRecentSpectrum);
    *freqSpec = calculateSpectrum (buffer, n);
}

/* called on non-realtime thread */
void updateSpectrumUIButtonClicked() {
    RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::nonRealtime> recentSpectrum(mostRecentSpectrum);
    displaySpectrum(*recentSpectrum);
}

实时线程修改数据的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 实时线程可以修改数据
  • 实时线程总能获取到资源

代价

  • 非实时线程不能修改数据
  • 非实时线程需等待实时线程释放资源
  • 在 实时/非实时线程 上有复制的开销

示例

  • 实时线程共享大型数据供非实时线程使用
  • GUI 数据可视化,频谱图,示波器等

Question 7: 实时线程和非实时线程都会修改数据,这该怎么办?

最后一种情况,也是最复杂的情况,当实时/非实时线程都会修改数据时,

接下来要介绍 callAsync 机制。如果你想在实时线程做一些非实时线程安全的事情,例如打印 log、读取文件等等,我们可以采取这样一种策略:将这些任务延缓执行,将其放置到某个 FIFO 中,之后再由非实时线程去执行它们。

farbot 中提供了 AsyncCaller 来实现 callAsync。具体代码如下:

class AsyncCaller{
public:
    void callAsycn(std::function<void()>&& lambda)
    {
        auto success = queue.push(std::move(lambda));
        assert(success);
    }
    
    void process()
    {
        std::function<void()> lambda;
        while(queue.pop(lambda))
            lambda();
    }
private:
    fifo<std::function<void()>> queue;
}

AsyncCaller messageThreadExecutor;

void timerCallback(){
    messageThreadExecutor.process();
}

messageThreadExecutor.callAsync([](){ cout << "hello world" << endl; });

上述代码中:

  1. 实时线程中调用 callAsycn 将 lambda 函数 push 到 wait-free FIFO 中。这里需要保证 lambda 是 real-time movable。关于 real-time movable,farbot 提供了方便的工具来帮助我们判断。
  2. 在 JUCE 中也有类似 AsyncCaller 的工具,叫 AsyncUpdater。在其实现中,AsyncUpdater 将任务 push 到队列后,然后去唤醒另一个线程来执行任务。而唤醒线程是系统调用,它不是实现线程安全的操作。
  3. 非实时线程中,调用 process 从 FIFO 中 pop 出 lambda 函数,并执行它。具体实现中,可以通过设置定时器(timer)来间歇性执行 process 函数。

介绍完 AsyncCaller 后,让我们进入最后也是最困难的一中情况:实时线程和非实时线程都会修改数据。作者之言,他认为这种情况下无法做到 wait-free,但可以曲线救国,选定一个线程负责执行修改数据的操作,其他线程只要把修改数据命令发送到该线程即可。

举个例子,例如在音频线程中去 mix 音频,用户可以在 UI 线程增加或者删除音源,音频线程也可以增加或者删除音源。我们选择音频负责执行数据修改,因为
当实时线程中发生修改事件时,能够立马得到处理。

伪代码如下:

struct SourceList{
  std::array<const float*, MAX_SOURCES> buffers = {};
  int numSources = 0;
};

class Mixer
{
public:
    void mixAllSources(float* output, char* realtimeEventMessage, int n)
    {
        processRealtimeEvents(realtimeEventMessage); // may add and remove sources
        realtimeThreadCaller.process(); // process all the lambdas
        
        RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> sourceList(sharedSourceList);

        for(int i = 0; i < sourceList->numSources; ++i)
            mixSource(output, sourceList->buffers[i]);
    }
    
    void addSource(const float* src)
    {
        if(!isRealtimeThread())
        {
            realtimeThreadCaller.callAsync([src]( addSource(src); ));
            return;
        }
        
        RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> sourceList(sharedSourceList);
        assert(sourceList->numSources < MAX_SOURCES);
        sourceList->buffers[sourceList->numSources++] = src;
    }
    
private:
    RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable> sharedSourceList;
    AsyncCaller realtimeThreadCaller;
    
}


Mixer mixer;

// audio thread
while(audioThreadIsRuning()){

    if(comesNewSource())
    {
        mixer.mixAllSources(output, "Add Source", num_samples);
    }else{
        mixer.mixAllSources(output, nullptr, num_samples);
    }
}

// UI thread
mixer.addSource(...)

上述代码中,

  1. mixAllSources 在进行 mix 音频数据之前,调用 realtimeThreadCaller.process() 去执行修改数据的任务。
  2. addSource 如果是非实时线程调用,那么通过callAsync 函数将任务 push 到 FIFO 中;如果是实时线程调用,那么立刻马上修改sourceList 数据。

实时/非实时线程修改数据的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 实时线程与非实时线程共享数据
  • 实时/非实时线程都能修改数据

代价

  • 其中一个线程需要持有数据
  • 同时具有 FIFO 和实时/非实时线程修改数据的代价
  • 实现复杂

示例

  • 管理动态音频流,其中数据包丢失是不可接受的

总结

最后用一张图总结

以上是关于实时音频编程:实践与技巧的主要内容,如果未能解决你的问题,请参考以下文章

是否有一种方法可以将实时记录的音频片段连续发送到Flutter.io中的后端服务器?

具有不同长度的音频文件的 HTTP 实时流式传输

你可能不知道的JavaScript代码片段和技巧(下)

你可能不知道的JavaScript代码片段和技巧(上)

响应式编程|Kotlin与LiveData扩展函数实践技巧

《Python机器学习及实践》----模型实用技巧