We will now add the NXTBrick.py functionality to the generated component.
NXTBrick.py import: Import the NXTBrick.py file to get access to the NXTBrick class. The file extension is not necessary in the import statement.
import NXTBrick
Implement onInitialize(self): In onInitialize(), instantiate the NXTBrick class. If this causes an error, return RTC_ERROR and the component will transition to the end state.
Implement onActivated(self, ec_id) and onDeactivated(self, ec_id): In onActivated() and onDeactivated(), the NXTBrick class's resetPosition() method should be called.
self._nxtbrick.resetPosition()
Implement onExecute(self, ec_id): The following steps should be performed by onExecute():
Read speeds from the data InPort
Based on configuration values, which port of the NXT the speed values are to be sent to should be set.
Call the setMotors() method of the NXTBrick class to set motor speeds.
Call the getSensors() method of the NXTBrick class to read the ultrasonic sensor data, and write this to an OutPort.
Call the getMotors() method of the NXTBrick class to get the motor encoder angles, and write this to an OutPort.
Code that implements the above functionality is shown below:
This sample code adds a callback, OnWrite, to the above sample. When data is written to the InPort's buffer, the motor's speeds will be set immediately.
Callback class
Create a callback class as below.
# @class CallBackClass
# @brief callback class
#
# when data is written in the buffer of InPort,
# it is called.
class CallBackClass:
def __init__(self, nxtbrick_, map_):
self._nxtbrick = nxtbrick_
self._map = map_
self._mapping = {'A':0,'B':1,'C':2}
def __call__(self, pData):
vel_ = [0,0,0]
vel_[self._mapping[self._map[0][0]]] = pData.data[0]
vel_[self._mapping[self._map[0][1]]] = pData.data[1]
# set velocity
self._nxtbrick.setMotors(vel_)
This class receives a NXTBrick instance and a configuration parameter map in its constructor.
Callback class registration
Use the setOnWrite() method to register a CallBackClass instance with the component.
# set callback class
self._velIn.setOnWrite(CallBackClass(self._ntxbrick,self._map))
After this call to setOnWrite(), whenever data is written to the InPort, CallBackClass.call() will be called.
Sample code that implements the complete component using a callback class is shown below.
#!/usr/bin/env python
# -*- coding:shift_jis -*-
# -*- Python -*-
import sys
import time
sys.path.append(".")
# Import RTM module
import OpenRTM
import RTC
# import NXTBrick class
import NXTBrick
# This module's spesification
# <rtc-template block="module_spec">
nxtrtc_spec = ["implementation_id", "NXTRTC",
"type_name", "NXTRTC",
"description", "NXT sample component",
"version", "0.1",
"vendor", "AIST",
"category", "example",
"activity_type", "DataFlowComponent",
"max_instance", "10",
"language", "Python",
"lang_type", "SCRIPT",
"conf.default.map", "A,B",
""]
# </rtc-template>
# @class CallBackClass
# @brief callback class
#
# when data is written in the buffer of InPort,
# it is called.
class CallBackClass:
def __init__(self, nxtbrick_, map_):
self._nxtbrick = nxtbrick_
self._map = map_
self._mapping = {'A':0,'B':1,'C':2}
def __call__(self, pData):
vel_ = [0,0,0]
vel_[self._mapping[self._map[0][0]]] = pData.data[0]
vel_[self._mapping[self._map[0][1]]] = pData.data[1]
# set velocity
self._nxtbrick.setMotors(vel_)
class NXTRTC(OpenRTM.DataFlowComponentBase):
def __init__(self, manager):
OpenRTM.DataFlowComponentBase.__init__(self, manager)
# DataPorts initialization
# <rtc-template block="data_ports">
self._d_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[])
self._velIn = OpenRTM.InPort("vel", self._d_vel, OpenRTM.RingBuffer(8))
self.registerInPort("vel",self._velIn)
self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[])
self._posOut = OpenRTM.OutPort("pos", self._d_pos, OpenRTM.RingBuffer(8))
self.registerOutPort("pos",self._posOut)
self._d_sens = RTC.TimedFloatSeq(RTC.Time(0,0),[])
self._sensOut = OpenRTM.OutPort("sens", self._d_sens, OpenRTM.RingBuffer(8))
self.registerOutPort("sens",self._sensOut)
# initialize of configuration-data.
# <rtc-template block="configurations">
self._map = [['A', 'B']]
self._nxtbrick = None
self._mapping = {'A':0,'B':1,'C':2}
def onInitialize(self):
# Bind variables and configuration variable
# <rtc-template block="bind_config">
self.bindParameter("map", self._map, "A,B")
# create NXTBrick object
try:
print "Connecting to NXT brick ...."
self._nxtbrick = NXTBrick.NXTBrick()
print "Connection established."
except:
print "NXTBrick connection failed."
return RTC.RTC_ERROR
# set callback class
self._velIn.setOnWrite(CallBackClass(self._ntxbrick,self._map))
return RTC.RTC_OK
def onFinalize(self):
self._nxtbrick.close()
def onActivated(self, ec_id):
# reset NXTBrick's position.
self._nxtbrick.resetPosition()
return RTC.RTC_OK
def onDeactivated(self, ec_id):
# reset NXTBrick's position.
self._nxtbrick.resetPosition()
return RTC.RTC_OK
def onExecute(self, ec_id):
# get sensor data.
sensor_ = self._nxtbrick.getSensors()
if sensor_:
self._d_sens.data = [sensor_[3]]
# write sensor data to outport.
self._sensOut.write()
# get position data.
position_ = self._nxtbrick.getMotors()
if position_:
self._d_pos.data = [position_[self._mapping[self._map[0][0]]][9],position_[self._mapping[self._map[0][1]]][9]]
# write position data to outport.
self._posOut.write()
return RTC.RTC_OK
def MyModuleInit(manager):
profile = OpenRTM.Properties(defaults_str=nxtrtc_spec)
manager.registerFactory(profile,
NXTRTC,
OpenRTM.Delete)
# Create a component
comp = manager.createComponent("NXTRTC")
def main():
mgr = OpenRTM.Manager.init(len(sys.argv), sys.argv)
#mgr = OpenRTM.Manager.init(sys.argv)
mgr.setModuleInitProc(MyModuleInit)
mgr.activateManager()
mgr.runManager()
if __name__ == "__main__":
main()
In the first example, the motor output, sensor reading and motor encoder reading are all done in a single synchronous loop. By using a callback, the motor output is done asynchronously when data arrives.
Sample code explanation
We will now add the NXTBrick.py functionality to the generated component.
Code that implements the above functionality is shown below:
Sample code explanation (Callback object usage example)
This sample code adds a callback, OnWrite, to the above sample. When data is written to the InPort's buffer, the motor's speeds will be set immediately.
Create a callback class as below.
This class receives a NXTBrick instance and a configuration parameter map in its constructor.
Use the setOnWrite() method to register a CallBackClass instance with the component.
After this call to setOnWrite(), whenever data is written to the InPort, CallBackClass.call() will be called.
Sample code that implements the complete component using a callback class is shown below.
In the first example, the motor output, sensor reading and motor encoder reading are all done in a single synchronous loop. By using a callback, the motor output is done asynchronously when data arrives.