[openrtm-commit:00347] r455 - branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test

openrtm @ openrtm.org openrtm @ openrtm.org
2011年 8月 12日 (金) 11:16:19 JST


Author: kurihara
Date: 2011-08-12 11:16:18 +0900 (Fri, 12 Aug 2011)
New Revision: 455

Added:
   branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py
   branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py
   branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf
Log:
Some test programs for ComponentObserverConsumer have been added.
-ComponentObserverProvider.py: Provider side (test for ComponentObserverConsumer).
-COCTestRTC.py: Sample component using the ComponentObserverConsumer.
-rtc.conf: Configuration file for COCTestRTC.py

Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py	                        (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/COCTestRTC.py	2011-08-12 02:16:18 UTC (rev 455)
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# -*- Python -*-
+
+import sys
+
+import RTC
+import OpenRTM_aist
+
+coctestrtc_spec = ["implementation_id", "COCTestRTC",
+                  "type_name",         "COCTestRTC",
+                  "description",       "Console input component",
+                  "version",           "1.0",
+                  "vendor",            "Shinji Kurihara",
+                  "category",          "example",
+                  "activity_type",     "DataFlowComponent",
+                  "max_instance",      "10",
+                  "language",          "Python",
+                  "lang_type",         "script",
+                  ""]
+
+
+class DataListener(OpenRTM_aist.ConnectorDataListenerT):
+  def __init__(self, name):
+    self._name = name
+    
+  def __del__(self):
+    print "dtor of ", self._name
+
+  def __call__(self, info, cdrdata):
+    data = OpenRTM_aist.ConnectorDataListenerT.__call__(self, info, cdrdata, RTC.TimedLong(RTC.Time(0,0),0))
+    print "------------------------------"
+    print "Listener:       ", self._name
+    print "Profile::name:  ", info.name
+    print "Profile::id:    ", info.id
+    print "Data:           ", data.data
+    print "------------------------------"
+    
+class ConnListener(OpenRTM_aist.ConnectorListener):
+  def __init__(self, name):
+    self._name = name
+
+  def __del__(self):
+    print "dtor of ", self._name
+
+  def __call__(self, info):
+    print "------------------------------"
+    print "Listener:       ", self._name
+    print "Profile::name:  ", info.name
+    print "Profile::id:    ", info.id
+    print "------------------------------"
+
+
+class COCTestRTC(OpenRTM_aist.DataFlowComponentBase):
+  def __init__(self, manager):
+    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
+    return
+
+  def onInitialize(self):
+    self._data = RTC.TimedLong(RTC.Time(0,0),0)
+    self._outport = OpenRTM_aist.OutPort("out", self._data)
+    # Set OutPort buffer
+    self.addOutPort("out", self._outport)
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE,
+                                           DataListener("ON_BUFFER_WRITE"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL, 
+                                           DataListener("ON_BUFFER_FULL"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT, 
+                                           DataListener("ON_BUFFER_WRITE_TIMEOUT"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE, 
+                                           DataListener("ON_BUFFER_OVERWRITE"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ, 
+                                           DataListener("ON_BUFFER_READ"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_SEND, 
+                                           DataListener("ON_SEND"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED,
+                                           DataListener("ON_RECEIVED"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL, 
+                                           DataListener("ON_RECEIVER_FULL"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT, 
+                                           DataListener("ON_RECEIVER_TIMEOUT"))
+    self._outport.addConnectorDataListener(OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR,
+                                           DataListener("ON_RECEIVER_ERROR"))
+
+    self._outport.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_CONNECT,
+                                       ConnListener("ON_CONNECT"))
+    self._outport.addConnectorListener(OpenRTM_aist.ConnectorListenerType.ON_DISCONNECT,
+                                       ConnListener("ON_DISCONNECT"))
+
+    return RTC.RTC_OK
+
+        
+  def onExecute(self, ec_id):
+    print "Please input number: ",
+    self._data.data = long(sys.stdin.readline())
+    OpenRTM_aist.setTimestamp(self._data)
+    print "Sending to subscriber: ", self._data.data
+    self._outport.write()
+    return RTC.RTC_OK
+
+
+def COCTestRTCInit(manager):
+  profile = OpenRTM_aist.Properties(defaults_str=coctestrtc_spec)
+  manager.registerFactory(profile,
+                          COCTestRTC,
+                          OpenRTM_aist.Delete)
+
+
+def MyModuleInit(manager):
+  COCTestRTCInit(manager)
+
+  # Create a component
+  comp = manager.createComponent("COCTestRTC")
+
+def main():
+  # Initialize manager
+  mgr = OpenRTM_aist.Manager.init(sys.argv)
+
+  # Set module initialization proceduer
+  # This procedure will be invoked in activateManager() function.
+  mgr.setModuleInitProc(MyModuleInit)
+
+  # Activate manager and register to naming service
+  mgr.activateManager()
+
+  # run the manager in blocking mode
+  # runManager(False) is the default
+  mgr.runManager()
+
+  # If you want to run the manager in non-blocking mode, do like this
+  # mgr.runManager(True)
+
+if __name__ == "__main__":
+  main()

Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py	                        (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/ComponentObserverProvider.py	2011-08-12 02:16:18 UTC (rev 455)
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+# -*- coding: euc-jp -*-
+
+##
+# @file ComponentObserverProvider.py
+# @brief test for ComponentObserverConsumer
+# @date $Date$
+# @author Shinji Kurihara
+#
+# Copyright (C) 2011
+#     Noriaki Ando
+#     Intelligent Systems Research Institute,
+#     National Institute of
+#         Advanced Industrial Science and Technology (AIST), Japan
+#     All rights reserved.
+#
+
+import sys
+
+from omniORB import CORBA, PortableServer
+import RTC
+import OpenRTM, OpenRTM__POA
+import SDOPackage
+import OpenRTM_aist
+
+class ComponentObserver_i(OpenRTM__POA.ComponentObserver):
+  def __init__(self):
+    pass
+
+  def update_status(self, status_kind, hint):
+    print "update_status: ", status_kind, ", ", hint
+    return
+
+def main():
+  orb = CORBA.ORB_init(sys.argv)
+  poa = orb.resolve_initial_references("RootPOA")
+  poa._get_the_POAManager().activate()
+  naming = OpenRTM_aist.CorbaNaming(orb, "localhost")
+  servant = ComponentObserver_i()
+  oid = poa.servant_to_id(servant)
+  provider = poa.id_to_reference(oid)
+
+  rtc = naming.resolve("ConsoleIn0.rtc")._narrow(RTC.RTObject)
+  config = rtc.get_configuration()
+  properties = [OpenRTM_aist.NVUtil.newNV("heartbeat.enable","YES"),
+                OpenRTM_aist.NVUtil.newNV("heartbeat.interval","10"),
+                OpenRTM_aist.NVUtil.newNV("observed_status","ALL")]
+
+  id = OpenRTM_aist.toTypename(servant)
+  sprof = SDOPackage.ServiceProfile("test_id", id,
+                                    properties, provider)
+
+  ret = config.add_service_profile(sprof)
+  flag = True
+  print "If you exit program, please input 'q'."
+  sys.stdin.readline()
+  ret = config.remove_service_profile("test_id")
+  print "test program end. ret : ", ret
+  return
+    
+############### test #################
+if __name__ == '__main__':
+  main()

Added: branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf
===================================================================
--- branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf	                        (rev 0)
+++ branches/RELENG_1_1/OpenRTM-aist-Python/OpenRTM_aist/ext/sdo/observer/test/rtc.conf	2011-08-12 02:16:18 UTC (rev 455)
@@ -0,0 +1,8 @@
+corba.nameservers: localhost
+naming.formats: %n.rtc
+logger.enable: NO
+logger.log_level: VERBOSE
+#logger.file_name: rtc%p.log,    STDOUT
+#manager.naming_formats: %n.mgr
+manager.modules.load_path:  ../
+manager.modules.preload: ComponentObserverConsumer



openrtm-commit メーリングリストの案内