プロジェクト

全般

プロフィール

test_CORBA_RTCUtil.py

miyamoto, 2016/01/16 00:12

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_Manager_12.py
6
# \brief 
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC, RTC__POA
25
import OpenRTM, OpenRTM__POA
26

    
27
testcomp1_spec = ["implementation_id", "TestComp1",
28
                 "type_name",         "TestComp1",
29
                 "description",       "Test example component",
30
                 "version",           "1.0",
31
                 "vendor",            "Nobuhiko Myiyamoto",
32
                 "category",          "example",
33
                 "activity_type",     "DataFlowComponent",
34
                 "max_instance",      "10",
35
                 "language",          "C++",
36
                 "lang_type",         "compile",
37
                 "conf.default.test1", "0",
38
                 ""]
39

    
40
testcomp2_spec = ["implementation_id", "TestComp2",
41
                 "type_name",         "TestComp2",
42
                 "description",       "Test example component",
43
                 "version",           "1.0",
44
                 "vendor",            "Nobuhiko Myiyamoto",
45
                 "category",          "example",
46
                 "activity_type",     "DataFlowComponent",
47
                 "max_instance",      "10",
48
                 "language",          "C++",
49
                 "lang_type",         "compile",
50
                 ""]
51

    
52
class Test_i(OpenRTM__POA.InPortCdr):
53
  def __init__(self):
54
    pass
55
  def put(self, data):
56
    return OpenRTM.PORT_OK
57

    
58

    
59
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
60
  def __init__(self, manager):
61
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
62
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
63
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
64
    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
65
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
66

    
67
    self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
68
    self._testService_provided = Test_i()
69

    
70
    self._test1 = [0]
71

    
72
  def onInitialize(self):
73
    self.addOutPort("out",self._outOut)
74
    self.addInPort("in",self._inIn)
75
    
76
    self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
77
    self.addPort(self._servicePort_provided)
78
    
79
    #self._servicePort_provided.activateInterfaces()
80

    
81
    self.bindParameter("test1", self._test1, "0")
82
    
83

    
84
    return RTC.RTC_OK
85
  
86
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
87
  def __init__(self, manager):
88
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
89
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
90
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
91
    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
92
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
93
    
94
        
95
    
96
    
97
    self._servicePort_required = OpenRTM_aist.CorbaPort("service")
98
    self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)
99

    
100

    
101
    
102
    return
103
  
104
  def onInitialize(self):
105
    self.addInPort("in",self._inIn)
106
    self.addOutPort("out",self._outOut)
107
    
108
    
109
    
110
    self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
111
    self.addPort(self._servicePort_required)
112

    
113
    
114
    return RTC.RTC_OK
115
    
116
    
117

    
118
    
119
def TestComp1Init(manager):
120
  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
121
  manager.registerFactory(profile,
122
                          TestComp1,
123
                          OpenRTM_aist.Delete)
124

    
125

    
126
    
127
def TestComp2Init(manager):
128
  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
129
  manager.registerFactory(profile,
130
                          TestComp2,
131
                          OpenRTM_aist.Delete)
132
  
133

    
134
def MyModuleInit(manager):
135
  TestComp1Init(manager)
136
  TestComp2Init(manager)
137
  com = manager.createComponent("TestComp1")
138
  com = manager.createComponent("TestComp2")
139
  
140

    
141

    
142

    
143

    
144
class test_CORBA_RTCUtil(unittest.TestCase):
145
  
146
  def setUp(self):
147
    sys.argv.extend(['-o', 'exec_cxt.periodic.rate:1000'])
148
    self.manager = OpenRTM_aist.Manager.init(sys.argv)
149
    self.manager.setModuleInitProc(MyModuleInit)
150
    self.manager.activateManager()
151
    self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
152
    self.comp2 = self.manager.getComponent("TestComp20").getObjRef()
153
    
154
    
155

    
156
  def tearDown(self):
157
    self.manager.shutdownManager()
158
    time.sleep(0.1)
159

    
160
  def test_Component(self):
161
    compProf = OpenRTM_aist.get_component_profile(self.comp1)
162
    self.assertEqual(compProf.getProperty("implementation_id"), "TestComp1")
163
    ret = OpenRTM_aist.is_existing(self.comp1)
164
    self.assertFalse(ret)
165

    
166
  def test_EC(self):
167
    ec = OpenRTM_aist.get_actual_ec(self.comp1,0)
168
    ec_id = OpenRTM_aist.get_ec_id(self.comp1, ec)
169
    self.assertEqual(ec_id, 0)
170

    
171
    ret = OpenRTM_aist.add_rtc_to_default_ec(self.comp1, self.comp2)
172
    self.assertEqual(ret, RTC.RTC_OK)
173
    
174

    
175
    ret = OpenRTM_aist.is_alive_in_default_ec(self.comp1)
176
    self.assertEqual(ret, True)
177

    
178
    ret = OpenRTM_aist.set_default_rate(self.comp1, 500)
179
    self.assertEqual(ret, RTC.RTC_OK)
180

    
181
    rate = OpenRTM_aist.get_default_rate(self.comp1)
182
    self.assertEqual(int(rate), 500)
183

    
184
    ret = OpenRTM_aist.set_current_rate(self.comp2,1000, 100)
185
    self.assertEqual(ret, RTC.RTC_OK)
186
    
187
    rate = OpenRTM_aist.get_current_rate(self.comp2, 1000)
188
    self.assertEqual(int(rate), 100)
189
    
190
    ret = OpenRTM_aist.activate(self.comp1,0)
191
    self.assertEqual(ret, RTC.RTC_OK)
192
    state = OpenRTM_aist.is_in_active(self.comp1, 0)
193
    self.assertTrue(state)
194
    
195
    ret = OpenRTM_aist.deactivate(self.comp1,0)
196
    self.assertEqual(ret, RTC.RTC_OK)
197
    state = OpenRTM_aist.is_in_inactive(self.comp1, 0)
198
    self.assertTrue(state)
199
    
200
    #state = OpenRTM_aist.is_in_error(self.comp1, 0)
201
    #ret = OpenRTM_aist.reset(self.comp1,0)
202
    #self.assertEqual(ret, RTC.RTC_OK)
203

    
204
    ret = [None]
205
    ans = OpenRTM_aist.get_state(self.comp1, 0, ret)
206
    self.assertEqual(ret[0], RTC.INACTIVE_STATE)
207

    
208
    ret = OpenRTM_aist.remove_rtc_to_default_ec(self.comp1, self.comp2)
209
    self.assertEqual(ret, RTC.RTC_OK)
210

    
211
    
212
  def test_Port(self):
213
    port_names = OpenRTM_aist.get_port_names(self.comp1)
214
    self.assertTrue("TestComp10.out" in port_names)
215
    
216
    inport_names = OpenRTM_aist.get_inport_names(self.comp2)
217
    self.assertTrue("TestComp20.in" in inport_names)
218
    
219
    outport_names = OpenRTM_aist.get_outport_names(self.comp1)
220
    self.assertTrue("TestComp10.out" in outport_names)
221
    
222
    svcport_names = OpenRTM_aist.get_svcport_names(self.comp1)
223
    self.assertTrue("TestComp10.service" in svcport_names)
224

    
225
    rtc1_port_out = OpenRTM_aist.get_port_by_name(self.comp1,"TestComp10.out")
226
    rtc2_port_out = OpenRTM_aist.get_port_by_name(self.comp2,"TestComp20.out")
227
    rtc2_port_in = OpenRTM_aist.get_port_by_name(self.comp2,"TestComp20.in")
228
    rtc1_port_in = OpenRTM_aist.get_port_by_name(self.comp1,"TestComp10.in")
229
    
230
    pp = rtc1_port_out.get_port_profile()
231
    self.assertEqual(pp.name, "TestComp10.out")
232

    
233
    
234
    prop = OpenRTM_aist.Properties()
235
    connect_profile = OpenRTM_aist.create_connector("con1",prop,rtc1_port_out,rtc2_port_in)
236
    ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
237
    self.assertEqual(ret, RTC.RTC_OK)
238

    
239
    con_names = OpenRTM_aist.get_connector_names(rtc1_port_out)
240
    self.assertTrue("con1" in con_names)
241
    con_ids = OpenRTM_aist.get_connector_ids(rtc1_port_out)
242

    
243
    
244
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
245
    self.assertTrue(ret)
246

    
247
    ret = OpenRTM_aist.disconnect(rtc1_port_out.get_connector_profiles()[0])
248
    self.assertEqual(ret, RTC.RTC_OK)
249
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
250
    self.assertFalse(ret)
251

    
252
    
253
    ret = OpenRTM_aist.connect_multi("con2",prop,rtc1_port_out,[rtc1_port_in,rtc2_port_in])
254
    self.assertEqual(ret, RTC.RTC_OK)
255

    
256
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc1_port_in)
257
    self.assertTrue(ret)
258

    
259
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
260
    self.assertTrue(ret)
261

    
262
    rtc1_port_out.disconnect_all()
263

    
264
    ret = OpenRTM_aist.connect_by_name("con3",prop,self.comp1,"TestComp10.out",self.comp2,"TestComp20.in")
265
    self.assertEqual(ret, RTC.RTC_OK)
266
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
267
    self.assertTrue(ret)
268

    
269
    ret = OpenRTM_aist.disconnect_by_connector_name(rtc1_port_out, "con3")
270
    self.assertEqual(ret, RTC.RTC_OK)
271
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
272
    self.assertFalse(ret)
273

    
274
    ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
275
    ret = OpenRTM_aist.disconnect_by_connector_id(rtc1_port_out,rtc1_port_out.get_connector_profiles()[0].connector_id)
276
    self.assertEqual(ret, RTC.RTC_OK)
277
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
278
    self.assertFalse(ret)
279

    
280
    ret = OpenRTM_aist.connect("con1",prop,rtc1_port_out,rtc2_port_in)
281
    ret = OpenRTM_aist.disconnect_by_port_name(rtc1_port_out,"TestComp20.in")
282
    self.assertEqual(ret, RTC.RTC_OK)
283
    ret = OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in)
284
    self.assertFalse(ret)
285
    
286

    
287
  def test_configuration(self):
288
    conf = OpenRTM_aist.get_configuration(self.comp1)
289

    
290

    
291
    ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
292
    self.assertEqual(ret,"0")
293

    
294
    ret = OpenRTM_aist.get_active_configuration(self.comp1)
295

    
296
    ret = OpenRTM_aist.set_configuration(self.comp1,"default","test1","1")
297
    self.assertTrue(ret)
298
    ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
299
    self.assertEqual(ret,"1")
300

    
301
    confsetname = OpenRTM_aist.get_current_configuration_name(self.comp1)
302
    self.assertEqual(confsetname,"default")
303

    
304
    
305
    #ec = self.comp1.get_owned_contexts()[0]
306
    
307
    #print OpenRTM_aist.get_participants_rtc(ec)
308
    
309
    #print ec.add_component(self.comp2)
310
    #print ec.get_profile()
311

    
312
############### test #################
313
if __name__ == '__main__':
314
        unittest.main()