阿里云api调用
Posted slqt
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阿里云api调用相关的知识,希望对你有一定的参考价值。
# coding: utf-8
"""
This module be able to manage ecs instances on the aliyun cloud. We choose
some helper functions from salt.cloud.clouds.aliyun since DRY principle.
Full documentations about the Aliyun APIs are located at
"https://help.aliyun.com/document_detail/25484.html".
"""
import sys
import time
import uuid
import hmac
import base64
import json
#隐藏部分内部调用库
import logging
log = logging.getLogger(__name__)
from hashlib import sha1
import requests
import salt.utils
try:
from salt.ext.six.moves.urllib.parse import quote as _quote
except:
pass
DEFAULT_ALIYUN_API_VERSION = ‘2014-05-26‘
def get_params(integration_info):
utils.logger.info(‘{}‘.format(integration_info))
access_key_id = None
secret_key = None
if not isinstance(integration_info,dict):
integration_info=json.loads(integration_info)
for k, v in integration_info.items():
if k == ‘access_key_id‘:
access_key_id = v
elif k == ‘secret_key‘:
secret_key = v
return access_key_id, secret_key
class HTTPError(Exception):
def __init__(self, response):
self.has_data = False
self.status_code = response.status_code
utils.logger.debug(self.status_code)
self.content = response.content
utils.logger.debug(self.content)
if ‘application/json‘ in response.headers[‘Content-Type‘].lower():
self.has_data = True
self.data = json.loads(
self.content, object_hook=salt.utils.decode_dict)
def _compute_signature(parameters, secret_key):
‘‘‘
Generate aliyun request signature
‘‘‘
def percent_encode(line):
if not isinstance(line, str):
return line
s = line
utils.logger.debug(sys.stdin.encoding)
utils.logger.debug(sys.getdefaultencoding())
utils.logger.debug(sys.getfilesystemencoding())
if sys.stdin.encoding is None and sys.getfilesystemencoding() is None:
s = line.decode().encode(‘utf8‘)
elif sys.stdin.encoding is None and sys.getfilesystemencoding():
s = line.decode(sys.getfilesystemencoding()).encode(‘utf8‘)
else:
s = line.decode(sys.stdin.encoding).encode(‘utf8‘)
res = _quote(s, ‘‘)
res = res.replace(‘+‘, ‘%20‘)
res = res.replace(‘*‘, ‘%2A‘)
res = res.replace(‘%7E‘, ‘~‘)
return res
sortedParameters = sorted(list(parameters.items()),
key=lambda items: items[0])
canonicalizedQueryString = ‘‘
for k, v in sortedParameters:
canonicalizedQueryString += ‘&‘ + percent_encode(k) \
+ ‘=‘ + percent_encode(v)
# All aliyun API only support GET method
stringToSign = ‘GET&%2F&‘ + percent_encode(canonicalizedQueryString[1:])
h = hmac.new(secret_key + "&", stringToSign, sha1)
signature = base64.encodestring(h.digest()).strip()
return signature
def query(access_key_id, secret_key, params, jid=None, outputParam=[], **kwargs):
‘‘‘
Make a web call to aliyun ECS REST API
‘‘‘
path = ‘https://ecs.aliyuncs.com/‘
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
# public interface parameters
parameters = {
‘Format‘: ‘JSON‘,
‘Version‘: DEFAULT_ALIYUN_API_VERSION,
‘AccessKeyId‘: access_key_id,
‘SignatureVersion‘: ‘1.0‘,
‘SignatureMethod‘: ‘HMAC-SHA1‘,
‘SignatureNonce‘: str(uuid.uuid1()),
‘TimeStamp‘: timestamp,
}
# include action or function parameters
if params:
parameters.update(params)
# Calculate the string for Signature
signature = _compute_signature(parameters, secret_key)
parameters[‘Signature‘] = signature
utils.logger.debug(parameters)
utils.logger.debug(path)
utils.logger.debug(parameters)
request = requests.get(path, params=parameters, verify=True)
utils.logger.debug(‘request url:{}‘.format(path))
utils.logger.debug(‘parameters:{}‘.format(parameters))
if request.status_code != 200:
raise HTTPError(request)
log.debug(request.url)
utils.logger.debug(request.url)
utils.logger.debug(request.status_code)
content = request.text
utils.logger.debug(content)
result = json.loads(content, object_hook=salt.utils.decode_dict)
if ‘Code‘ in result:
raise HTTPError(request)
return result
def http_error_code_result(http_error):
if http_error.has_data and ‘Code‘ in http_error.data:
return {"success": False, "message": u‘error code:{0}‘.format(http_error.data[‘Code‘])}
else:
log.error(‘{}‘.format(http_error.content))
utils.logger.error(‘{}‘.format(http_error.content))
return {"success": False, "message": u‘unknown error‘}
def byteify(input_str):
if isinstance(input_str, dict):
return {byteify(key): byteify(value) for key, value in input_str.iteritems()}
elif isinstance(input_str, list):
return [byteify(element) for element in input_str]
elif isinstance(input_str, unicode):
return input_str.encode(‘utf-8‘)
else:
return input_str
def create(integration_info=None, vm_conf=None, jid=None, outputParam=[], **kwargs):
params = {
‘Action‘: ‘CreateInstance‘,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
if vm_conf in ({}, None, ‘‘):
return {"success": False, "message": u‘create ECS instance fail,please check the config params‘}
else:
params.update(vm_conf)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
utils.logger.info(‘result:{}‘.format(result))
instanceId = result.get(‘InstanceId‘, None)
out = {‘instance_id‘: instanceId}
utils.logger.info(‘{} {}‘.format(out, outputParam))
outs = utils_params.get_output(out, outputParam)
except HTTPError as e:
return http_error_code_result(e)
return {"success": True, "message": u‘create ECS instance success,instance ID:{0}‘.format(instanceId),
‘outputParam‘: outs}
def edit(integration_info=None, opts=None, access_key_id=None, secret_key=None, instance_id=None,
instanceName=None,
Description=None, Password=None, HostName=None, jid=None, outputParam=[], **kwargs):
access_key_id, secret_key = get_params(integration_info)
params = {
‘Action‘: ‘ModifyInstanceAttribute‘,
‘InstanceId‘: instance_id,
‘InstanceName‘: instanceName,
‘Description‘: Description,
‘Password‘: Password,
‘HostName‘: HostName,
}
for k, v in params.items():
if not v:
del params[k]
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
utils.logger.info(‘{}‘.format(result))
instanceId = result.get(‘RequestId‘, None)
out = {‘instance_id‘: instanceId}
outs = utils_params.get_output(out, outputParam)
utils.logger.info(‘{}‘.format(outs))
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: u‘edit ECS instance success,instance ID:{0}‘.format(
instanceId), ‘outputParam‘: outs}
def _query_status(integration_info, region_id=None, instance_id=None, **kwargs):
access_key_id, secret_key = get_params(integration_info)
params = {
‘Action‘: ‘DescribeInstanceStatus‘,
}
try:
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
regions = _query_region(integration_info)
status = None
for i in regions:
params[‘RegionId‘] = i
result = query(access_key_id, secret_key, params)
for i in result[‘InstanceStatuses‘][‘InstanceStatus‘]:
if i[‘InstanceId‘] == instance_id:
status = i[‘Status‘]
break
if status: # Running|Stopped
return True, status
if not status:
return False, ‘the instance not exists‘
except HTTPError as e:
return http_error_code_result(e)
def start(integration_info=None, opts=None, access_key_id=None, secret_key=None, instance_id=None,
jid=None,
outputParam=[], **kwargs):
access_key_id, secret_key = get_params(integration_info)
params = {
‘Action‘: ‘StartInstance‘,
‘InstanceId‘: instance_id,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: u‘instance start success‘}
def _query_zone(integration_info=None, region_id=None):
params = {
‘Action‘: ‘DescribeZones‘,
‘RegionId‘: region_id,
}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
return result.get(‘ZoneId‘)
def _query_region(integration_info=None):
params = {
‘Action‘: ‘DescribeRegions‘,
}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
result = result[‘Regions‘][‘Region‘]
results = [i[‘RegionId‘] for i in result]
return results
def query_ecs(integration_info=None, opts=None, region_id=None, instance_id=‘Instances‘, jid=None,
outputParam=[], **kwargs):
params = {
‘Action‘: ‘DescribeInstances‘,
‘RegionId‘: region_id,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)[‘Instances‘][‘Instance‘]
out = {}
for i in result:
if i[‘InstanceId‘] == instance_id:
out[‘PrivateIpAddresses‘] = ‘,‘.join(
i[‘VpcAttributes‘][‘PrivateIpAddress‘][‘IpAddress‘])
out[‘InnerIpAddresses‘] = ‘,‘.join(i[‘InnerIpAddress‘][‘IpAddress‘])
out[‘PublicIpAddresses‘] = ‘,‘.join(i[‘PublicIpAddress‘][‘IpAddress‘])
out[‘InstanceName‘] = ‘,‘.join(i[‘InstanceName‘])
out[‘Memory‘] = ‘,‘.join(i[‘Memory‘])
out[‘CPU‘] = ‘,‘.join(i[‘Cpu‘])
out[‘HostName‘] = ‘,‘.join(i[‘HostName‘])
out[‘Status‘] = ‘,‘.join(i[‘Status‘])
out[‘CreationTime‘] = ‘,‘.join(i[‘CreationTime‘])
out[‘ExpiredTime‘] = ‘,‘.join(i[‘ExpiredTime‘])
out[‘InstanceNetworkType‘] = ‘,‘.join(i[‘InstanceNetworkType‘])
outs = utils_params.get_output(out, outputParam)
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: ‘query success‘, ‘outputParam‘: outs}
def query_zone(integration_info=None, region_id=None, **kwargs):
params = {
‘Action‘: ‘DescribeZones‘,
‘RegionId‘: region_id
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: ‘query result:\n{}‘.format(utils_params.format_json(result))}
def reboot(integration_info=None, opts=None, access_key_id=None, secret_key=None, instance_id=None,
jid=None, outputParam=[], **kwargs):
params = {
‘Action‘: ‘RebootInstance‘,
‘InstanceId‘: instance_id,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: u‘instance restart success‘}
def stop(integration_info=None, instance_id=None, **kwargs):
params = {
‘Action‘: ‘StopInstance‘,
‘InstanceId‘: instance_id,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
start_time = time.time()
while True:
status, res = _query_status(integration_info, instance_id=instance_id)
if status and res == ‘Stopped‘:
return {‘success‘: True, ‘message‘: u‘instance stop success‘}
end_time = time.time()
if end_time - start_time >= 10000:
return {‘success‘: False, ‘message‘: u‘time is out‘}
except HTTPError as e:
return http_error_code_result(e)
def delete(integration_info=None, opts=None, instance_id=None, access_key_id=None, secret_key=None,
jid=None, outputParam=[], **kwargs):
params = {
‘Action‘: ‘DeleteInstance‘,
‘InstanceId‘: instance_id,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: u‘instance delete success‘}
def create_image(integration_info=None, opts=None, image=None, jid=None, access_key_id=None,
secret_key=None,
outputParam=[], **kwargs):
params = {
‘Action‘: ‘CreateImage‘,
}
try:
ret, param = utils_errors.check_inputs(locals())
if not ret:
return {‘success‘: False, ‘message‘: ‘input params error,please check input params:{}‘.format(param)}
access_key_id, secret_key = get_params(integration_info)
params.update(image)
params = byteify(params)
access_key_id = byteify(access_key_id)
secret_key = byteify(secret_key)
result = query(access_key_id, secret_key, params)
utils.logger.debug(‘create_image‘)
except HTTPError as e:
return http_error_code_result(e)
else:
return {‘success‘: True, ‘message‘: u‘create image success,image ID:{0}‘.format(result[‘ImageId‘])}
以上是关于阿里云api调用的主要内容,如果未能解决你的问题,请参考以下文章
Powershell 调用阿里云 云解析API 实现动态域名解析
调用阿里云api获取阿里云数据同步服务(DTS)并且作图发送邮件的整个流程