以源文件的形式交作业是什么意思?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了以源文件的形式交作业是什么意思?相关的知识,希望对你有一定的参考价值。
Spark作业提交方式一般有两种,一种是使用spark-submit脚本进行提交作业,另一种方式是使用rest api方式,但是有时候由于集群的安全原因可能无法使用rest api方式进行提交作业,但是由于业务需要的话我们可以直接调用SparkSubmit.main方法进行api方式提交作业。(当然也可以使用ProcessBuilder执行shell脚本提交)。除了通常两种情况,Spark其实还提供了API提交作业的方法。1 调用SparkSubmit提交作业:
为了使用SparkSubmit#main方法方式提交作业,因此重点就是需要构造参数,我们可以根据spark-submit脚本源码进行构造参数,翻阅spark-submit源码不难发现最终调用的是spark-class脚本提交作业,spark-class脚本中最核心的代码如下:
exec "$CMD[@]"
因此只要我们弄清楚 "$CMD[@]"参数是什么就可以解决问题,最好的方法就是在spark-class脚本中添加echo 打印出来看看。打印结果如下:
/usr/local/jdk1.8.0_161/bin/java -cp /home/daxin/bigdata/spark/conf/:/home/daxin/bigdata/spark/jars/*:/home/daxin/bigdata/hadoop/etc/hadoop/:/home/daxin/bigdata/hadoop/etc/hadoop/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class com.daxin.remote.debug.App wc.jar
由于我们提交的作业是使用yarn调度,因此需要传入hadoop相关配置。如果使用yarn调度却没有传入hadoop配置文件的话此时会提示操作不到ResourceMangaer,即信息如下:
0.0.0.0/0.0.0.0:8032. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
正常spark脚本提交作业时候hadoop等相关配置文件是通过classpath传入的,因此我们在classpath传入即可。
2:使用SPARK API提交作业。
参考文档:https://spark.apache.org/docs/latest/rdd-programming-guide.html#launching-spark-jobs-from-java--scala,内容比较简单就不描述了。
此处需要注意:
如果Spark提交作业和Spring boot的jar整合的话,使用-cp是启动不起来的,由于Spring打包插件比较特殊,jar内部的目录结构与一般的jar结构不一样,所以使用-cp 指定mainclass会提示无法找到类,对于该情况可以将-cp与-jar联合使用。
例如:
java -cp 普通.jar -jar springboot.jar
最后还需要注意的就是:Linux权限问题,有时候可能因为classpath下面的配置文件没有权限读,因此也是会失败的,一定要确保有权限。 参考技术A
就是说,不要改变文件本来的格式。
参考技术B 就是说,不要改变文件本来的格式。交作业:手写数字识别-Minst数据集-SoftMax回归
自己跑的时候正确率0.89
库
- numpy
- PIL(如果需要对实际图片进行预测)
结果
- minst的四个文件解压之后和这四个py文件放在同级文件夹
- 运行结束后的权重W和偏移b也在同级文件夹下,csv文件只用来看,用的是bin文件
softmax.py
和训练、测试有关的所有函数
#!/usr/bin/python
import numpy as np
np.random.seed(0)
# 定义softmax函数
def SoftMax(z):
if np.ndim(z) == 2:
axisn = 1
else:
axisn = 0
s = np.exp(z) / np.sum(np.exp(z), axis=axisn, keepdims=True)
return s
# b = np.array([1,2,4, 5,5,6]).reshape(2,3)
# print(SoftMax(b))
# 编码标签
def OneCode(y):
r = len(y)
c = len(np.unique(y))
one_hot = np.zeros((r,c))
one_hot[np.arange(len(y)), y.astype(int).T] = 1
return one_hot
# 定义 y_ 的计算函数
def CalcY_(x, w, b):
# w * X.T + b 后面 +b 是一个广播运算,
y_ = np.dot(w, x.T) + b
return y_.T
# 定义损失函数 - 交叉熵
def cross_entropy(y, y_):
loss = -(1/len(y))*np.sum(y * np.log(y_))
return loss
# 定义训练函数
def train(tr_x, tr_y, N):
\'\'\'
\'\'\'
# 模型
# y = w1 * x1 + w2 * x2 + b
W = np.random.rand(10,784)
b = np.random.rand(10,1)
losss = []
losi = 0
y = OneCode(tr_y) # 把 1 2 3 4 转换成向量 0001 0010 0100 1000
for i in range(N):
# 计算loss
x = tr_x
y_ = SoftMax(CalcY_(x, W, b))
loss = cross_entropy(y, y_)
losss.append(loss)
# 计算梯度
grad_w = (1/len(x)) * np.dot((y_ - y).T, x)
grad_b = (1/len(x)) * np.sum((y_ - y))
# 更新参数
# 学习率 × 梯度
W = W - 0.5 * grad_w
b = b - 0.5 * grad_b
delta = abs(losi - loss)
print(i , loss ,delta)
# 损失值低于0.01 或者 其变化值低于0.0001
if(loss < 0.01 or delta < 0.0001):
break
losi = loss
return W,b
# 定义测试函数
def check(te_x, te_y, W, b):
# te_x,te_y = Iread(\'te\')
# te_x = te_x / 255
# te_y = te_y
# print(W)
# print(b)
y_ = SoftMax(CalcY_(te_x, W, b))
l = np.argmax(y_, axis=1).reshape(10000,1)
right = np.sum(l == te_y.astype())/10000
print(\'right rate:\', right)
return right
Idata.py
文件读取函数
#!/usr/bin/python
import numpy as np
filename_train_data =\'./train-images-idx3-ubyte\'
filename_train_label=\'./train-labels-idx1-ubyte\'
filename_test_data =\'./t10k-images-idx3-ubyte\'
filename_test_label =\'./t10k-labels-idx1-ubyte\'
def Iread_train_data():
fp = open(filename_train_data, \'rb\')
fl = open(filename_train_label, \'rb\')
fp.read(4*4)
fl.read(2*4)
nstrs=np.zeros((60000, 28*28))
l =np.zeros((60000, 1))
for i in range( 60000):
fstr = fp.read(28*28)
lstr = fl.read(1)
l[i] = int.from_bytes(lstr,byteorder=\'big\',signed=False)
nstrs[i,:] = np.frombuffer(fstr, dtype=np.uint8)
return nstrs,l
def Iread_test_data():
fp = open(filename_test_data, \'rb\')
fl = open(filename_test_label, \'rb\')
fp.read(4*4)
fl.read(2*4)
nstrs=np.zeros((10000, 28*28))
l =np.zeros((10000, 1))
for i in range( 10000):
fstr = fp.read(28*28)
lstr = fl.read(1)
l[i] = int.from_bytes(lstr,byteorder=\'big\',signed=False)
nstrs[i,:] = np.frombuffer(fstr, dtype=np.uint8)
return nstrs,l
def Iread(option):
if(option == \'tr\'):
d,l = Iread_train_data()
return d,l
else if(option == \'te\'):
d,l = Iread_test_data()
return d,l
else:
print(\'op err\')
minst.py
完成训练和测试的脚本
#!/usr/bin/python
import numpy as np
from Idata import Iread
from softmax import train,check
# 读数据
tr_x,tr_y = Iread(\'tr\')
te_x,te_y = Iread(\'te\')
# 训练集和测试集正规到 0-1 区间
tr_x = tr_x / 255
te_x = te_x / 255
# print(tr_x.shape, tr_y.shape)
# print(te_x.shape, te_y.shape)
# 训练
W,b = train(tr_x, tr_y, 1000)
# 然后保存参数
W.tofile(\'W.bin\')
b.tofile(\'b.bin\')
np.savetxt(\'w.csv\', W, fmt=\'%f\', delimiter=\',\')
np.savetxt(\'b.csv\', b, fmt=\'%f\', delimiter=\',\')
# 读取参数
W = np.fromfile(\'W.bin\').reshape(10, 784)
b = np.fromfile(\'b.bin\').reshape(10, 1)
# 测试
r = check(te_x, te_y, W, b)
predict.py
用来对一个实际的手写数字图像识别的脚本
#!/usr/bin/python
import numpy as np
from softmax import SoftMax,CalcY_,cross_entropy,OneCode
from PIL import Image
# 图像文件必须是 28 × 28 的 0~255 灰度图像
fname = \'4.bmp\'
img = np.array(Image.open(fname))
te_x = img.reshape(1, 28*28)
te_x = te_x / 255
print(te_x)
W = np.fromfile(\'W.bin\').reshape(10, 784)
b = np.fromfile(\'b.bin\').reshape(10, 1)
y_ = SoftMax(CalcY_(te_x, W, b))
y_ = np.argmax(y_, axis=1)
print(\'pred:\', y_)
以上是关于以源文件的形式交作业是什么意思?的主要内容,如果未能解决你的问题,请参考以下文章