如何实现一个基于 jupyter 的 microservices
Posted 小蒋不素小蒋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何实现一个基于 jupyter 的 microservices相关的知识,希望对你有一定的参考价值。
零、背景:
现有基于 Node.js 的项目,但需要整合 Data Science 同事的基于 python(jupyter) 的代码部分,以实现额外的数据分析功能。于是设想实现一个 microservices。下面介绍一些库的使用方法、自己写的 demo和遇到的坑,方便以后查阅。
一、jupyter_kernel_gateway
第一步,是想办法把 jupyter 文件当成一个 http server 启动,以便可以接受来自任何异构项目的调用。这里可以用到jupyter_kernel_gateway
的 notebook-http
功能。
官方文档:https://jupyter-kernel-gateway.readthedocs.io/en/latest/http-mode.html
1、安装
pip install jupyter_kernel_gateway
2、启动
jupyter kernelgateway --KernelGatewayApp.api=‘kernel_gateway.notebook_http‘ --KernelGatewayApp.seed_uri=‘/Users/xjnotxj/Program/PythonProject/main.ipynb‘
seed_uri
除了是本地路径,也可以是个urlhttp://localhost:8890/notebooks/main.ipynb
3、使用
import json
# imitate REQUEST args (调试时候用,平时请忽略)
# REQUEST = json.dumps({'body': {'age': ['181']}, 'args': {'sex': ['male'], 'location': ['shanghai']}, 'path': {'name': 'colin'}, 'headers': {'Content-Type': 'multipart/form-data; boundary=--------------------------149817035181009685206727', 'Cache-Control': 'no-cache', 'Postman-Token': '96c484cb-8709-4a42-9e12-3aaf18392c92', 'User-Agent': 'PostmanRuntime/7.6.0', 'Accept': '*/*', 'Host': 'localhost:8888', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '161', 'Connection': 'keep-alive'}})
用注释定义路由:# POST /post/:name
(可以多个 cell 一起用),请求体自动绑定在 req
对象上:
# POST /post/:name
req = json.loads(REQUEST)
# defined return vars
return_status = 200
return_code = 0
return_message = ''
return_data = {}
这里定义了一个检查 req 参数的 function,因为 jupyter_kernel_gateway 不支持 return 或者 exit 退出当前 request,还是会继续往后执行,导致多个输出干扰最终 response 结果。所以我这边代码逻辑写的不简洁,如果有知道改进的朋友可以告诉我。
# POST /post/:name
def checkReqValid(req):
global return_code
global return_message
# age
if 100 <= req["age"] or req["age"] < 0:
return_code = -2
return_message = "'age' is out of range"
return True
return False
实现 controller 部分:
# POST /post/:name
try :
name = req['path']['name']
age = int(req['body']['age'][0])
sex = req['args']['sex'][0]
location = req['args']['location'][0]
if checkReqValid({"name":name,
"age":age,
"sex":sex,
"location":location}) == True:
pass
else :
# dosomething……
return_data = {
"name":name,
"age":age,
"sex":sex,
"location":location,
"req":req
}
except KeyError: # check has field is empty
return_code = -1
return_message = "some field is empty"
finally: # return data
print(json.dumps({
"code":return_code,
"message":return_message,
"data":return_data
}))
用 # ResponseInfo POST /post/:name
定义输出响应头,用 print
写入stdout 的方式来响应请求:
# ResponseInfo POST /post/:name
print(json.dumps({
"headers" : {
"Content-Type" : "application/json"
},
"status" : return_status
}))
当我访问localhost:8888/post/colin?sex=male&location=shanghai
且body体为 age:18
时,返回值为:
{
"code": 0,
"message": "",
"data": {
"name": "colin",
"age": 18,
"sex": "male",
"location": "shanghai",
"req": {
"body": {
"age": [
"18"
]
},
"args": {
"sex": [
"male"
],
"location": [
"shanghai"
]
},
"path": {
"name": "colin"
},
"headers": {
"Content-Type": "multipart/form-data; boundary=--------------------------981201125716045634129372",
"Cache-Control": "no-cache",
"Postman-Token": "ec0f5364-b0ea-4828-b987-c12f15573296",
"User-Agent": "PostmanRuntime/7.6.0",
"Accept": "*/*",
"Host": "localhost:8888",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "160",
"Connection": "keep-alive"
}
}
}
}
关于响应码:
默认下为
200 OK
(且Content-Type: text/plain
)如果发生运行错误,则返回
500 Internal Server Error
如果没有找到路由,则返回
404 Not Found
如果找到路由但是 get/post 等这类请求方法还是没匹配上,则返回
405 Not Supported
4、坑
(1)cell 里涉及到注释实现的路由功能时,首行不能是空行,不然报错:
? [email protected] ~/Program/PythonProject jupyter kernelgateway --KernelGatewayApp.api='kernel_gateway.notebook_http' --KernelGatewayApp.seed_uri='/Users/xjnotxj/Program/PythonProject/tuo.ipynb'
[KernelGatewayApp] Kernel started: bb13bcd6-514f-4682-b627-e6809cbb13ac
Traceback (most recent call last):
File "/anaconda3/bin/jupyter-kernelgateway", line 11, in <module>
sys.exit(launch_instance())
File "/anaconda3/lib/python3.7/site-packages/jupyter_core/application.py", line 266, in launch_instance
return super(JupyterApp, cls).launch_instance(argv=argv, **kwargs)
File "/anaconda3/lib/python3.7/site-packages/traitlets/config/application.py", line 657, in launch_instance
app.initialize(argv)
File "/anaconda3/lib/python3.7/site-packages/kernel_gateway/gatewayapp.py", line 382, in initialize
self.init_webapp()
File "/anaconda3/lib/python3.7/site-packages/kernel_gateway/gatewayapp.py", line 449, in init_webapp
handlers = self.personality.create_request_handlers()
File "/anaconda3/lib/python3.7/site-packages/kernel_gateway/notebook_http/__init__.py", line 112, in create_request_handlers
raise RuntimeError('No endpoints were discovered. Check your notebook to make sure your cells are annotated correctly.')
RuntimeError: No endpoints were discovered. Check your notebook to make sure your cells are annotated correctly.
? [email protected] ~/Program/PythonProject [IPKernelApp] WARNING | Parent appears to have exited, shutting down.
(2)response 里args
和body
体里的参数值是一个长度为1的数组
# 注意取法
sex = req['args']['sex'][0]
二、papermill
第二步,就是用类似胶水的东西,把不同的 Data Science 处理脚本,粘连起来,依次调用。
为什么要使用papermill
,而不是直接调用脚本?
(1)规范了调用jurpyter文件和传参的模式
(2)执行jurpyter文件后可以生成 out 文件,方便回溯
(3)上下文变量按照每一个jurpyter文件划分区域去存储,互不干扰
1、安装
https://github.com/nteract/papermill
pip install papermill
2、使用
(1)a.ipynb
import papermill as pm
for i, item in enumerate(data):
data[i] = item * multiple
pm.record("data", data)
print(data)
(2)main.ipynb
data=[1,2,3]
data
# 也可以通过命令行运行,详细看文档
pm.execute_notebook(
'a.ipynb',
'a_out.ipynb',
parameters = dict(data=data,multiple=3)
)
Papermill 支持输入和输出路径有以下几种类型:
(1)本地文件系统: local
(2)HTTP,HTTPS协议: http://, https://
(3)亚马逊网络服务:AWS S3 s3://
(4)Azure:Azure DataLake Store,Azure Blob Store adl://, abs://
(5)Google Cloud:Google云端存储 gs://
执行main.ipynb
后:
1、会生成a_out.ipynb
新文件(见下文的(3))
2、有绑定在a_out.ipynb
上的上下文变量:
re = pm.read_notebook('a_out.ipynb').dataframe
re
name | value | type | filename | |
---|---|---|---|---|
0 | data | [1, 2, 3] | parameter | a_out.ipynb |
1 | multiple | 3 | parameter | a_out.ipynb |
2 | data | [3, 6, 9] | record | a_out.ipynb |
获取参数稍微有一些繁琐,我这里封装了个 function:
# getNotebookData args
# [filename] .ipynb的文件路径
# [field] 取值变量
# [default_value] 默认返回值(default:None)
# [_type] 'parameter'|'record'(default)
def getPMNotebookData(filename, field ,default_value = None,_type='record'):
result = default_value
try:
re = pm.read_notebook(filename).dataframe
result = re[re['name']==field][re['type']==_type]["value"].values[0]
except:
pass
finally:
return result
data = getPMNotebookData('a_out.ipynb', 'data', 0)
data
# [3, 6, 9]
(3)a_out.ipynb
生成的这个新文件,会多出两块内容:
1、在所有 cell 的最开头,会自动插入新的 cell,里面有我们传入的参数
# Parameters
data = [1, 2, 3]
multiple = 3
2、cell 对应的 out 信息
[3, 6, 9]
3、坑
(1)参数不能传 pd.Dataframe 类型
会报错:
TypeError: Object of type DataFrame is not JSON serializable
解决办法:
1、序列化 Dataframe
Dataframe提供了两种序列化的方式,df.to_json()
或 df.to_csv()
,解析或者详细的用法请看:https://github.com/nteract/papermill/issues/215
缺点:
在序列化的过程中,Dataframe 每列的数据类型会发生丢失,重新读取后需重新指定。
2、不通过 papermill 的传参机制去传输 Dataframe,而是通过 csv 中间文件承接 【推荐】
三、docker 封装
第三步,就是用 docker
,封装设计好的 microservices,以便部署。
待写……
以上是关于如何实现一个基于 jupyter 的 microservices的主要内容,如果未能解决你的问题,请参考以下文章
如何将 ValueConverter 应用于基于约定的 Caliburn.Micro 绑定示例?
如何基于Jupyter notebook搭建Spark集群开发环境