怎么用python写一个期货突破新高报警的程序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎么用python写一个期货突破新高报警的程序相关的知识,希望对你有一定的参考价值。

参考技术A 1、首先用python写一个期货突破新高报警的程序包含两个菜单栏。
2、其次使用Python语言编写该策略,也非常容易实现。
3、最后加上回测配置信息,有70行代码,实际可以更加精简。

python2写ping监控,自动发现ip

玩了hostmonitor,老外写的很好。但是不符合国情,只有邮件适合发送。

今天用python 写一个自动发现ip,ping失败报警的程序。(微信和邮件报警)

以前用python写的发微信,发邮件直接导入即可。

# -*- coding: utf-8 -*-
# ping for Windows

import os
import sys
import socket
import struct
import select
import time
import ConfigParser
import weixin
import mail

class getcfg(object):
	def __init__(self,filename):
		self.filename=filename
		self.cfg=ConfigParser.ConfigParser()
		self.cfg.read(self.filename)
		#self.allip=self.cfg.items(‘allip‘)
		self.retry=self.cfg.get(‘rule‘,‘retry‘)
		self.timeout=self.cfg.get(‘rule‘,‘timeout‘)
		self.reload=self.cfg.get(‘rule‘,‘next_check‘)
		self.scan=self.cfg.items(‘scan_network‘)
		self.mailto=self.cfg.items(‘mail‘,‘mail_to‘)
	
	#发送邮件报警
	def sendmessage(self,failip):
		weixin.sendMessage(‘@all‘,‘ping %s failed.‘% failip)
		for key,value in self.mailto:
			mail.send(value,"ping %s failed." % failip,‘python ping fail‘)
			
	#判断要扫描ip
	def scan_network(self):
		self.scan.sort()
		allip=self.cfg.items(‘allip‘)
		i=0
		while i < len(self.scan):
			ipbegin=int(self.scan[i][1].split(‘.‘)[-1])
			ipend=int(self.scan[i+1][1])+1
			for x in range(ipbegin,ipend):
				ipvalue=self.scan[i][1].split(‘.‘)[0]+‘.‘+self.scan[i][1].split(‘.‘)[1]+‘.‘+self.scan[i][1].split(‘.‘)[2]+‘.‘+str(x)
				ip_dict=[]
				for k,v in allip:
					ip_dict.append(v)
				if ipvalue in ip_dict:
					pass
				else:
					#执行扫描
					self.to_ping(ipvalue,int(self.timeout))
			i=i+2
	
	#执行扫描
	def to_ping(self,ip_addr,timeout):
		print ‘scan ip ‘ + ip_addr,
		try:
			delay = ping(self.filename).ping_once(ip_addr, timeout)
			if delay == None:
				print ‘failed. (timeout within %s second.)‘ % timeout
			else:
				print ‘get reply in %0.4f ms‘ % (delay * 1000)
				self.scan_resule(ip_addr)
		except socket.gaierror, e:
			print "failed. (socket error: ‘%s‘)" % e[1]
	
	#判断扫描结果,如果有新扫描到的ip则保存到配置文件
	def scan_resule(self,ip_addr):
		allip=self.cfg.items(‘allip‘)
		ip_dict=[]
		for k,v in allip:
			ip_dict.append(v)
		if ip_addr in ip_dict:
			pass
		else:
			self.cfg_write(ip_addr)
	
	#保存扫描到的ip到配置文件
	def cfg_write(self,ipvalue):
		ipkey=‘auto_‘+str(time.time())
		self.cfg.set(‘allip‘,ipkey,ipvalue)
		self.cfg.write(open(‘ping_monitor.txt‘,‘w‘))
		
	#去执行ping类
	def to_ping(self):
		allip=self.cfg.items(‘allip‘)
		for k,v in allip:
			if len(v) >0:
				ping(self.filename).icmp_ping(v,int(self.timeout),int(self.retry))

	#下次执行扫描时间
	def next_check(self):
		while True:
			self.to_ping()
			print(‘------------------------------------------------------------‘)
			nextcheck=0
			while nextcheck < int(self.reload):
				sys.stdout.write(‘next check %s second\r‘ % (int(self.reload)-nextcheck))
				sys.stdout.flush()
				time.sleep(1)
				nextcheck=nextcheck+1
			self.scan_network()
			print(‘------------------------------------------------------------‘)
			

class ping(object):
	def __init__(self,filename):
		self.ICMP_ECHO_REQUEST = 8
		self.filename=filename

	def receive_ping(self,my_socket, ID, timeout):
		start_time = timeout
		while True:
			start_select = time.clock()
			what_ready = select.select([my_socket], [], [], start_time)
			how_long = (time.clock() - start_select)
			if what_ready[0] == []: #timeout
				return

			time_received = time.clock()
			rec_packet, addr = my_socket.recvfrom(1024)
			icmp_header = rec_packet[20 : 28]
			ip_type, code, checksum, packet_ID, sequence = struct.unpack("bbHHh", icmp_header)
			if ip_type != 8 and packet_ID == ID: # ip_type should be 0
				byte_in_double = struct.calcsize("d")
				time_sent = struct.unpack("d", rec_packet[28 : 28 + byte_in_double])[0]
				return time_received - time_sent

			start_time = start_time - how_long
			if start_time <= 0:
				return


	def get_checksum(self,source):
		checksum = 0
		count = (len(source) / 2) * 2
		i = 0
		while i < count:
			temp = ord(source[i + 1]) * 256 + ord(source[i]) # 256 = 2^8
			checksum = checksum + temp
			checksum = checksum & 0xffffffff # 4,294,967,296 (2^32)
			i = i + 2

		if i < len(source):
			checksum = checksum + ord(source[len(source) - 1])
			checksum = checksum & 0xffffffff

		# 32-bit to 16-bit
		checksum = (checksum >> 16) + (checksum & 0xffff)
		checksum = checksum + (checksum >> 16)
		answer = ~checksum
		answer = answer & 0xffff

		# why? ans[9:16 1:8]
		answer = answer >> 8 | (answer << 8 & 0xff00)
		return answer


	def send_ping(self,my_socket, ip_addr, ID):
		ip = socket.gethostbyname(ip_addr)

		my_checksum = 0

		header = struct.pack(‘bbHHh‘, self.ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
		byte_in_double = struct.calcsize("d") # C type: double
		data = (192 - byte_in_double) * "P" # any char is OK, any length is OK
		data = struct.pack("d", time.clock()) + data

		my_checksum = self.get_checksum(header + data)

		header = struct.pack("bbHHh", self.ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1)
		packet = header + data
		my_socket.sendto(packet, (ip, 80)) # it seems that 0~65535 is OK (port?)


	def ping_once(self,ip_addr, timeout):
		icmp = socket.getprotobyname(‘icmp‘)
		try:
			my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
		except socket.error:
			raise

		my_ID = os.getpid() & 0xFFFF

		self.send_ping(my_socket, ip_addr, my_ID)
		delay = self.receive_ping(my_socket, my_ID, timeout)

		my_socket.close()
		return delay


	def icmp_ping(self,ip_addr,timeout,count):
		number=1
		for i in range(count):
			print ‘ping ‘ + ip_addr,
			try:
				delay = self.ping_once(ip_addr, timeout)
				if delay == None:
					print ‘failed. (timeout within %s second.)‘ % timeout
					if number==count:
						print(‘The request has failed %s times,Email alerts are being sent‘ % count)
						getcfg(self.filename).sendmessage(ip_addr)
				else:
					print ‘get reply in %0.4f ms‘ % (delay * 1000)
					break
			except socket.gaierror, e:
				print "failed. (socket error: ‘%s‘)" % e[1]
				break
			number=number+1
			

if __name__ == ‘__main__‘:
	weixin=weixin.WeChat()
	mail=mail.sendmail()
	cfgname=‘ping_monitor.txt‘
	ping(cfgname)
	pingcfg=getcfg(cfgname)
	pingcfg.next_check()

 

配置文件

[rule]
retry = 4
timeout = 1
next_check = 300
#单位都是秒
[scan_network]
3paragraph_begin = 10.1.3.198
3paragraph_end = 202
2paragraph_begin = 10.1.2.1
2paragraph_end = 10


[mail]
[email protected]
[email protected]


[allip]
#这里会自动把扫描到的ip写入

  

 

以上是关于怎么用python写一个期货突破新高报警的程序的主要内容,如果未能解决你的问题,请参考以下文章

哈希日报:Cboe或将推出以太坊期货产品;比特币期货净空头头寸创2月以来新高;Equihash算法算力中30%由ASIC矿机掌握

Footprint周报:比特币期货ETF上市,BTC价格再创新高

期货程序化交易软件怎么使用?

正大国际期货:美股再创新高打击金价,美联储最关注的数据

4.3盘面数据分析

经典日内策略:ORB突破策略(期货)