实时音频编程:实践与技巧
Posted 芥末的无奈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时音频编程:实践与技巧相关的知识,希望对你有一定的参考价值。
文章目录
简介
在 实时音频编程(一)中,我们总结了实时音频编程几条经验法则,先来回顾下它们,在实时线程中,你不能做:
- 不要申请或者释放内存
- 不要使用锁
- 不要进行文件读写,或者其他方式的 I/O(这包括任何 print 或者 NSLog,或者 GUI API)
- 不要调用那些可能造成阻塞的系统 api
- 不要运行那些执行时间不确定,或者最坏时间复杂度有激增的代码
- 不要调用任何有上述行为的代码
- 不要调用任何你不信任的代码
在可能的情况下,有几件事你应该做:
- 用最坏时间复杂度来衡量算法,选用最坏时间复杂度友好的算法
- 在许多音频采样中摊销计算,以平滑CPU的使用,而不是使用偶尔有长处理时间的 "突发 "算法。
- 在一个非实时线程中预先分配或预先计算数据
- 采用非共享的、仅在音频回调中使用的数据结构,这样你就不需要考虑共享、并发和锁的问题。
接来下,将对 2019 JUCE 开发者大会上分享的 real-time 101 演讲进行总结。Fabian Renn-Giles & Dave Rowland 分享了许多实时编程的实用技巧,我们将通过一系列的问题,来确定不同场景下的解决方案。总结如下图:
Q&A
接下来,你将通过回答一些问题来确定不同场景的解决方案,每种场景都会引入一种工具。
Question 1: 你是选择传递还是共享对象?
让我们进入第一个问题:你是选择传递还是共享对象。如果你选择传递(即复制)对象到另一个线程,那么两个线程之间可以通过 FIFO 来交互。
FIFO(First Input First Output),即先进先出队列。它非常适合用于线程之间的数据传递。在实时编程中,我们常使用 ringbuffer 来实现 FIFO。ringbuffer 有固定的大小,也就意味着没有内存申请,此外它还有很多不同的类型。
就拿最简单的情况来说,即单个生产者单个消费者,它的 wait-free 版本如下:
template <typename T> class fifo{
public:
bool push(T && arg){
auto pos = writepos.load();
auto next = (pos + 1) % slots.size();
if(next == readpos.load())
return false;
slots[pos] = std::move(arg);
writepos.store(next);
return true;
}
bool pop(T& result){
auto pos = readpos.load();
if(pos == writepos.load())
return false;
result = std::move(slots[pos]);
readpos.store((pos + 1) % slots.size());
return true;
}
private:
std::vector<T> slots = {};
std::atomic<int> readpos = {0};
std::atomic<int> writepos = {0};
};
FIFO 有非常非常多不同的版本,应用的场景也不同,那么要如何决定选用哪种类型的 FIFO 呢?你只需要问自己两个问题:
- 是否会有多个线程同时向 FIFO 中读/写数据?
- 如果写的时候 FIFO 是满的,读的时候是空的,会发生什么?
站在生产者的角度来看,它可以是单生产者写入 FIFO,也可以是多生产者多个线程同时写入;当 FIFO 满了后,继续写入可以是强制覆盖,也可以是告知生产者写入失败。
站在消费者的角度来看,它可以是单消费者读取 FIFO 数据,也可以是多个消费者多个线程同时读取;当 FIFO 为空后,继续读取数据可以是返回 null,也可是告知消费者读取失败。
举个几个例子来说明如何进行选择。
例子1,假设你正在写一个实时显示音频波形的是程序,音频线程从麦克风获取音频,并将音频数据发送给 UI 线程。这种情况下,因为只有音频线程在生产数据,只有 UI 线程在消费数据,因此肯定采用的是单生产者-单消费者;此外,实时线程在不断的生产数据,UI 线程通常只要考虑显示最新的音频数据,因此当 FIFO 满后,直接强制覆盖没有任何影响;但 FIFO 如果为空,有可能是音频线程出现了问题,因此最好还是告知 UI 线程读取数据失败。基于以上分析,应该选择单生产者、单消费者、队列满时写入覆盖、队列空时读取失败的 FIFO。
例子2,假设你正在给一个高精度感应器完成异常值检测的功能,该仪器有多个感应器,它们同时向 FIFO 写入数据值,在消费线程中从 FIFO 中读取数据并检测是否有异常值。因此我们可以采用 多生产者-单消费者模式。由于异常值非常重要,当 FIFO 满后,你不能简单地强制覆盖,这会导致你错过异常值,因此合适的做法应该是告知写入失败。另一方面,当 FIFO 为空时,可以认为此时没有异常值,因此返回 null/0 是合适的。基于以上分析,应该选择多生产者、单消费者、队列满时写入失败、队列空时读返回 null/0 的 FIFO。
根据上面两个问题的答案,排列组合可以得到共有 16 中 FIFO,它们对读写线程 wait-free 的支持程度是不同的,具体如下表:
比如例子1,单生产者、单消费者、队列满时写入覆盖、队列空时读取失败的 FIFO,读写两端都支持 wait-free;而例子2,多生产者、单消费者、队列满时写入失败、队列空时读返回 null/0 的 FIFO,只在读线程是 wait-free。farbot 提供了上述 16 种 FIFO。
FIFO的总结:
使用场景:
- 数据比较大,
std::atomic<float>::is_always_lock_free == false
- 在非实时与实时线程之间传递数据
代价:
- 固定的 FIFO 大小
- 当 FIFO 满后的行为(阻塞/丢弃/覆盖)
- FIFO 读写的开销
示例
- 打印日志、写入数据到磁盘(录音)、文件读写等
Question 2: 是否与实时线程交互?
当你选择线程之间共享数据,那么再问自己一个问题:我是否正在与实现线程交互?如果答案是否定的,那么使用锁就能解决你的问题。
举个例子,下面的代码中一个线程接受事件,同时记录事件信息,另一个线程将事件信息写入文件。事件信息使用 std::queue
来存放,两个线程共享这个对列,同时使用 std::mutex
来同步数据。
#include <queue>
#include <mutex>
#include <algorithm>
std::queue<string> string_queue; // 1
std::mutex some_mutex; // 2
auto log_worker = [&] () {
while(is_running()){
std::lock_guard<std::mutex> guard(some_mutex);
for(;!string_queue.empty();){
print_log_to_file(string_queue.front());
string_queue.pop();
}
}
};
auto event_worker = [&] () {
while (is_running())
{
wait_for_event();
// log event information
std::lock_guard<std::mutex> guard(some_mutex);
string_queue.push(get_event_info());
}
};
auto log_future = std::async (std::launch::async, audio_worker);
auto event_future = std::async (std::launch::async, gui_worker);
log_future.wait();
event_future.wait();
Question 3: 共享数据是否足够小?
如果你与实时线程共享的数据足够小,那么使用 std::atomic
。举个具体的例子,如下:
auto gain = 1.0f;
void processSensorData(float* sensorInOut, int n)
{
// do some dsp
...
for(int i = 0; i < n; ++i)
sensorInOut *= gain;
}
// called on another thread
void setSensorGain(float newGain)
{
gain = newGain;
}
上面的代码中 setSensorGain
对 gain 进行了修改,与 processSensorData
形成了数据争用(data race)。数据争用在 C/C++ 中是未定义行为,这导致你的代码行为令人难以捉摸,出现 bug 难以定位和排查。我们讨论其中一种可能,编译器可能假设在 processSensorData
中,gain 不会发生变化,因此代码可能被优化为:
void processSensorData(float* sensorInOut, int n)
{
// do some dsp
...
regrester auto gain_copy = gain;
for(int i = 0; i < n; ++i)
sensorInOut *= gain_copy;
}
这种情况不算太糟糕,只是新参数真正起作用的时间被延后了。但考虑另一种情况,代码如下:
auto gain = 1.0f;
void realtimeThreadEntry()
{
while(rocketFlying())
{
processSensorData(sensorData, 512);
}
}
void processSensorData(float* sensorInOut, int n)
{
// do some dsp
...
for(int i = 0; i < n; ++i)
sensorInOut *= gain;
}
// called on another thread
void setSensorGain(float newGain)
{
gain = newGain;
}
上述代码中,realtimeThreadEntry
为实时线程入口,内部不停地在循环处理数据,由于 processSensorData
代码量相当小,编译器可能进行 inline 优化:
void realtimeThreadEntry()
{
while(rocketFlying())
{
// do some dsp
...
for(int i = 0; i < n; ++i)
sensorInOut *= gain;
}
}
同样的,编译器可能假设 gain 不会发生变化,采用了之前的优化,于是代码变成了这样:
void realtimeThreadEntry()
{
regrester auto gain_copy = gain;
while(rocketFlying())
{
// do some dsp
...
for(int i = 0; i < n; ++i)
sensorInOut *= gain_copy;
}
}
// called on another thread
void setSensorGain(float newGain)
{
gain = newGain;
}
这样的优化导致 setSensorGain
无论如何都无法对 realtimeThreadEntry
起作用了。因为我们的代码存在数据争用,引入了未定义的行为,这就意味着任何事情都可能发生。我们要极力避免这种情况。
再来看一个例子,加深对于未定义行为的畏惧之心,假设你想要推翻费马大定理的证明,那么你可能写下面这样的代码,来遍历所有数字企图找到一个反例:
bool threadRunning;
// thread 1
bool proveFermatsLastTheorem(){
threadRunning = true;
for(int n = 3; threadRunning; ++n){
if(pow(x, n) + pow(y, n) == pow(z, n)){
return false;
}
}
return true;
}
void testTheorem(){
bool result;
startThread([](){ result = proveFermatsLastTheorem() });
Sleep(2000);
threadRunning = false;
cout << result << endl;
}
上述代码中,优化器可能认为 threadRunning
永远都是 true
,为了减少每次都从内存中读取的耗时,它可能进行如下优化:
bool proveFermatsLastTheorem(){
while(true){
if(pow(x, n) + pow(y, n) == pow(z, n)){
return false;
}
++n;
}
return true;
}
接着,编译器进一步优化,将真正的循环条件提前:
bool proveFermatsLastTheorem(){
while(pow(x, n) + pow(y, n) != pow(z, n)) ++n;
return false;
return true;
}
显然,最后一句 return 是多余的:
bool proveFermatsLastTheorem(){
while(pow(x, n) + pow(y, n) != pow(z, n)) ++n;
return false;
}
至此,整个函数无论如何只会返回 false,因此最终优化的结果可能是:
bool proveFermatsLastTheorem(){
return false;
}
回到我们的问题中来,如果你与实时线程共享的数据足够小,那么使用 std::atomic
。它能够解决上面两个例子的未定义行为,保证线程共享数据之间的同步性,同时阻止优化器过渡优化。
需要注意的是,在使用 std::atomic
时,我们需要检查是否是 look-free 实现:
static_assert(std::atomic<float>::is_always_lock_free)
如果 ::is_always_lock_free
返回 false,那么它本质上还是使用了锁来同步数据,这是在实时线程编程中要避免的。
总结下关于 std::atomic
使用
使用场景:
- 多个线程可能对数据进行修改
代价:
- 数据必须足够小,要满足
std::atomic<float>::is_always_lock_free == true
- 只允许某些操作,例如 store, load, fetch_add,
fetch_add 等等
示例
- 线程之间共享小数据
- 增量控制量、电平值、算法参数等等
Question 4: 获取共享资源是否允许失败?
当我们需要共享一个较大的数据,即 ::is_always_lock_free
返回 false 时,考虑是否可以允许获取资源失败。如果允许,那么使用 try_lock
。举个例子,在音频线程中使用查表法,在非实时线程中更新表。
class WavetabelSynthesizer
{
public:
void audioCallback()
{
if(std::unique_lock<mutex> tryLock(mutex, std::try_to_lock); tryLock.owns_lock())
{
// Do something with wavetable
}else
{
// Do something else as wavetabel is not available
}
}
void updateWavetable(/* args */)
{
auto newWavetable = std::make_unique<Wavetable>(/* args */);
{
std::lock_guard<std::mutex> lock(mutex);
std::swap(wavetable, newWavetable);
}
}
private:
std::mutex mutex;
std::unique_ptr<Wavetable> wavetable;
}
上述代码中,我们使用 try_lock
来尝试获取 wavetable
的所有权,并对获取成功和失败两种情况分别进行不同的操作。
但有一个细节需要特别注意,代码中使用了 std::mutex
,它配合 std::unique_lock
进行 try_lock
操作。查看 c++ 文档,std::mutex::try_lock
是 wait-free 操作,这非常好,但如果获取锁成功,那么在 std::unique_lock
析构的时候会调用 std::mutex::unlock
去通知其他线程,而这会调用系统 api,这是我们需要避免的,因此这种情况下 std::mutex
是不合适的。
为此,我们可以使用 spin_lock
,即自旋锁。下面给出一种最基本的 spin_lock 实现,它使用 std::atomic_flag
进行 lock, unlock 和 try_lock 的实现,没有任何系统 api 调用,不会造成 block。
class spin_lock
{
public:
void lock() noexcept { while(flag.test_and_set()); }
void unlock() noexcept { flag.clear(); }
bool try_lock() noexcept { return !flag.test_and_set(); }
private:
std::atomic_flag flag = ATOMIC_FLAG_INIT;
}
try_lock 总结:
使用场景:
- 数据比较大,
std::atomic<float>::is_always_lock_free == false
- 允许获取资源失败
代价:
- 非实时线程获取资源时需要等待实时线程释放资源
- 实时线程需要处理好资源获取失败的情况
示例
- 将大型数据传递给实时线程供其使用
- 音频采样数组、波形图数据、滤波器参数等
Question 5:实时线程会修改线程间共享的数据吗?
如果资源获取失败是不被允许的,即无法使用 try_lock
,那么考虑一个问题:实时线程是否会对共享数据进行修改?如果答案是否定的,那么可以使用 CAS 循环(Compare and Set Loop)。一个经典的例子就是双二阶滤波器算法的实现:
struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
BiquadCoeffecients coeffs;
BiquadCoeffecients calculateLowPassCoeffecients(float freq);
void audioThread(const float* src, float* dst, size_t n)
{
static float lv1, lv2;
for(size_t i = 0; i < n; ++i)
{
auto input = src[i];
auto output = (input * coeffs.b0) + lv1;
lv1 = (input * coeffs.b1) - (output * coeffs.a1) + lv2;
lv2 = (input * coeffs.b2) - (output * coeffs.a2);
}
}
void updateFrequecyParameter(float newValue)
{
coeffs = calculateLowPassCoeffecients(newValue);
}
其中 audioThread
运行在实时线程中进行音频计算;updateFrequecyParameter
被其他线程调用,负责更新参数。
上述代码并没有对coeffs
进行多线程保护,这会导致未定义行为。在这个例子中,可能会偶尔出现 glitch 或者奇怪的杂音。
为了避免这种情况,一种最简单的方式是使用 std::atomic
:
struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::atomic<BiquadCoeffecients> coeffs;
void audioThread(const float* src, float* dst, size_t n)
{
static float lv1, lv2;
auto local_coeffs = coeffs.load();
for(size_t i = 0; i < n; ++i)
{
auto input = src[i];
auto output = (input * local_coeffs.b0) + lv1;
lv1 = (input * local_coeffs.b1) - (output * local_coeffs.a1) + lv2;
lv2 = (input * local_coeffs.b2) - (output * local_coeffs.a2);
}
}
但很遗憾,由于BiquadCoeffecients
包含 5 个 float,它太大了,以至于 std::atomic<BiquadCoeffecients>
并不是 lock free 的实现。因此这种情况下,我们不能使用 std::atomic<BiquadCoeffecients>
。
在引出 CAS 方法前,我们先考虑一种使用 std::atomic<BiquadCoeffecients*>
的方法,对上述代码做简单的修改:
struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::atomic<BiquadCoeffecients*> coeffs;
void audioThread(const float* src, float* dst, size_t n)
{
auto* coeffsCopy = coeffs.copy();
processBiquad(src, dst, n, coeffsCopy);
}
void updateFrequecyParameter(float newValue)
{
coeffs = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
}
使用 std::atomic
来管理指针,保证其更新是多线程安全的。这种方式可以工作,但它会造成内存泄露,因为你无法知道 coeffs
是否在被使用。也许我们可以通过设置 flag 来标识 coeffs
是否在实时线程中被使用:
struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
BiquadCoeffecients* coeffs;
std::atomic<bool> isInAudioThread{ false };
void audioThread(const float* src, float* dst, size_t n)
{
isInAudioThread = true;
auto* coeffsCopy = coeffs.copy();
processBiquad(src, dst, n, coeffsCopy);
isInAudioThread = false;
}
void updateFrequecyParameter(float newValue)
{
auto* ptr = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
while(isInAudioThread.load())
;
std::swap(ptr, coeffs);
delete ptr;
}
上面代码中,我们加入 isInAudioThread
flag 变量,在进行音频线程处理时将其设置为 true,退出时为 flase;在非实时线程中,如果 isInAudioThread
为 true,则循环等待实时线程退出,然后通过 swap
对数据指针进行更新,最后释放旧数据。
但这仍然存在问题,在检查资源是否被使用(即 while 循环)和更新资源之间,实时线程重新占用了资源,即isInAudioThread
发生变化。因此检查与更新不具有原子性,导致了所谓的 ABA 问题:
void updateFrequecyParameter(float newValue)
{
auto* ptr = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
while(isInAudioThread.load())
;
// isInAudioThread could be changed here
std::swap(ptr, coeffs);
delete ptr;
}
最终的解决方案是结合上面两种方法,具体看下面代码:
struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::unique_ptr<BiquadCoeffecients> storage { std::make_unique<BiquadCoeffecients>() };
std::atomic<BiquadCoeffecients*> biquadCoeffs;
void processAudio(float* buffer)
{
auto* coeffs = biquadCoeffs.exchange(nullptr);
processBiquad(*coeffs, buffer);
biquadCoeffs = coeffs;
}
void changeBiquadParameters(BiquadCoeffecients newCoeffs)
{
auto newBiquad = std::make_unique<BiquadCoeffecients>(newCoeffs);
for(auto* expected = storage.get(); !biquadCoeffs.compare_exchange_strong(expected, newBiquad.get());
expected = storage.get())
;
storage = std::move(newBiquad);
}
我们使用 std::unique_ptr<BiquadCoeffecients>
存放参数数据资源,用 std::atomic<BiquadCoeffecients*>
指向当前可用资源。
在实时线程中,首先使用 exchange
返回当前可用资源指针的同时,将其置空,而置空则表示实时线程正在使用该资源。在实时线程完成处理后,biquadCoeffs = coeffs;
重新赋值资源指针。
在非实时线程中对参数进行更新,for
循环中关键代码 biquadCoeffs.compare_exchange_strong(expected, newBiquad.get())
表示如果实时线程不再使用资源,那么就将资源指针更新;否则就自旋等待实时线程。最后通过 move
来更新资源。
这种方法无法处理实时线程更新数据的情况,下面代码为例,如果 b0 参数在实时线程进行了更新,那么非实时线程将无法得知这一更新,导致实时线程所有的参数更新就被丢失。
void processAudio(float* buffer)
{
auto* coeffs = biquadCoeffs.exchange(nullptr);
processBiquad(*coeffs, buffer);
coeffs->b0 *= 2; // update data in real-time thread
biquadCoeffs = coeffs;
}
void changeBiquadParameters(BiquadCoeffecients newCoeffs)
{
auto newBiquad = std::make_unique<BiquadCoeffecients>(newCoeffs);
for(auto* expected = storage.get(); !biquadCoeffs.compare_exchange_strong(expected, newBiquad.get());
expected = storage.get())
;
storage = std::move(newBiquad);
}
非实时线程修改数据的总结:
使用场景:
- 数据比较大,
std::atomic<float>::is_always_lock_free == false
- 非实时线程可以修改数据
- 实时线程总能获取到资源
代价:
- 实时线程不能修改数据
- 非实时线程需等待实时线程释放资源
- 在非实时线程上有复制的开销
示例
- 非实时线程共享大型数据供实时线程使用
- 音频采样数组、波形图数据、滤波器参数等
Question 6:非实时线程会修改线程间共享的数据吗?
如果实时线程需要更新数据,并且非实时线程不会修改数据,那么使用双缓冲策略。其中一个 buffer 用于实时线程,另一个用于非实时线程。它们满足:
- 两个 buffer 的数据都被预先正确的初始化
- 实时线程能够随时对数据进行更新
- 当非实时线程想要读取数据时,交换两个 buffer 的槽(slot)
- 实时线程能够继续更新数据,而非实时线程也能读取数据。
看下面这个例子:
using FrequencySpectrum = std::array<float, 512>;
std::array<FrequencySpectrum,2> mostRecentSpectrum;
// idx denotes current slot of realtime thread, idx xor 1 denotes slot of non-realtime-thread
std::atomic<int> idx = {0};
void processAudio(const float* buffer, size_t n)
{
auto freqSpec = calcualteSpectrum(buffer, n);
mostRecentSpectrum[idx.load()] = freqSpec;
}
void updateSpectrumUIbuttonClicked()
{
auto i = idx.fetch_xor(1);
displaySpectrum(mostRecentSpectrum[i]);
}
我们用 idx
来表示对两个 buffer 的引用。实时线程更新数据时,只需要简单的引用 idx.load()
即可;而非实时线程读取数据时,需要做 fetch_xor
操作,进行交换 slots。
很简单的策略,但非常遗憾,它是有缺陷的。。这里举例说明这种情况:
- 开始时,两个 buffer 数据为:A - B
- 接着,实时线程更新数据:C - B
- 非实时线程获取数据第一次,进行 slots 交换:B - C
- 非实时线程获取数据第二次,进行 slots 交换:C - B
可以看到,如果 updateSpectrumUIbuttonClicked
连续被触发两次,它将使用错误的旧数据。因此需要引入额外的 flag 来标识实时是否更新了数据,例如:
using FrequencySpectrum = std::array<float, 512>;
std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};
std::atomic<bool> hasNewData = false;
void processAudio(const float* buffer, size_t n)
{
auto freqSpec = calcualteSpectrum(buffer, n);
mostRecentSpectrum[idx.load()] = freqSpec;
hasNewData = true;
}
void updateSpectrumUIbuttonClicked()
{
if(hasNewData){
auto i = idx.fetch_xor(1); // swap slot index
displaySpectrum(mostRecentSpectrum[i]);
// hasNewData could be changed here
hasNewData = false;
}else{
auto i = idx.load() ^ 1;
displaySpectrum(mostRecentSpectrum[i]);
}
}
但引入两个 atomic 来解决一个问题,通常会导致 ABA 问题。在上面代码中,在 hasNewData = false
之前,实时线程可能对 hasNewData = true
的操作,而这个操作直接就被覆盖了,这就导致数据的丢失。
为了不引入 ABA 问题,我们采用一种曲线救国的办法:使用位运算。具体代码如下:
using FrequencySpectrum = std::array<float, 512>;
enum {BIT_IDX = (1 << 0), BIT_NEWDATA = (1 << 1)};
std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};
bool hasNewData(int i)
{
return (i & BIT_NEWDATA) != 0;
}
int swapIndicesAndResetNewDataBit(int i)
{
i = (i & BIT_IDX) ^ 1;
return i;
}
void processAudio(const float* buffer, size_t n)
{
auto freqSpec = calcualteSpectrum(buffer, n);
auto i = idx.load() & BIT_IDX;
mostRecentSpectrum[i] = freqSpec;
idx.store((i & BIT_INX) | BIT_NEWDATA);
}
void updateSpectrumUIbuttonClicked()
{
auto current = idx.load();
if( hasNewData(current) )
{
current = swapIndicesAndResetNewDataBit(current);
idx.store(current);
}
displaySpectrum(mostRecentSpectrum[(current & BIT_IDX) ^ 1]);
}
上述代码乍一看挺复杂,耐下心来看其实不复杂。它利用一个 std::atomic<int>
的位信息来表示多个状态,避免了 ABA 问题。从二进制的角度来看,状态有四种:
位信息 | 状态 |
---|---|
0x00 | 无新数据;实时线程的 slot_index = 0 |
0x01 | 无新数据;实时线程的 slot_index = 1 |
0x10 | 有新数据;实时线程的 slot_index = 0 |
0x11 | 有新数据;实时线程的 slot_index = 1 |
可以看到,高位用于表示有无新数据更新,低位用于表示实时线程使用的 slot index。当实时线程更新数据时,通过 | BIT_NEWDATA
来设置高位 bit 的值;当非实时线程读取数据时,它将交换 slot index 同时重置高位 bit。
这种方法避免了 ABA 问题,但很遗憾它是错误的,虽然已经接近正确答案了。你能看出哪里有问题吗?答案是 mostRecentSpectrum 发生了数据竞争,我们以下面线程执行顺序为例
Realtime Thread | Non-realtime Thread |
---|---|
void processAudio(...); idx = 0x10 | |
void processAudio(...) { i = idx & BIT_IDX = 0; } | |
current = idx = 0x01 current = swapIndicesAndResetNewDataBit() = 0x01 (current & BIT_IDX) ^ 1 = 0 |
如果线程按照上述顺序执行,那么此时此刻,实时线程将往 0 号 buffer 写数据,同时非实时线程将从 0 号 buffer 读数据,这就引入了对于 0 号 buffer 的数据竞争问题。
OK 为了避免这种问题,需要再额外引入一个 bit 位,BUSY_BIT。当 BUSY_BIT 被设置位 1 时,表示实时线程正在在使用某个 slot index,非实时线程则需要等待 BUSY_BIT 为 0 才去重置 idx。具体看下面的代码:
using FrequencySpectrum = std::array<float, 512>;
enum {BIT_IDX = (1 << 0), BIT_NEWDATA = (1 << 1), BUSY_BIT = (1 << 2)};
std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};
bool hasNewData(int i)
{
return (i & BIT_NEWDATA) != 0;
}
void processAudio(const float* buffer, size_t n)
{
auto freqSpec = calcualteSpectrum(buffer, n);
auto i = idx.fetch_or(BIT_BUSY) & BIT_IDX;
mostRecentSpectrum[i] = freqSpec;
idx.store((i & BIT_INX) | BIT_NEWDATA);
}
void updateSpectrumUIbuttonClicked()
{
auto current = idx.load();
if( hasNewData(current) )
{
int newValue;
do{
current &= ~BIT_BUSY;
newValue = (current ^ BIT_IDX) & BIT_IDX;
}while(!idx.compare_exchange_weak(current, newVaule))
current = newValue;
}
displaySpectrum(mostRecentSpectrum[(current & BIT_IDX) ^ 1]);
}
好吧,这实在是有些复杂,好在 Fabian Renn-Giles & Dave Rowland 开源了 farbot 简化了这些复杂的操作。使用 farbot 后代码如下:
using FrequencySpectrum = std::array<float, 512>;
RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable> mostRecentSpectrum;
/* called on realtime thread */
void processAudio (const float* buffer, size_t n) {
RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> freqSpec(mostRecentSpectrum);
*freqSpec = calculateSpectrum (buffer, n);
}
/* called on non-realtime thread */
void updateSpectrumUIButtonClicked() {
RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::nonRealtime> recentSpectrum(mostRecentSpectrum);
displaySpectrum(*recentSpectrum);
}
实时线程修改数据的总结:
使用场景:
- 数据比较大,
std::atomic<float>::is_always_lock_free == false
- 实时线程可以修改数据
- 实时线程总能获取到资源
代价:
- 非实时线程不能修改数据
- 非实时线程需等待实时线程释放资源
- 在 实时/非实时线程 上有复制的开销
示例
- 实时线程共享大型数据供非实时线程使用
- GUI 数据可视化,频谱图,示波器等
Question 7: 实时线程和非实时线程都会修改数据,这该怎么办?
最后一种情况,也是最复杂的情况,当实时/非实时线程都会修改数据时,
接下来要介绍 callAsync 机制。如果你想在实时线程做一些非实时线程安全的事情,例如打印 log、读取文件等等,我们可以采取这样一种策略:将这些任务延缓执行,将其放置到某个 FIFO 中,之后再由非实时线程去执行它们。
在 farbot 中提供了 AsyncCaller
来实现 callAsync。具体代码如下:
class AsyncCaller{
public:
void callAsycn(std::function<void()>&& lambda)
{
auto success = queue.push(std::move(lambda));
assert(success);
}
void process()
{
std::function<void()> lambda;
while(queue.pop(lambda))
lambda();
}
private:
fifo<std::function<void()>> queue;
}
AsyncCaller messageThreadExecutor;
void timerCallback(){
messageThreadExecutor.process();
}
messageThreadExecutor.callAsync([](){ cout << "hello world" << endl; });
上述代码中:
- 实时线程中调用
callAsycn
将 lambda 函数 push 到 wait-free FIFO 中。这里需要保证 lambda 是 real-time movable。关于 real-time movable,farbot 提供了方便的工具来帮助我们判断。 - 在 JUCE 中也有类似
AsyncCaller
的工具,叫AsyncUpdater
。在其实现中,AsyncUpdater
将任务 push 到队列后,然后去唤醒另一个线程来执行任务。而唤醒线程是系统调用,它不是实现线程安全的操作。 - 非实时线程中,调用
process
从 FIFO 中 pop 出 lambda 函数,并执行它。具体实现中,可以通过设置定时器(timer)来间歇性执行process
函数。
介绍完 AsyncCaller
后,让我们进入最后也是最困难的一中情况:实时线程和非实时线程都会修改数据。作者之言,他认为这种情况下无法做到 wait-free,但可以曲线救国,选定一个线程负责执行修改数据的操作,其他线程只要把修改数据命令发送到该线程即可。
举个例子,例如在音频线程中去 mix 音频,用户可以在 UI 线程增加或者删除音源,音频线程也可以增加或者删除音源。我们选择音频负责执行数据修改,因为
当实时线程中发生修改事件时,能够立马得到处理。
伪代码如下:
struct SourceList{
std::array<const float*, MAX_SOURCES> buffers = {};
int numSources = 0;
};
class Mixer
{
public:
void mixAllSources(float* output, char* realtimeEventMessage, int n)
{
processRealtimeEvents(realtimeEventMessage); // may add and remove sources
realtimeThreadCaller.process(); // process all the lambdas
RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> sourceList(sharedSourceList);
for(int i = 0; i < sourceList->numSources; ++i)
mixSource(output, sourceList->buffers[i]);
}
void addSource(const float* src)
{
if(!isRealtimeThread())
{
realtimeThreadCaller.callAsync([src]( addSource(src); ));
return;
}
RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> sourceList(sharedSourceList);
assert(sourceList->numSources < MAX_SOURCES);
sourceList->buffers[sourceList->numSources++] = src;
}
private:
RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable> sharedSourceList;
AsyncCaller realtimeThreadCaller;
}
Mixer mixer;
// audio thread
while(audioThreadIsRuning()){
if(comesNewSource())
{
mixer.mixAllSources(output, "Add Source", num_samples);
}else{
mixer.mixAllSources(output, nullptr, num_samples);
}
}
// UI thread
mixer.addSource(...)
上述代码中,
mixAllSources
在进行 mix 音频数据之前,调用realtimeThreadCaller.process()
去执行修改数据的任务。addSource
如果是非实时线程调用,那么通过callAsync
函数将任务 push 到 FIFO 中;如果是实时线程调用,那么立刻马上修改sourceList
数据。
实时/非实时线程修改数据的总结:
使用场景:
- 数据比较大,
std::atomic<float>::is_always_lock_free == false
- 实时线程与非实时线程共享数据
- 实时/非实时线程都能修改数据
代价:
- 其中一个线程需要持有数据
- 同时具有 FIFO 和实时/非实时线程修改数据的代价
- 实现复杂
示例
- 管理动态音频流,其中数据包丢失是不可接受的
总结
最后用一张图总结
以上是关于实时音频编程:实践与技巧的主要内容,如果未能解决你的问题,请参考以下文章