python Raspberry pi拡张ユニットテストと実装。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python Raspberry pi拡张ユニットテストと実装。相关的知识,希望对你有一定的参考价值。
#!/usr/bin/env python
# -*- coding: euc-jp -*-
import smbus
import time
def main():
# LCD initialize
i2c = smbus.SMBus(1)
addr = 0x3e
contrast = 42 # 0-63
i2c.write_byte_data(addr, 0, 0x38) # function set(IS=0)
i2c.write_byte_data(addr, 0, 0x39) # function set(IS=1)
i2c.write_byte_data(addr, 0, 0x14) # internal osc
i2c.write_byte_data(addr, 0,(0x70 | (contrast & 0x0f))) # contrast
i2c.write_byte_data(addr, 0,(0x54 | ((contrast >> 4) & 0x03))) # contrast/icon/power
i2c.write_byte_data(addr, 0, 0x6c) # follower control
time.sleep(0.2)
i2c.write_byte_data(addr, 0, 0x38) # function set(IS=0)
i2c.write_byte_data(addr, 0, 0x0C) # Display On
i2c.write_byte_data(addr, 0, 0x01) # Clear Display
i2c.write_byte_data(addr, 0, 0x06) # Entry Mode Set
time.sleep(0.2)
# LCD Clear
i2c.write_byte_data(addr, 0, 0x38) # function set(IS=0)
i2c.write_byte_data(addr, 0, 0x0C) # Display On
i2c.write_byte_data(addr, 0, 0x01) # Clear Display
i2c.write_byte_data(addr, 0, 0x06) # Entry Mode Set
time.sleep(0.2)
# Send to LCD
line1 = '__test__'
for c in line1:
i2c.write_byte_data(addr, 0x40, ord(c))
i2c.write_byte_data(addr, 0, 0xc0) # 2nd line
line2 = '__!(^^)!__'
for c in line2:
i2c.write_byte_data(addr, 0x40, ord(c))
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: euc-jp -*-
import spidev
from time import sleep
spi = spidev.SpiDev()
spi.open(0,1)
def changeLevel(ch, onOff, percent):
bit7 = ch << 7
bit6 = 0 << 6
bit5 = 1 << 5
bit4 = onOff << 4
size=12
number=(2**size-1)*percent/100
number= number<<(12-size)
number= number<<(12-size)
bottomPart= number % 256
topPart=(number-bottomPart)>>8
firstByte=bit7+bit6+bit5+bit4+topPart
secondByte=bottomPart
return spi.xfer2([firstByte,secondByte])
def which_channel():
channel = raw_input("Which channel do you want to test? Type 0 or 1.\n")
while not channel.isdigit():
channel = raw_input("Try again - just numbers 0 or 1 please!\n")
return channel
def main():
channel = 3
while not (channel == 1 or channel == 0):
channel = int(which_channel())
print "These are the connections for the digital to analogue test:"
print "jumper connecting GP11 to SCLK"
print "jumper connecting GP10 to MOSI"
print "jumper connecting GP9 to MISO"
print "jumper connecting GP7 to CSnB"
print "Multimeter connections (set your meter to read V DC):"
print " connect black probe to GND"
print " connect red probe to DA%d on J29" % channel
raw_input("When ready hit enter.\n")
percent=[0,25,75,100]
for p in percent:
r = changeLevel(channel,1,p)
print "Your meter should read about {0:.2f}V".format(p*2.048/100.0)
raw_input("When ready hit enter.\n")
r = changeLevel(0,0,0)
r = changeLevel(1,0,0)
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: euc-jp -*-
import sys
import time
import spidev
class ADC:
def __init__(self):
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
def get_value(self, channel):
sned_ch = [0x00,0x08,0x10,0x18]
if ((channel > 3) or (channel < 0)):
return -1
r = self.spi.xfer2([sned_ch[channel],0,0,0])
ret = ((r[2] << 6 ) & 0x300) | ((r[2] << 6) & 0xc0) | ((r[3] >> 2) & 0x3f)
return ret
def get_voltage(self, channel):
ret = self.get_value(channel) * 5.0 / 1024
return ret
def main():
adc = ADC()
while 1:
adc1 = adc.get_value(0)
msg1 = "%1.5fV(%04x)" % ((float(adc1)*5/1024),adc1)
print msg1,
adc1 = adc.get_value(1)
msg1 = "%1.5fV(%04x)" % ((float(adc1)*5/1024),adc1)
print msg1,
adc2 = adc.get_value(2)
msg2 = "%1.5fV(%04x)" % ((float(adc2)*5/1024),adc2)
print msg2,
adc3 = adc.get_value(3)
msg3 = "%1.5fV(%04x)" % ((float(adc3)*5/1024),adc3)
print msg3,
sys.stdout.write("\n")
time.sleep(0.5)
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- Python -*-
"""
@file Ministick.py
@brief Phidget ministick sensor component
@date $Date$
"""
import sys
import time
sys.path.append(".")
# Import RTM module
import RTC
import OpenRTM_aist
import math
import spidev
# Import Service implementation class
# <rtc-template block="service_impl">
# </rtc-template>
# Import Service stub modules
# <rtc-template block="consumer_import">
# </rtc-template>
# This module's spesification
# <rtc-template block="module_spec">
ministick_spec = ["implementation_id", "Ministick",
"type_name", "Ministick",
"description", "Phidget ministick sensor component",
"version", "1.0.0",
"vendor", "AIST",
"category", "Input Devic",
"activity_type", "STATIC",
"max_instance", "1",
"language", "Python",
"lang_type", "SCRIPT",
"conf.default.scaling", "1.0",
"conf.default.tread", "0.2",
"conf.default.print_xy", "NO",
"conf.default.print_vel", "NO",
"conf.default.print_wvel", "NO",
"conf.__widget__.scaling", "slider.0.1",
"conf.__widget__.tread", "slider.0.01",
"conf.__widget__.print_xy", "radio",
"conf.__widget__.print_vel", "radio",
"conf.__widget__.print_wvel", "radio",
"conf.__constraints__.scaling", "0.0<=x<=10.0",
"conf.__constraints__.tread", "0.0<=x<=1.0",
"conf.__constraints__.print_xy", "(YES,NO)",
"conf.__constraints__.print_vel", "(YES,NO)",
"conf.__constraints__.print_wvel", "(YES,NO)",
""]
# </rtc-template>
##
# @class Ministick
# @brief Phidget ministick sensor component
#
#
class Ministick(OpenRTM_aist.DataFlowComponentBase):
##
# @brief constructor
# @param manager Maneger Object
#
def __init__(self, manager):
OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[])
"""
ジョイスティックのX-Y位置データ
- Semantics: data[0]:x位置, data[1]:y位置
"""
self._posOut = OpenRTM_aist.OutPort("pos", self._d_pos)
self._d_vel = RTC.TimedVelocity2D(RTC.Time(0,0),0)
"""
移動ロボットの速度ベクトル
- Semantics: vx:並進速度, vy:0.0m va:角速度
- Unit: vx[m/s], va[rad/s]
"""
self._velOut = OpenRTM_aist.OutPort("vel", self._d_vel)
self._d_whell_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[])
"""
車輪速度
- Semantics: data[0]:左車輪角速度, data[1]:右車輪角速度
- Unit: [rad/s]
"""
self._whell_velOut = OpenRTM_aist.OutPort("whell_vel", self._d_whell_vel)
# initialize of configuration-data.
# <rtc-template block="init_conf_param">
"""
スケーリングファクタ
- Name: scaling
- DefaultValue: 1.0
"""
self._scaling = [1.0]
"""
移動ロボットのトレッド値
- Name: tread
- DefaultValue: 0.2
"""
self._tread = [0.2]
"""
- Name: print_xy
- DefaultValue: NO
"""
self._print_xy = ['NO']
"""
velデータのデバッグプリントフラグ
- Name: print_vel
- DefaultValue: NO
"""
self._print_vel = ['NO']
"""
wheel_velデータのデバッグプリントフラグ
- Name: print_wvel
- DefaultValue: NO
"""
self._print_wvel = ['NO']
# </rtc-template>
self.x = 0.0
self.y = 0.0
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
def get_adc(self, channel):
sned_ch = [0x00, 0x08, 0x10, 0x18]
if ((channel > 3) or (channel < 0)):
return -1
r = self.spi.xfer2([sned_ch[channel], 0, 0, 0])
ret = ((r[2] << 6 ) & 0x300) | ((r[2] << 6) & 0xc0) | ((r[3] >> 2) & 0x3f)
return ret
def xy_to_wvel(self, x, y):
th = math.atan2(y, x)
v = math.hypot(x, y)
vl = v * math.cos(th - (math.pi/4.0))
vr = v * math.sin(th - (math.pi/4.0))
return (vl, vr)
def wvel_to_vel2d(self, vl, vr):
v = (vr + vl) / 2.0
if v < 0.0:
w = - (vr - vl) / self._tread[0]
else:
w = (vr - vl) / self._tread[0]
return RTC.Velocity2D(v, 0.0, w)
##
#
# The initialize action (on CREATED->ALIVE transition)
# formaer rtc_init_entry()
#
# @return RTC::ReturnCode_t
#
#
def onInitialize(self):
# Bind variables and configuration variable
self.bindParameter("scaling", self._scaling, "1.0")
self.bindParameter("tread", self._tread, "0.2")
self.bindParameter("print_xy", self._print_xy, "NO")
self.bindParameter("print_vel", self._print_vel, "NO")
self.bindParameter("print_wvel", self._print_wvel, "NO")
# Set InPort buffers
# Set OutPort buffers
self.addOutPort("pos",self._posOut)
self.addOutPort("vel",self._velOut)
self.addOutPort("whell_vel",self._whell_velOut)
# Set service provider to Ports
# Set service consumers to Ports
# Set CORBA Service Ports
self.x_offset_v = 0.0
self.y_offset_v = 0.0
for i in range(1, 100):
self.x_offset_v += self.get_adc(0)
self.y_offset_v += self.get_adc(1)
self.x_offset_v = self.x_offset_v / 100.0
self.y_offset_v = self.y_offset_v / 100.0
return RTC.RTC_OK
##
#
# The finalize action (on ALIVE->END transition)
# formaer rtc_exiting_entry()
#
# @return RTC::ReturnCode_t
#
def onFinalize(self):
return RTC.RTC_OK
# ##
# #
# # The startup action when ExecutionContext startup
# # former rtc_starting_entry()
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onStartup(self, ec_id):
#
# return RTC.RTC_OK
# ##
# #
# # The shutdown action when ExecutionContext stop
# # former rtc_stopping_entry()
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onShutdown(self, ec_id):
#
# return RTC.RTC_OK
##
#
# The activated action (Active state entry action)
# former rtc_active_entry()
#
# @param ec_id target ExecutionContext Id
#
# @return RTC::ReturnCode_t
#
#
#def onActivated(self, ec_id):
# return RTC.RTC_OK
##
#
# The deactivated action (Active state exit action)
# former rtc_active_exit()
#
# @param ec_id target ExecutionContext Id
#
# @return RTC::ReturnCode_t
#
#
def onDeactivated(self, ec_id):
return RTC.RTC_OK
##
#
# The execution action that is invoked periodically
# former rtc_active_do()
#
# @param ec_id target ExecutionContext Id
#
# @return RTC::ReturnCode_t
#
#
def onExecute(self, ec_id):
self.x = - (self.get_adc(0) - self.x_offset_v) * self._scaling[0] / 1000.0
self.y = (self.get_adc(1) - self.y_offset_v) * self._scaling[0] / 1000.0
if self._print_xy[0] != "NO":
print "(x, y) = ", self.x, self.y
self._d_pos.data = [self.x, self.y]
self._d_whell_vel.data = self.xy_to_wvel(self.x, self.y)
if self._print_wvel[0] != "NO":
print "(vl, vr) = ", self._d_whell_vel.data[0], self._d_whell_vel.data[1]
self._d_vel.data = self.wvel_to_vel2d(self._d_whell_vel.data[0],
self._d_whell_vel.data[1])
if self._print_vel[0] != "NO":
print "(vx, va) = ", self._d_vel.data.vx, self._d_vel.data.va
self._posOut.write()
self._velOut.write()
self._whell_velOut.write()
return RTC.RTC_OK
# ##
# #
# # The aborting action when main logic error occurred.
# # former rtc_aborting_entry()
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onAborting(self, ec_id):
#
# return RTC.RTC_OK
# ##
# #
# # The error action in ERROR state
# # former rtc_error_do()
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onError(self, ec_id):
#
# return RTC.RTC_OK
# ##
# #
# # The reset action that is invoked resetting
# # This is same but different the former rtc_init_entry()
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onReset(self, ec_id):
#
# return RTC.RTC_OK
# ##
# #
# # The state update action that is invoked after onExecute() action
# # no corresponding operation exists in OpenRTm-aist-0.2.0
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onStateUpdate(self, ec_id):
#
# return RTC.RTC_OK
# ##
# #
# # The action that is invoked when execution context's rate is changed
# # no corresponding operation exists in OpenRTm-aist-0.2.0
# #
# # @param ec_id target ExecutionContext Id
# #
# # @return RTC::ReturnCode_t
# #
# #
#def onRateChanged(self, ec_id):
#
# return RTC.RTC_OK
def MinistickInit(manager):
profile = OpenRTM_aist.Properties(defaults_str=ministick_spec)
manager.registerFactory(profile,
Ministick,
OpenRTM_aist.Delete)
def MyModuleInit(manager):
MinistickInit(manager)
# Create a component
comp = manager.createComponent("Ministick")
def main():
mgr = OpenRTM_aist.Manager.init(sys.argv)
mgr.setModuleInitProc(MyModuleInit)
mgr.activateManager()
mgr.runManager()
if __name__ == "__main__":
main()
以上是关于python Raspberry pi拡张ユニットテストと実装。的主要内容,如果未能解决你的问题,请参考以下文章
python Raspberry pi拡张ユニットテストと実装。
python SQLでSELECT DISTINCTしてユニークな组み合わせをカウントするような问い合わせを,熊猫のデータフレームに书き変えてみました。以下「お昼のセットメニューの组み合わせに投票」と