markdown TVM如何优化CPU GEMM(矩阵乘法)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了markdown TVM如何优化CPU GEMM(矩阵乘法)相关的知识,希望对你有一定的参考价值。
[TOC]
# TVM如何优化CPU GEMM(矩阵乘法)
TVM提供抽象接口,允许用户分别描述算法和算法的实施组织(所谓的调度Schedule)。通常,写高性能调度的算法时,会破坏算法的可读性和模块性。此外,尝试各种看似有用的调度是非常耗费人力的。在TVM帮助下,我们能高效地尝试这些调度,来提高计算性能。
在本教程中,我们将演示如何使用TVM优化方阵乘法,并通过简单地添加18行额外代码,实现比基线版本快200倍的优化版本。
**在CPU上执行密集计算有两个优化重点:**
1. 提高内存访问的cache命中率。复杂的数值计算和hot-spot存储器访问都可以从高cache命中率中加速。这要求我们将原始内存访问模式转换为符合cache策略的模式。
2. SIMD(单指令多数据),也成为向量化处理单元。每次都会处理一小批数据,而不是单个网格。这要求我们以*统一模式*在循环体中转变`数据访问模式`,以便LLVM后端可以将其降低到SIMD。
实际上,本教程中使用的所有方法都是此[repo](https://github.com/flame/how-to-optimize-gemm)中提到的技巧的子集。其中一些已经被TVM抽象并自动应用,但由于TVM限制,其中一些不能简单地应用。
下面提到的所有实验结果都是在配备Intel i7-4770HQ CPU的2015款15英寸MacBook上执行的。对于所有x86 CPU,`高速缓存行`大小应为**64字节**。
- `注:代码里面的测试时间是个人在E5-2620 v4测得`
## 准备和基线
在本教程中,我们将演示如何使用TVM来优化矩阵乘法。在实际演示之前,我们首先定义这些变量。然后我们编写一个基线实现,这是在TVM中编写矩阵乘法的最简单方法。
```python
import tvm
import numpy
import timeit
#矩阵大小(M,K)x(K,N),可以自由尝试不同的形状,有时TVM优化优于MKL的numpy。
M = 1024
K = 1024
N = 1024
#TVM中默认张量类型
dtype = "float32"
#使用Intel AVX2(高级矢量扩展)ISA进行SIMD
#获得最佳新能要修改下一行为'llvm -mcpu=core-avx2',或者指定你使用的其他CPU类型
#实测指定CPU以后,Opt6版本可以达到略高于MKL numpy的性能。
target = 'llvm'
ctx = tvm.context(target, 0)
# Random generated tensor for testing
a = tvm.nd.array(numpy.random.rand(M, K).astype(dtype), ctx)
b = tvm.nd.array(numpy.random.rand(K, N).astype(dtype), ctx)
#numpy测试Numpy running time: 0.004753ms
np_repeat = 100
np_runing_time = timeit.timeit(setup='import numpy\n'
'M = ' + str(M) + '\n'
'K = ' + str(K) + '\n'
'N = ' + str(N) + '\n'
'dtype = "float32"\n'
'a = numpy.random.rand(M, K).astype(dtype)\n'
'b = numpy.random.rand(K, N).astype(dtype)\n',
stmt='answer = numpy.dot(a, b)',
number=np_repeat)
print("Numpy running time: %f" % (np_runing_time / np_repeat))
#基准测试:Baseline: 2.174880ms
#算法
k = tvm.reduce_axis((0, K), 'k')
A = tvm.placeholder((M, K), name='A')
B = tvm.placeholder((K, N), name='B')
C = tvm.compute(
(M, N),
lambda x, y: tvm.sum(A[x, k] * B[k, y], axis=k),
name='C')
#默认调度
c = tvm.nd.array(numpy.zeros((M, N), dtype=dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
evaluator = func.time_evaluator(func.entry_name, ctx, number=1)
print('Baseline: %f' % evaluator(a, b, c).mean)
```
输出:
```
Numpy running time: 0.036594
Baseline: 2.533367
```
在TVM中,我们可以随时检查较低级别的IR以调试或优化我们的调度。以下是使用我们的基线计划生成的IR。
```python
print(tvm.lower(s, [A, B, C], simple_mode=True))
```
输出:
```c
produce C {
for (x, 0, 1024) {
for (y, 0, 1024) {
C[((x*1024) + y)] = 0.000000f
for (k, 0, 1024) {
C[((x*1024) + y)] = (C[((x*1024) + y)] + (A[((x*1024) + k)]*B[((k*1024) + y)]))
}
}
}
}
```
## Opt1:分块
分块(数据块将逐块计算)是一个增强cache命中率的重要技巧。块内的存储器访问是一个具有高内存局部性的小邻域。在本教程中,我们选择32作为分块因子。因此该块将填充32 * 32 * sizeof(float),它占总大小为32KB的缓存中的4KB(L1数据缓存)。
```python
#Opt1调度方法:
bn = 32
#通过循环平铺tile来分块
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)#32x32的块,返回内外循环索引
k, = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)
#ko,ki移到内外块循环之间
s[C].reorder(xo, yo, ko, ki, xi, yi)
func = tvm.build(s, [A, B, C], target=target, name='mmult')
assert func
c = tvm.nd.array(numpy.zeros((M, N), dtype = dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
#通过简单地平铺循环32x32,并在外分块循环出加入ko,ki循环,我们可以看到与基线相比,有很大加速。
#Opt1: 0.382091ms
evaluator = func.time_evaluator(func.entry_name, ctx, number=10)
print('Opt1: %f' % evaluator(a, b, c).mean)
```
输出:
```
Opt1: 0.296531
```
这是在分块后生成的IR。
```python
print(tvm.lower(s, [A, B, C], simple_mode=True))
```
输出:
```c
produce C {
for (x.outer, 0, 32) {
for (y.outer, 0, 32) {
for (x.inner.init, 0, 32) {
for (y.inner.init, 0, 32) {
C[((((((x.outer*32) + x.inner.init)*32) + y.outer)*32) + y.inner.init)] = 0.000000f
}
}
for (k.outer, 0, 256) {
for (k.inner, 0, 4) {
for (x.inner, 0, 32) {
for (y.inner, 0, 32) {
C[((((((x.outer*32) + x.inner)*32) + y.outer)*32) + y.inner)] = (C[((((((x.outer*32) + x.inner)*32) + y.outer)*32) + y.inner)] + (A[((((((x.outer*32) + x.inner)*256) + k.outer)*4) + k.inner)]*B[((((((k.outer*4) + k.inner)*32) + y.outer)*32) + y.inner)]))
}
}
}
}
}
}
}
```
## Opt2:向量化
向量化是另一个重要的技巧。当内存访问模式一致时,编译器可以检测到这种模式并将连续内存传递给向量处理器。在TVM中,我们可以使用`vectorize`接口来暗示编译器这种模式,这样我们就可以大大加速它。 在本教程中,我们选择对内部循环行数据进行矢量化,因为它是cache友好的。
```python
s = tvm.create_schedule(C.op)
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
k, = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)
s[C].reorder(xo, yo, ko, ki, xi, yi)
# 与Opt1唯一区别在:向量化
s[C].vectorize(yi)
func = tvm.build(s, [A, B, C], target=target, name='mmult')
assert func
c = tvm.nd.array(numpy.zeros((M, N), dtype = dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
#Opt2: 0.356233ms
evaluator = func.time_evaluator(func.entry_name, ctx, number=10)
print('Opt2: %f' % evaluator(a, b, c).mean)
```
输出:
```
Opt2: 0.312343
```
这是向量化后生产的IR。
```python
print(tvm.lower(s, [A, B, C], simple_mode=True))
```
输出:
```c
produce C {
for (x.outer, 0, 32) {
for (y.outer, 0, 32) {
for (x.inner.init, 0, 32) {
C[ramp((((((x.outer*32) + x.inner.init)*32) + y.outer)*32), 1, 32)] = x32(0.000000f)
}
for (k.outer, 0, 256) {
for (k.inner, 0, 4) {
for (x.inner, 0, 32) {
C[ramp((((((x.outer*32) + x.inner)*32) + y.outer)*32), 1, 32)] = (C[ramp((((((x.outer*32) + x.inner)*32) + y.outer)*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.inner)*256) + k.outer)*4) + k.inner)])*B[ramp((((((k.outer*4) + k.inner)*32) + y.outer)*32), 1, 32)]))
}
}
}
}
}
}
```
## Opt3:循环排布permute
如果我们看一下上面的IR,我们可以看到内部循环行数据被向量化,B被转换成PackedB。PackedB的遍历现在是顺序的。因此,我们将查看A的访问模式。在当前调度中,A`逐列访问`,这是cache不友好的。如果我们改变ki和内轴xi的嵌套循环次序,则A矩阵的访问模式更加cache友好。
```python
s = tvm.create_schedule(C.op)
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
k, = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)
# re-ordering,改变xi和ki的循环位置
s[C].reorder(xo, yo, ko, xi, ki, yi)
s[C].vectorize(yi)
func = tvm.build(s, [A, B, C], target=target, name='mmult')
assert func
c = tvm.nd.array(numpy.zeros((M, N), dtype = dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
#Opt3: 0.098853
evaluator = func.time_evaluator(func.entry_name, ctx, number=10)
print('Opt3: %f' % evaluator(a, b, c).mean)
```
输出:
```
Opt3: 0.141706
```
这是循环重排后生成的IR。
```python
print(tvm.lower(s, [A, B, C], simple_mode=True))
```
输出:
```c
produce C {
for (x.outer, 0, 32) {
for (y.outer, 0, 32) {
for (x.inner.init, 0, 32) {
C[ramp((((((x.outer*32) + x.inner.init)*32) + y.outer)*32), 1, 32)] = x32(0.000000f)
}
for (k.outer, 0, 256) {
for (x.inner, 0, 32) {
for (k.inner, 0, 4) {
C[ramp((((((x.outer*32) + x.inner)*32) + y.outer)*32), 1, 32)] = (C[ramp((((((x.outer*32) + x.inner)*32) + y.outer)*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.inner)*256) + k.outer)*4) + k.inner)])*B[ramp((((((k.outer*4) + k.inner)*32) + y.outer)*32), 1, 32)]))
}
}
}
}
}
}
```
## Opt4:数组打包
数组打包是另一个重要技巧。这个技巧是重新排序数组的存储维度,以便在展平flatten后将特定维度上的连续访问模式转换为顺序模式。
![](https://github.com/dmlc/web-data/raw/master/tvm/tutorial/array-packing.png)
正如上图所示,在分块计算之后,我们可以观察到B(在展平flatten后)的数组访问模式是规则的,但是`不连续`的。我们希望在经过一些转换后,我们可以获得连续的访问模式我们可以将[16] [16]数组重新排序为[16/4] [16] [4]数组,这样当从打包数组中获取相应的值时,B的访问模式将是连续的。
```python
#打包B
packedB = tvm.compute((N/bn,K,bn),lambda x, y, z: B[y, x * bn + z], name='packedB')
#矩阵乘法
C = tvm.compute((M, N),
lambda x, y: tvm.sum(A[x, k] * packedB[y / bn, k, y % bn], axis=k),
name = 'C')
#下面与上面Opt3调度
s = tvm.create_schedule(C.op)
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
k, = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)
s[C].reorder(xo, yo, ko, xi, ki, yi)
s[C].vectorize(yi)
x, y, z = s[packedB].op.axis
s[packedB].vectorize(z)
s[packedB].parallel(x)
func = tvm.build(s, [A, B, C], target=target, name='mmult')
assert func
c = tvm.nd.array(numpy.zeros((M, N), dtype = dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
#Opt4: 0.118722 打包耗时,与Opt3速度相比,反而有所下降
evaluator = func.time_evaluator(func.entry_name, ctx, number=10)
print('Opt4: %f' % evaluator(a, b, c).mean)
```
输出:
```
Opt4: 0.260761
```
这是数组打包后生成的IR。
```python
print(tvm.lower(s, [A, B, C], simple_mode=True))
```
输出:
```c
// attr [packedB] storage_scope = "global"
allocate packedB[float32x32 * 32768]
produce packedB {
parallel (x, 0, 32) {
for (y, 0, 1024) {
packedB[ramp((((x*1024) + y)*32), 1, 32)] = B[ramp((((y*32) + x)*32), 1, 32)]
}
}
}
produce C {
for (x.outer, 0, 32) {
for (y.outer, 0, 32) {
for (x.inner.init, 0, 32) {
C[ramp((((((x.outer*32) + x.inner.init)*32) + y.outer)*32), 1, 32)] = x32(0.000000f)
}
for (k.outer, 0, 256) {
for (x.inner, 0, 32) {
for (k.inner, 0, 4) {
C[ramp((((((x.outer*32) + x.inner)*32) + y.outer)*32), 1, 32)] = (C[ramp((((((x.outer*32) + x.inner)*32) + y.outer)*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.inner)*256) + k.outer)*4) + k.inner)])*packedB[ramp((((((y.outer*256) + k.outer)*4) + k.inner)*32), 1, 32)]))
}
}
}
}
}
}
```
## Opt5:为块写cache
分块后,程序会逐块将结果写入C,访问模式`不是顺序`的。因此,我们可以使用顺序cache数组来保存块结果,并在所有块结果都准备就绪后一起写入C。
```python
s = tvm.create_schedule(C.op)
#为变量C分配写cache
CC = s.cache_write(C, 'global')
#分块
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
#写cache放在yo循环处计算
s[CC].compute_at(s[C], yo)
#新的内轴(这里的操作都是对变量CC,等完全计算结束后,再C=CC)
xc, yc = s[CC].op.axis
k, = s[CC].op.reduce_axis
ko, ki = s[CC].split(k, factor=4)
s[CC].reorder(ko, xc, ki, yc)
s[CC].unroll(ki)
s[CC].vectorize(yc)
func = tvm.build(s, [A, B, C], target=target, name='mmult')
assert func
c = tvm.nd.array(numpy.zeros((M, N), dtype = dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
#Opt5: 0.050380
evaluator = func.time_evaluator(func.entry_name, ctx, number=10)
print('Opt5: %f' % evaluator(a, b, c).mean)
```
输出:
```
Opt5: 0.242124
```
这是在块写cache后生成的IR。
```python
print(tvm.lower(s, [A, B, C], simple_mode=True))
```
输出:
```c
// attr [packedB] storage_scope = "global"
allocate packedB[float32x32 * 32768]
// attr [C.global] storage_scope = "global"
allocate C.global[float32 * 1024]
produce packedB {
parallel (x, 0, 32) {
for (y, 0, 1024) {
packedB[ramp((((x*1024) + y)*32), 1, 32)] = B[ramp((((y*32) + x)*32), 1, 32)]
}
}
}
produce C {
for (x.outer, 0, 32) {
for (y.outer, 0, 32) {
produce C.global {
for (x.c.init, 0, 32) {
C.global[ramp((x.c.init*32), 1, 32)] = x32(0.000000f)
}
for (k.outer, 0, 256) {
for (x.c, 0, 32) {
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[(((((x.outer*32) + x.c)*256) + k.outer)*4)])*packedB[ramp((((y.outer*256) + k.outer)*128), 1, 32)]))
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.c)*256) + k.outer)*4) + 1)])*packedB[ramp(((((y.outer*256) + k.outer)*128) + 32), 1, 32)]))
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.c)*256) + k.outer)*4) + 2)])*packedB[ramp(((((y.outer*256) + k.outer)*128) + 64), 1, 32)]))
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.c)*256) + k.outer)*4) + 3)])*packedB[ramp(((((y.outer*256) + k.outer)*128) + 96), 1, 32)]))
}
}
}
for (x.inner, 0, 32) {
for (y.inner, 0, 32) {
C[((((((x.outer*32) + x.inner)*32) + y.outer)*32) + y.inner)] = C.global[((x.inner*32) + y.inner)]
}
}
}
}
}
```
## Opt6:并行
此外,我们还可以利用多核处理器进行线程级并行化。
```python
s = tvm.create_schedule(C.op)
CC = s.cache_write(C, 'global')
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
s[CC].compute_at(s[C], yo)
xc, yc = s[CC].op.axis
k, = s[CC].op.reduce_axis
ko, ki = s[CC].split(k, factor=4)
s[CC].reorder(ko, xc, ki, yc)
s[CC].unroll(ki)
s[CC].vectorize(yc)
# parallel对外循环xo进行多线程并行计算
s[C].parallel(xo)
x, y, z = s[packedB].op.axis
s[packedB].vectorize(z)
s[packedB].parallel(x)
func = tvm.build(s, [A, B, C], target=target, name = 'mmult')
assert func
c = tvm.nd.array(numpy.zeros((M, N), dtype = dtype), ctx)
func(a, b, c)
tvm.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5)
#Opt6: 0.004400(这是llvm -mcpu=core-avx2的优化结果,比mkl稍快)
evaluator = func.time_evaluator(func.entry_name, ctx, number=50)
opt6_time = evaluator(a, b, c).mean
print('Opt6: %f' % opt6_time)
```
输出:
```
Opt6: 0.106932
```
这是线程并行后生成的IR。
```c
// attr [packedB] storage_scope = "global"
allocate packedB[float32x32 * 32768]
produce packedB {
parallel (x, 0, 32) {
for (y, 0, 1024) {
packedB[ramp((((x*1024) + y)*32), 1, 32)] = B[ramp((((y*32) + x)*32), 1, 32)]
}
}
}
produce C {
parallel (x.outer, 0, 32) {
// attr [C.global] storage_scope = "global"
allocate C.global[float32 * 1024]
for (y.outer, 0, 32) {
produce C.global {
for (x.c.init, 0, 32) {
C.global[ramp((x.c.init*32), 1, 32)] = x32(0.000000f)
}
for (k.outer, 0, 256) {
for (x.c, 0, 32) {
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[(((((x.outer*32) + x.c)*256) + k.outer)*4)])*packedB[ramp((((y.outer*256) + k.outer)*128), 1, 32)]))
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.c)*256) + k.outer)*4) + 1)])*packedB[ramp(((((y.outer*256) + k.outer)*128) + 32), 1, 32)]))
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.c)*256) + k.outer)*4) + 2)])*packedB[ramp(((((y.outer*256) + k.outer)*128) + 64), 1, 32)]))
C.global[ramp((x.c*32), 1, 32)] = (C.global[ramp((x.c*32), 1, 32)] + (x32(A[((((((x.outer*32) + x.c)*256) + k.outer)*4) + 3)])*packedB[ramp(((((y.outer*256) + k.outer)*128) + 96), 1, 32)]))
}
}
}
for (x.inner, 0, 32) {
for (y.inner, 0, 32) {
C[((((((x.outer*32) + x.inner)*32) + y.outer)*32) + y.inner)] = C.global[((x.inner*32) + y.inner)]
}
}
}
}
}
```
## 总结
在仅使用18行代码应用上述简单优化之后,我们生成的代码可以使用MKL实现60%的numpy性能。`个人测试使用llvm -mcpu=core-avx2,可以优于numpy性能`
以上是关于markdown TVM如何优化CPU GEMM(矩阵乘法)的主要内容,如果未能解决你的问题,请参考以下文章