python disan.py
"""
@ author: xx
@ Email: xx@xxx
@ Date: August 26, 2017
Directional Self-Attention Network
Requirements: Python 3.5.2, Tensorflow 1.2
Usage: from xx.disan import disan
"""
import tensorflow as tf
from functools import reduce
from operator import mul
VERY_BIG_NUMBER = 1e30
VERY_SMALL_NUMBER = 1e-30
VERY_POSITIVE_NUMBER = VERY_BIG_NUMBER
VERY_NEGATIVE_NUMBER = -VERY_BIG_NUMBER
# --------------- DiSAN Interface ----------------
def disan(rep_tensor, rep_mask, scope=None,
keep_prob=1., is_train=None, wd=0., activation='elu',
tensor_dict=None, name=''):
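    # rep_tensor: [batch_size, seq_len, dim] token representations
    # rep_mask:   [batch_size, seq_len] boolean mask, True for real tokens
    # returns a [batch_size, 2 * dim] sentence encoding (forward || backward)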
with tf.variable_scope(scope or 'DiSAN'):
with tf.variable_scope('ct_attn'):
fw_res = directional_attention_with_dense(
rep_tensor, rep_mask, 'forward', 'dir_attn_fw',
keep_prob, is_train, wd, activation,
tensor_dict=tensor_dict, name=name + '_fw_attn')
bw_res = directional_attention_with_dense(
rep_tensor, rep_mask, 'backward', 'dir_attn_bw',
keep_prob, is_train, wd, activation,
tensor_dict=tensor_dict, name=name + '_bw_attn')
seq_rep = tf.concat([fw_res, bw_res], -1)
with tf.variable_scope('sent_enc_attn'):
sent_rep = multi_dimensional_attention(
seq_rep, rep_mask, 'multi_dimensional_attention',
keep_prob, is_train, wd, activation,
tensor_dict=tensor_dict, name=name + '_attn')
return sent_rep
# --------------- supporting networks ----------------
def directional_attention_with_dense(rep_tensor, rep_mask, direction=None, scope=None,
keep_prob=1., is_train=None, wd=0., activation='elu',
tensor_dict=None, name=None):
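    # token2token self-attention restricted by `direction`: 'forward' lets token i
    # attend to positions j < i, 'backward' to j > i, and None masks only the diagonal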
def scaled_tanh(x, scale=5.):
return scale * tf.nn.tanh(1. / scale * x)
bs, sl, vec = tf.shape(rep_tensor)[0], tf.shape(rep_tensor)[1], tf.shape(rep_tensor)[2]
ivec = rep_tensor.get_shape()[2]
    with tf.variable_scope(scope or 'directional_attention_%s' % (direction or 'diag')):
# mask generation
sl_indices = tf.range(sl, dtype=tf.int32)
sl_col, sl_row = tf.meshgrid(sl_indices, sl_indices)
if direction is None:
direct_mask = tf.cast(tf.diag(- tf.ones([sl], tf.int32)) + 1, tf.bool)
else:
if direction == 'forward':
direct_mask = tf.greater(sl_row, sl_col)
else:
direct_mask = tf.greater(sl_col, sl_row)
direct_mask_tile = tf.tile(tf.expand_dims(direct_mask, 0), [bs, 1, 1]) # bs,sl,sl
rep_mask_tile = tf.tile(tf.expand_dims(rep_mask, 1), [1, sl, 1]) # bs,sl,sl
attn_mask = tf.logical_and(direct_mask_tile, rep_mask_tile) # bs,sl,sl
# non-linear
rep_map = bn_dense_layer(rep_tensor, ivec, True, 0., 'bn_dense_map', activation,
False, wd, keep_prob, is_train)
rep_map_tile = tf.tile(tf.expand_dims(rep_map, 1), [1, sl, 1, 1]) # bs,sl,sl,vec
rep_map_dp = dropout(rep_map, keep_prob, is_train)
# attention
with tf.variable_scope('attention'): # bs,sl,sl,vec
f_bias = tf.get_variable('f_bias', [ivec], tf.float32, tf.constant_initializer(0.))
dependent = linear(rep_map_dp, ivec, False, scope='linear_dependent') # bs,sl,vec
dependent_etd = tf.expand_dims(dependent, 1) # bs,1,sl,vec
head = linear(rep_map_dp, ivec, False, scope='linear_head') # bs,sl,vec
head_etd = tf.expand_dims(head, 2) # bs,sl,1,vec
logits = scaled_tanh(dependent_etd + head_etd + f_bias, 5.0) # bs,sl,sl,vec
logits_masked = exp_mask_for_high_rank(logits, attn_mask)
attn_score = tf.nn.softmax(logits_masked, 2) # bs,sl,sl,vec
attn_score = mask_for_high_rank(attn_score, attn_mask)
attn_result = tf.reduce_sum(attn_score * rep_map_tile, 2) # bs,sl,vec
with tf.variable_scope('output'):
o_bias = tf.get_variable('o_bias', [ivec], tf.float32, tf.constant_initializer(0.))
            # fusion gate between the dense-mapped input and the attention result
fusion_gate = tf.nn.sigmoid(
linear(rep_map, ivec, True, 0., 'linear_fusion_i', False, wd, keep_prob, is_train) +
linear(attn_result, ivec, True, 0., 'linear_fusion_a', False, wd, keep_prob, is_train) +
o_bias)
output = fusion_gate * rep_map + (1 - fusion_gate) * attn_result
output = mask_for_high_rank(output, rep_mask)
# save attn
if tensor_dict is not None and name is not None:
tensor_dict[name + '_dependent'] = dependent
tensor_dict[name + '_head'] = head
tensor_dict[name] = attn_score
tensor_dict[name + '_gate'] = fusion_gate
return output
def multi_dimensional_attention(rep_tensor, rep_mask, scope=None,
keep_prob=1., is_train=None, wd=0., activation='elu',
tensor_dict=None, name=None):
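    # source2token attention: a feature-wise softmax over the sequence axis
    # pools [batch_size, seq_len, dim] down to [batch_size, dim]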
bs, sl, vec = tf.shape(rep_tensor)[0], tf.shape(rep_tensor)[1], tf.shape(rep_tensor)[2]
ivec = rep_tensor.get_shape()[2]
with tf.variable_scope(scope or 'multi_dimensional_attention'):
map1 = bn_dense_layer(rep_tensor, ivec, True, 0., 'bn_dense_map1', activation,
False, wd, keep_prob, is_train)
map2 = bn_dense_layer(map1, ivec, True, 0., 'bn_dense_map2', 'linear',
False, wd, keep_prob, is_train)
map2_masked = exp_mask_for_high_rank(map2, rep_mask)
soft = tf.nn.softmax(map2_masked, 1) # bs,sl,vec
attn_output = tf.reduce_sum(soft * rep_tensor, 1) # bs, vec
# save attn
if tensor_dict is not None and name is not None:
tensor_dict[name] = soft
return attn_output
def bn_dense_layer(input_tensor, hn, bias, bias_start=0.0, scope=None,
activation='relu', enable_bn=True,
wd=0., keep_prob=1.0, is_train=None):
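    # fully connected layer (see `linear`) with optional batch norm and a configurable activation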
if is_train is None:
is_train = False
# activation
if activation == 'linear':
activation_func = tf.identity
elif activation == 'relu':
activation_func = tf.nn.relu
elif activation == 'elu':
activation_func = tf.nn.elu
elif activation == 'selu':
activation_func = selu
else:
raise AttributeError('no activation function named as %s' % activation)
with tf.variable_scope(scope or 'bn_dense_layer'):
linear_map = linear(input_tensor, hn, bias, bias_start, 'linear_map',
False, wd, keep_prob, is_train)
if enable_bn:
linear_map = tf.contrib.layers.batch_norm(
linear_map, center=True, scale=True, is_training=is_train, scope='bn')
return activation_func(linear_map)
def dropout(x, keep_prob, is_train, noise_shape=None, seed=None, name=None):
with tf.name_scope(name or "dropout"):
assert is_train is not None
if keep_prob < 1.0:
d = tf.nn.dropout(x, keep_prob, noise_shape=noise_shape, seed=seed)
out = tf.cond(is_train, lambda: d, lambda: x)
return out
return x
def linear(args, output_size, bias, bias_start=0.0, scope=None, squeeze=False, wd=0.0, input_keep_prob=1.0,
is_train=None):
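    # dense layer over the last axis: flattens the input(s) to 2-D, applies `_linear`,
    # then restores the original leading dimensions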
if args is None or (isinstance(args, (tuple, list)) and not args):
raise ValueError("`args` must be specified")
if not isinstance(args, (tuple, list)):
args = [args]
flat_args = [flatten(arg, 1) for arg in args] # for dense layer [(-1, d)]
if input_keep_prob < 1.0:
assert is_train is not None
flat_args = [tf.cond(is_train, lambda: tf.nn.dropout(arg, input_keep_prob), lambda: arg)
# for dense layer [(-1, d)]
for arg in flat_args]
flat_out = _linear(flat_args, output_size, bias, bias_start=bias_start, scope=scope) # dense
    out = reconstruct(flat_out, args[0], 1)  # restore the original leading dims
if squeeze:
out = tf.squeeze(out, [len(args[0].get_shape().as_list()) - 1])
if wd:
add_reg_without_bias()
return out
def _linear(xs, output_size, bias, bias_start=0., scope=None):
with tf.variable_scope(scope or 'linear_layer'):
x = tf.concat(xs, -1)
input_size = x.get_shape()[-1]
        W = tf.get_variable('W', shape=[input_size, output_size], dtype=tf.float32)
if bias:
bias = tf.get_variable('bias', shape=[output_size], dtype=tf.float32,
initializer=tf.constant_initializer(bias_start))
out = tf.matmul(x, W) + bias
else:
out = tf.matmul(x, W)
return out
def flatten(tensor, keep):
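    # collapse all but the last `keep` dims into a single leading dim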
fixed_shape = tensor.get_shape().as_list()
start = len(fixed_shape) - keep
left = reduce(mul, [fixed_shape[i] or tf.shape(tensor)[i] for i in range(start)])
out_shape = [left] + [fixed_shape[i] or tf.shape(tensor)[i] for i in range(start, len(fixed_shape))]
flat = tf.reshape(tensor, out_shape)
return flat
def reconstruct(tensor, ref, keep, dim_reduced_keep=None):
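    # inverse of `flatten`: reshape `tensor` so its leading dims match those of `ref`,
    # keeping its last `keep` (or `dim_reduced_keep`) dims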
dim_reduced_keep = dim_reduced_keep or keep
ref_shape = ref.get_shape().as_list() # original shape
tensor_shape = tensor.get_shape().as_list() # current shape
    ref_stop = len(ref_shape) - keep  # number of leading dims to restore from ref
    tensor_start = len(tensor_shape) - dim_reduced_keep  # first dim of tensor to keep
    pre_shape = [ref_shape[i] or tf.shape(ref)[i] for i in range(ref_stop)]
    keep_shape = [tensor_shape[i] or tf.shape(tensor)[i] for i in range(tensor_start, len(tensor_shape))]
# pre_shape = [tf.shape(ref)[i] for i in range(len(ref.get_shape().as_list()[:-keep]))]
# keep_shape = tensor.get_shape().as_list()[-keep:]
target_shape = pre_shape + keep_shape
out = tf.reshape(tensor, target_shape)
return out
def mask_for_high_rank(val, val_mask, name=None):
val_mask = tf.expand_dims(val_mask, -1)
return tf.multiply(val, tf.cast(val_mask, tf.float32), name=name or 'mask_for_high_rank')
def exp_mask_for_high_rank(val, val_mask, name=None):
val_mask = tf.expand_dims(val_mask, -1)
return tf.add(val, (1 - tf.cast(val_mask, tf.float32)) * VERY_NEGATIVE_NUMBER,
name=name or 'exp_mask_for_high_rank')
def selu(x):
    with tf.name_scope('selu'):
alpha = 1.6732632423543772848170429916717
scale = 1.0507009873554804934193349852946
return scale * tf.where(x >= 0.0, x, alpha * tf.nn.elu(x))
def add_reg_without_bias(scope=None):
scope = scope or tf.get_variable_scope().name
variables = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope)
counter = 0
for var in variables:
if len(var.get_shape().as_list()) <= 1: continue
tf.add_to_collection('reg_vars', var)
counter += 1
return counter
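# --------------- usage example (illustrative) ----------------
# A minimal sketch of how the `disan` interface above could be wired up, assuming
# TensorFlow 1.x graph mode. The placeholder names, shapes, keep_prob value and the
# session setup below are hypothetical additions, not part of the original module.
if __name__ == '__main__':
    import numpy as np
    batch_size, seq_len, dim = 2, 10, 300
    rep_tensor = tf.placeholder(tf.float32, [None, None, dim], name='rep_tensor')  # bs,sl,dim
    rep_mask = tf.placeholder(tf.bool, [None, None], name='rep_mask')              # bs,sl
    is_train = tf.placeholder(tf.bool, [], name='is_train')
    sent_rep = disan(rep_tensor, rep_mask, 'DiSAN_demo',
                     keep_prob=0.9, is_train=is_train, wd=0., activation='elu')    # bs,2*dim
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        feed = {rep_tensor: np.random.randn(batch_size, seq_len, dim).astype(np.float32),
                rep_mask: np.ones([batch_size, seq_len], dtype=bool),
                is_train: False}
        print(sess.run(sent_rep, feed).shape)  # expected: (2, 600) = forward + backward concat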