python Raspberry pi拡张ユニットテストと実装。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python Raspberry pi拡张ユニットテストと実装。相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python
# -*- coding: euc-jp -*-
import smbus
import time

def main():
  # LCD initialize
  i2c = smbus.SMBus(1)
  addr = 0x3e
  contrast = 42   # 0-63
  i2c.write_byte_data(addr, 0, 0x38)  # function set(IS=0)
  i2c.write_byte_data(addr, 0, 0x39)  # function set(IS=1)
  i2c.write_byte_data(addr, 0, 0x14)  # internal osc
  i2c.write_byte_data(addr, 0,(0x70 | (contrast & 0x0f))) # contrast
  i2c.write_byte_data(addr, 0,(0x54 | ((contrast >> 4) & 0x03)))  # contrast/icon/power
  i2c.write_byte_data(addr, 0, 0x6c)  # follower control
  time.sleep(0.2)
  i2c.write_byte_data(addr, 0, 0x38)  # function set(IS=0)
  i2c.write_byte_data(addr, 0, 0x0C)  # Display On
  i2c.write_byte_data(addr, 0, 0x01)  # Clear Display
  i2c.write_byte_data(addr, 0, 0x06)  # Entry Mode Set
  time.sleep(0.2)

  # LCD Clear
  i2c.write_byte_data(addr, 0, 0x38)  # function set(IS=0)
  i2c.write_byte_data(addr, 0, 0x0C)  # Display On
  i2c.write_byte_data(addr, 0, 0x01)  # Clear Display
  i2c.write_byte_data(addr, 0, 0x06)  # Entry Mode Set
  time.sleep(0.2)

  # Send to LCD
  line1 = '__test__'
  for c in line1:
    i2c.write_byte_data(addr, 0x40, ord(c))
  i2c.write_byte_data(addr, 0, 0xc0)  # 2nd line
  line2 = '__!(^^)!__'
  for c in line2:
    i2c.write_byte_data(addr, 0x40, ord(c))


if __name__ == '__main__':
  main()
#!/usr/bin/env python
# -*- coding: euc-jp -*-
import spidev
from time import sleep

spi = spidev.SpiDev()
spi.open(0,1)

def changeLevel(ch,  onOff, percent):
  bit7 = ch    << 7
  bit6 = 0     << 6
  bit5 = 1     << 5
  bit4 = onOff << 4
  size=12
  number=(2**size-1)*percent/100
  number= number<<(12-size)
  number= number<<(12-size)
  bottomPart= number % 256
  topPart=(number-bottomPart)>>8
  firstByte=bit7+bit6+bit5+bit4+topPart
  secondByte=bottomPart
  return spi.xfer2([firstByte,secondByte])


def which_channel():
  channel = raw_input("Which channel do you want to test? Type 0 or 1.\n")
  while not channel.isdigit():
      channel = raw_input("Try again - just numbers 0 or 1 please!\n")
  return channel


def main():
  channel = 3
  while not (channel == 1 or channel == 0):
      channel = int(which_channel())

  print "These are the connections for the digital to analogue test:"
  print "jumper connecting GP11 to SCLK"
  print "jumper connecting GP10 to MOSI"
  print "jumper connecting GP9 to MISO"
  print "jumper connecting GP7 to CSnB"
  print "Multimeter connections (set your meter to read V DC):"
  print "  connect black probe to GND"
  print "  connect red probe to DA%d on J29" % channel

  raw_input("When ready hit enter.\n")
  percent=[0,25,75,100]

  for p in percent:
      r = changeLevel(channel,1,p)
      print "Your meter should read about {0:.2f}V".format(p*2.048/100.0)
      raw_input("When ready hit enter.\n")

  r = changeLevel(0,0,0)
  r = changeLevel(1,0,0)

if __name__ == '__main__':
  main()
#!/usr/bin/env python
# -*- coding: euc-jp -*-
import sys
import time
import spidev

class ADC:
  def __init__(self):
    self.spi = spidev.SpiDev()
    self.spi.open(0, 0)

  def get_value(self, channel):
    sned_ch = [0x00,0x08,0x10,0x18]
    if ((channel > 3) or (channel < 0)):
      return -1
    r = self.spi.xfer2([sned_ch[channel],0,0,0])
    ret = ((r[2] << 6 ) & 0x300) |  ((r[2] << 6) & 0xc0) | ((r[3] >> 2) & 0x3f)
    return ret

  def get_voltage(self, channel):
    ret = self.get_value(channel) * 5.0 / 1024
    return ret

def main():
  adc = ADC()
  while 1:
    adc1 = adc.get_value(0)
    msg1 = "%1.5fV(%04x)" % ((float(adc1)*5/1024),adc1)
    print msg1,

    adc1 = adc.get_value(1)
    msg1 = "%1.5fV(%04x)" % ((float(adc1)*5/1024),adc1)
    print msg1,

    adc2 = adc.get_value(2)
    msg2 = "%1.5fV(%04x)" % ((float(adc2)*5/1024),adc2)
    print msg2,

    adc3 = adc.get_value(3)
    msg3 = "%1.5fV(%04x)" % ((float(adc3)*5/1024),adc3)
    print msg3,

    sys.stdout.write("\n")
    time.sleep(0.5)

if __name__ == '__main__':
  main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- Python -*-

"""
 @file Ministick.py
 @brief Phidget ministick sensor component
 @date $Date$


"""
import sys
import time
sys.path.append(".")

# Import RTM module
import RTC
import OpenRTM_aist
import math
import spidev


# Import Service implementation class
# <rtc-template block="service_impl">

# </rtc-template>

# Import Service stub modules
# <rtc-template block="consumer_import">
# </rtc-template>


# This module's spesification
# <rtc-template block="module_spec">
ministick_spec = ["implementation_id", "Ministick",
     "type_name",         "Ministick",
     "description",       "Phidget ministick sensor component",
     "version",           "1.0.0",
     "vendor",            "AIST",
     "category",          "Input Devic",
     "activity_type",     "STATIC",
     "max_instance",      "1",
     "language",          "Python",
     "lang_type",         "SCRIPT",
     "conf.default.scaling", "1.0",
     "conf.default.tread", "0.2",
     "conf.default.print_xy", "NO",
     "conf.default.print_vel", "NO",
     "conf.default.print_wvel", "NO",
     "conf.__widget__.scaling", "slider.0.1",
     "conf.__widget__.tread", "slider.0.01",
     "conf.__widget__.print_xy", "radio",
     "conf.__widget__.print_vel", "radio",
     "conf.__widget__.print_wvel", "radio",
     "conf.__constraints__.scaling", "0.0<=x<=10.0",
     "conf.__constraints__.tread", "0.0<=x<=1.0",
     "conf.__constraints__.print_xy", "(YES,NO)",
     "conf.__constraints__.print_vel", "(YES,NO)",
     "conf.__constraints__.print_wvel", "(YES,NO)",
     ""]
# </rtc-template>

##
# @class Ministick
# @brief Phidget ministick sensor component
#
#
class Ministick(OpenRTM_aist.DataFlowComponentBase):

  ##
  # @brief constructor
  # @param manager Maneger Object
  #
  def __init__(self, manager):
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[])
    """
    ジョイスティックのX-Y位置データ
     - Semantics: data[0]:x位置, data[1]:y位置
    """
    self._posOut = OpenRTM_aist.OutPort("pos", self._d_pos)
    self._d_vel = RTC.TimedVelocity2D(RTC.Time(0,0),0)
    """
    移動ロボットの速度ベクトル
     - Semantics: vx:並進速度, vy:0.0m va:角速度
     - Unit: vx[m/s], va[rad/s]
    """
    self._velOut = OpenRTM_aist.OutPort("vel", self._d_vel)
    self._d_whell_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[])
    """
    車輪速度
     - Semantics: data[0]:左車輪角速度, data[1]:右車輪角速度
     - Unit: [rad/s]
    """
    self._whell_velOut = OpenRTM_aist.OutPort("whell_vel", self._d_whell_vel)





    # initialize of configuration-data.
    # <rtc-template block="init_conf_param">
    """
    スケーリングファクタ
     - Name:  scaling
     - DefaultValue: 1.0
    """
    self._scaling = [1.0]
    """
    移動ロボットのトレッド値
     - Name:  tread
     - DefaultValue: 0.2
    """
    self._tread = [0.2]
    """

     - Name:  print_xy
     - DefaultValue: NO
    """
    self._print_xy = ['NO']
    """
    velデータのデバッグプリントフラグ
     - Name:  print_vel
     - DefaultValue: NO
    """
    self._print_vel = ['NO']
    """
    wheel_velデータのデバッグプリントフラグ
     - Name:  print_wvel
     - DefaultValue: NO
    """
    self._print_wvel = ['NO']

    # </rtc-template>
    self.x = 0.0
    self.y = 0.0

    self.spi = spidev.SpiDev()
    self.spi.open(0, 0)

  def get_adc(self,  channel):
    sned_ch = [0x00, 0x08, 0x10, 0x18]
    if ((channel > 3) or (channel < 0)):
      return -1
    r = self.spi.xfer2([sned_ch[channel], 0, 0, 0])
    ret = ((r[2] << 6 ) & 0x300) |  ((r[2] << 6) & 0xc0) | ((r[3] >> 2) & 0x3f)
    return ret

  def xy_to_wvel(self,  x,  y):
    th = math.atan2(y,  x)
    v = math.hypot(x,  y)
    vl = v * math.cos(th - (math.pi/4.0))
    vr = v * math.sin(th - (math.pi/4.0))
    return (vl,  vr)

  def wvel_to_vel2d(self,  vl,  vr):
    v = (vr + vl) / 2.0
    if v < 0.0:
      w = - (vr - vl) / self._tread[0]
    else:
      w = (vr - vl) / self._tread[0]
    return RTC.Velocity2D(v,  0.0,  w)

  ##
  #
  # The initialize action (on CREATED->ALIVE transition)
  # formaer rtc_init_entry()
  #
  # @return RTC::ReturnCode_t
  #
  #
  def onInitialize(self):
    # Bind variables and configuration variable
    self.bindParameter("scaling", self._scaling, "1.0")
    self.bindParameter("tread", self._tread, "0.2")
    self.bindParameter("print_xy", self._print_xy, "NO")
    self.bindParameter("print_vel", self._print_vel, "NO")
    self.bindParameter("print_wvel", self._print_wvel, "NO")

    # Set InPort buffers

    # Set OutPort buffers
    self.addOutPort("pos",self._posOut)
    self.addOutPort("vel",self._velOut)
    self.addOutPort("whell_vel",self._whell_velOut)

    # Set service provider to Ports

    # Set service consumers to Ports

    # Set CORBA Service Ports

    self.x_offset_v = 0.0
    self.y_offset_v = 0.0
    for i in range(1,  100):
      self.x_offset_v += self.get_adc(0)
      self.y_offset_v += self.get_adc(1)
    self.x_offset_v = self.x_offset_v / 100.0
    self.y_offset_v = self.y_offset_v / 100.0

    return RTC.RTC_OK

    ##
    #
    # The finalize action (on ALIVE->END transition)
    # formaer rtc_exiting_entry()
    #
    # @return RTC::ReturnCode_t

    #
  def onFinalize(self):

    return RTC.RTC_OK

  #  ##
  #  #
  #  # The startup action when ExecutionContext startup
  #  # former rtc_starting_entry()
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #
  #  #
  #def onStartup(self, ec_id):
  #
  #  return RTC.RTC_OK

  #  ##
  #  #
  #  # The shutdown action when ExecutionContext stop
  #  # former rtc_stopping_entry()
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #
  #  #
  #def onShutdown(self, ec_id):
  #
  #  return RTC.RTC_OK

    ##
    #
    # The activated action (Active state entry action)
    # former rtc_active_entry()
    #
    # @param ec_id target ExecutionContext Id
    #
    # @return RTC::ReturnCode_t
    #
    #
  #def onActivated(self, ec_id):

  #  return RTC.RTC_OK

    ##
    #
    # The deactivated action (Active state exit action)
    # former rtc_active_exit()
    #
    # @param ec_id target ExecutionContext Id
    #
    # @return RTC::ReturnCode_t
    #
    #
  def onDeactivated(self, ec_id):

    return RTC.RTC_OK

    ##
    #
    # The execution action that is invoked periodically
    # former rtc_active_do()
    #
    # @param ec_id target ExecutionContext Id
    #
    # @return RTC::ReturnCode_t
    #
    #
  def onExecute(self, ec_id):
    self.x = - (self.get_adc(0) - self.x_offset_v) * self._scaling[0] / 1000.0
    self.y = (self.get_adc(1) - self.y_offset_v) * self._scaling[0] / 1000.0

    if self._print_xy[0] != "NO":
      print "(x,  y) = ",  self.x,  self.y
    self._d_pos.data = [self.x,  self.y]

    self._d_whell_vel.data = self.xy_to_wvel(self.x,  self.y)


    if self._print_wvel[0] != "NO":
      print "(vl,  vr) = ",  self._d_whell_vel.data[0],  self._d_whell_vel.data[1]
    self._d_vel.data = self.wvel_to_vel2d(self._d_whell_vel.data[0],
                                          self._d_whell_vel.data[1])

    if self._print_vel[0] != "NO":
      print "(vx,  va) = ",  self._d_vel.data.vx,  self._d_vel.data.va

    self._posOut.write()
    self._velOut.write()
    self._whell_velOut.write()


    return RTC.RTC_OK

  #  ##
  #  #
  #  # The aborting action when main logic error occurred.
  #  # former rtc_aborting_entry()
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #
  #  #
  #def onAborting(self, ec_id):
  #
  #  return RTC.RTC_OK

  #  ##
  #  #
  #  # The error action in ERROR state
  #  # former rtc_error_do()
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #
  #  #
  #def onError(self, ec_id):
  #
  #  return RTC.RTC_OK

  #  ##
  #  #
  #  # The reset action that is invoked resetting
  #  # This is same but different the former rtc_init_entry()
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #
  #  #
  #def onReset(self, ec_id):
  #
  #  return RTC.RTC_OK

  #  ##
  #  #
  #  # The state update action that is invoked after onExecute() action
  #  # no corresponding operation exists in OpenRTm-aist-0.2.0
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #

  #  #
  #def onStateUpdate(self, ec_id):
  #
  #  return RTC.RTC_OK

  #  ##
  #  #
  #  # The action that is invoked when execution context's rate is changed
  #  # no corresponding operation exists in OpenRTm-aist-0.2.0
  #  #
  #  # @param ec_id target ExecutionContext Id
  #  #
  #  # @return RTC::ReturnCode_t
  #  #
  #  #
  #def onRateChanged(self, ec_id):
  #
  #  return RTC.RTC_OK




def MinistickInit(manager):
    profile = OpenRTM_aist.Properties(defaults_str=ministick_spec)
    manager.registerFactory(profile,
                            Ministick,
                            OpenRTM_aist.Delete)

def MyModuleInit(manager):
    MinistickInit(manager)

    # Create a component
    comp = manager.createComponent("Ministick")

def main():
  mgr = OpenRTM_aist.Manager.init(sys.argv)
  mgr.setModuleInitProc(MyModuleInit)
  mgr.activateManager()
  mgr.runManager()

if __name__ == "__main__":
  main()

以上是关于python Raspberry pi拡张ユニットテストと実装。的主要内容,如果未能解决你的问题,请参考以下文章

python Raspberry pi拡张ユニットテストと実装。

python SQLでSELECT DISTINCTしてユニークな组み合わせをカウントするような问い合わせを,熊猫のデータフレームに书き変えてみました。以下「お昼のセットメニューの组み合わせに投票」と

慢响应Firebase + Python + Raspberry pi

在Raspberry Pi上使用python3进行Bash

在raspberry pi中安装缺少的python包

并行计算Python / Raspberry Pi