プロジェクト

全般

プロフィール

test_CORBA_RTCUtil.py

miyamoto, 2016/02/26 00:23

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_CORBA_RTCUtil.py
6
# \brief Unit tests for the OpenRTM_aist RTC utility functions (CORBA_RTCUtil)
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC, RTC__POA
25
import OpenRTM, OpenRTM__POA
26

    
27
# Component profile for TestComp1: a flat list of alternating keys and
# values, closed by an empty-string entry.  The "conf.default.test1" entry
# supplies the value "0" that test_configuration reads back from the
# "default" configuration set.
# NOTE(review): "language"/"lang_type" say C++/compile although TestComp1 is
# a Python class in this file -- confirm whether that is intentional.
testcomp1_spec = ["implementation_id",  "TestComp1",
                  "type_name",          "TestComp1",
                  "description",        "Test example component",
                  "version",            "1.0",
                  "vendor",             "Nobuhiko Myiyamoto",
                  "category",           "example",
                  "activity_type",      "DataFlowComponent",
                  "max_instance",       "10",
                  "language",           "C++",
                  "lang_type",          "compile",
                  "conf.default.test1", "0",
                  ""]
39

    
40
# Component profile for TestComp2: a flat list of alternating keys and
# values, closed by an empty-string entry.  Unlike testcomp1_spec it declares
# no configuration parameter.
# NOTE(review): "language"/"lang_type" say C++/compile although TestComp2 is
# a Python class in this file -- confirm whether that is intentional.
testcomp2_spec = ["implementation_id", "TestComp2",
                  "type_name",         "TestComp2",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  ""]
51

    
52
class Test_i(OpenRTM__POA.InPortCdr):
  """Minimal InPortCdr servant registered as the provider on TestComp1's
  service port.

  It keeps no state: put() ignores its argument and reports success.
  """

  def __init__(self):
    pass

  def put(self, data):
    """Discard data and return OpenRTM.PORT_OK."""
    return OpenRTM.PORT_OK
57

    
58

    
59
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
  """Test component with an OutPort "out", an InPort "in", a service port
  that provides a Test_i servant, and one configuration parameter "test1".
  """

  def __init__(self, manager):
    """Create the ports, the servant and the parameter storage.

    Nothing is registered with the component here; that happens in
    onInitialize().
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Data ports, each backed by a zero-initialised TimedLong.
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)

    # Service port and the servant it will provide.
    self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
    self._testService_provided = Test_i()

    # Storage handed to bindParameter("test1", ...) in onInitialize().
    self._test1 = [0]

  def onInitialize(self):
    """Register the ports, the service provider and the "test1" parameter.

    Returns RTC.RTC_OK.
    """
    self.addOutPort("out",self._outOut)
    self.addInPort("in",self._inIn)

    self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
    self.addPort(self._servicePort_provided)

    #self._servicePort_provided.activateInterfaces()

    # Bind "test1" with default value "0" to the storage created in __init__.
    self.bindParameter("test1", self._test1, "0")

    return RTC.RTC_OK
85
  
86
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
  """Test component with an OutPort "out", an InPort "in" and a service port
  that consumes an OpenRTM.InPortCdr interface.
  """

  def __init__(self, manager):
    """Create the ports and the service consumer.

    Nothing is registered with the component here; that happens in
    onInitialize().
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Data ports, each backed by a zero-initialised TimedLong.
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)

    # Service port and the consumer registered on it in onInitialize().
    self._servicePort_required = OpenRTM_aist.CorbaPort("service")
    self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)

  def onInitialize(self):
    """Register the ports and the service consumer.

    Returns RTC.RTC_OK.
    """
    self.addInPort("in",self._inIn)
    self.addOutPort("out",self._outOut)

    self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
    self.addPort(self._servicePort_required)

    return RTC.RTC_OK
115
    
116
    
117

    
118
    
119
def TestComp1Init(manager):
  """Register the TestComp1 factory with manager.

  The profile is built from testcomp1_spec; OpenRTM_aist.Delete is passed as
  the third registerFactory argument.
  """
  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
  manager.registerFactory(profile, TestComp1, OpenRTM_aist.Delete)
124

    
125

    
126
    
127
def TestComp2Init(manager):
  """Register the TestComp2 factory with manager.

  The profile is built from testcomp2_spec; OpenRTM_aist.Delete is passed as
  the third registerFactory argument.
  """
  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
  manager.registerFactory(profile, TestComp2, OpenRTM_aist.Delete)
132
  
133

    
134
def MyModuleInit(manager):
  """Module init procedure: register both factories, then create one
  instance of each component.

  The created components are not kept here; the test case looks them up
  through manager.getComponent().
  """
  TestComp1Init(manager)
  TestComp2Init(manager)
  manager.createComponent("TestComp1")
  manager.createComponent("TestComp2")
139
  
140

    
141

    
142

    
143

    
144
class test_CORBA_RTCUtil(unittest.TestCase):
  """Tests for the RTC utility functions exposed by OpenRTM_aist.

  Each test runs against two components created by MyModuleInit and looked
  up as "TestComp10" and "TestComp20".  It covers the component,
  execution-context, port/connector and configuration helpers.
  """

  def setUp(self):
    """Start a manager, create the components and keep their object refs."""
    # Work on a private copy of the command line.  setUp runs before every
    # test method, so extending sys.argv in place would make the option
    # list grow with each test.
    argv = list(sys.argv)
    argv.extend(['-d'])
    argv.extend(['-o', 'exec_cxt.periodic.rate:1000'])
    argv.extend(['-o','naming.type:corba,manager'])
    self.manager = OpenRTM_aist.Manager.init(argv)
    self.manager.setModuleInitProc(MyModuleInit)
    self.manager.activateManager()
    self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
    self.comp2 = self.manager.getComponent("TestComp20").getObjRef()

  def tearDown(self):
    """Shut the manager down and pause briefly before the next test."""
    self.manager.shutdownManager()
    time.sleep(0.1)

  def _connect_ok(self, name, prop, port0, port1):
    """Connect port0 to port1 under the given name and require RTC_OK."""
    ret = OpenRTM_aist.connect(name, prop, port0, port1)
    self.assertEqual(ret, RTC.RTC_OK)

  def test_Component(self):
    """Profile lookup and existence check on comp1."""
    compProf = OpenRTM_aist.get_component_profile(self.comp1)
    self.assertEqual(compProf.getProperty("implementation_id"), "TestComp1")
    self.assertTrue(OpenRTM_aist.is_existing(self.comp1))

  def test_EC(self):
    """Execution-context helpers: ids, rates, state changes, attach/detach."""
    ec = OpenRTM_aist.get_actual_ec(self.comp1,0)
    ec_id = OpenRTM_aist.get_ec_id(self.comp1, ec)
    self.assertEqual(ec_id, 0)

    # Attach comp2 to comp1's default execution context.
    ret = OpenRTM_aist.add_rtc_to_default_ec(self.comp1, self.comp2)
    self.assertEqual(ret, RTC.RTC_OK)

    ret = OpenRTM_aist.is_alive_in_default_ec(self.comp1)
    self.assertEqual(ret, True)

    ret = OpenRTM_aist.set_default_rate(self.comp1, 500)
    self.assertEqual(ret, RTC.RTC_OK)
    rate = OpenRTM_aist.get_default_rate(self.comp1)
    self.assertEqual(int(rate), 500)

    # NOTE(review): ec id 1000 presumably addresses the first context comp2
    # participates in (the one joined above) -- confirm.
    ret = OpenRTM_aist.set_current_rate(self.comp2,1000, 100)
    self.assertEqual(ret, RTC.RTC_OK)
    rate = OpenRTM_aist.get_current_rate(self.comp2, 1000)
    self.assertEqual(int(rate), 100)

    ret = OpenRTM_aist.activate(self.comp1,0)
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertTrue(OpenRTM_aist.is_in_active(self.comp1, 0))

    ret = OpenRTM_aist.deactivate(self.comp1,0)
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertTrue(OpenRTM_aist.is_in_inactive(self.comp1, 0))

    # is_in_error() and reset() are not exercised by this test.

    # get_state() reports the state through the one-element list argument.
    state = [None]
    OpenRTM_aist.get_state(self.comp1, 0, state)
    self.assertEqual(state[0], RTC.INACTIVE_STATE)

    ret = OpenRTM_aist.remove_rtc_to_default_ec(self.comp1, self.comp2)
    self.assertEqual(ret, RTC.RTC_OK)

  def test_Port(self):
    """Port lookup helpers and every connect/disconnect variant."""
    # --- port name queries ---
    port_names = OpenRTM_aist.get_port_names(self.comp1)
    self.assertTrue("TestComp10.out" in port_names)

    inport_names = OpenRTM_aist.get_inport_names(self.comp2)
    self.assertTrue("TestComp20.in" in inport_names)

    outport_names = OpenRTM_aist.get_outport_names(self.comp1)
    self.assertTrue("TestComp10.out" in outport_names)

    svcport_names = OpenRTM_aist.get_svcport_names(self.comp1)
    self.assertTrue("TestComp10.service" in svcport_names)

    # --- port references ---
    rtc1_port_out = OpenRTM_aist.get_port_by_name(self.comp1,"TestComp10.out")
    rtc2_port_out = OpenRTM_aist.get_port_by_name(self.comp2,"TestComp20.out")
    rtc2_port_in = OpenRTM_aist.get_port_by_name(self.comp2,"TestComp20.in")
    rtc1_port_in = OpenRTM_aist.get_port_by_name(self.comp1,"TestComp10.in")

    pp = rtc1_port_out.get_port_profile()
    self.assertEqual(pp.name, "TestComp10.out")

    # --- connect / disconnect(profile) ---
    prop = OpenRTM_aist.Properties()
    # create_connector() is only called here; its result is not inspected.
    OpenRTM_aist.create_connector("con1",prop,rtc1_port_out,rtc2_port_in)
    self._connect_ok("con1",prop,rtc1_port_out,rtc2_port_in)

    con_names = OpenRTM_aist.get_connector_names(rtc1_port_out)
    self.assertTrue("con1" in con_names)
    # get_connector_ids() is only called here; its result is not inspected.
    OpenRTM_aist.get_connector_ids(rtc1_port_out)

    self.assertTrue(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    ret = OpenRTM_aist.disconnect(rtc1_port_out.get_connector_profiles()[0])
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # --- connect_multi / disconnect_all_by_ref ---
    ret = OpenRTM_aist.connect_multi("con2",prop,rtc1_port_out,[rtc1_port_in,rtc2_port_in])
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertTrue(OpenRTM_aist.already_connected(rtc1_port_out,rtc1_port_in))

    # NOTE(review): assertTrue accepts any truthy value; if
    # disconnect_all_by_ref returns a return code, compare it against
    # RTC.RTC_OK instead -- confirm against the library.
    ret = OpenRTM_aist.disconnect_all_by_ref(rtc1_port_out)
    self.assertTrue(ret)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # --- connect_by_name / disconnect_by_portref_connector_name ---
    ret = OpenRTM_aist.connect_by_name("con3",prop,self.comp1,"TestComp10.out",self.comp2,"TestComp20.in")
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertTrue(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    ret = OpenRTM_aist.disconnect_by_portref_connector_name(rtc1_port_out, "con3")
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # --- disconnect_by_portref_connector_id ---
    self._connect_ok("con1",prop,rtc1_port_out,rtc2_port_in)
    ret = OpenRTM_aist.disconnect_by_portref_connector_id(rtc1_port_out,rtc1_port_out.get_connector_profiles()[0].connector_id)
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # --- disconnect_by_port_name ---
    self._connect_ok("con1",prop,rtc1_port_out,rtc2_port_in)
    ret = OpenRTM_aist.disconnect_by_port_name(rtc1_port_out,"TestComp20.in")
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # The remaining variants address ports by rtcloc URI.
    # NOTE(review): these assume the manager is reachable on localhost:2810
    # -- confirm for the test environment.

    # --- disconnect_all_by_name ---
    self._connect_ok("con1",prop,rtc1_port_out,rtc2_port_in)
    ret = OpenRTM_aist.disconnect_all_by_name("rtcloc://localhost:2810/example/TestComp20.in")
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # --- disconnect_by_portname_connector_name ---
    self._connect_ok("con1",prop,rtc1_port_out,rtc2_port_in)
    ret = OpenRTM_aist.disconnect_by_portname_connector_name("rtcloc://localhost:2810/example/TestComp20.in","con1")
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

    # --- disconnect_by_portname_connector_id ---
    self._connect_ok("con1",prop,rtc1_port_out,rtc2_port_in)
    ret = OpenRTM_aist.disconnect_by_portname_connector_id("rtcloc://localhost:2810/example/TestComp10.out",rtc1_port_out.get_connector_profiles()[0].connector_id)
    self.assertEqual(ret, RTC.RTC_OK)
    self.assertFalse(OpenRTM_aist.already_connected(rtc1_port_out,rtc2_port_in))

  def test_configuration(self):
    """Read and update the "test1" parameter of comp1's "default" set."""
    conf = OpenRTM_aist.get_configuration(self.comp1, "default")
    self.assertEqual(conf.getProperty("test1"),"0")

    ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
    self.assertEqual(ret,"0")

    # get_active_configuration() is only called here; its result is not
    # inspected.
    OpenRTM_aist.get_active_configuration(self.comp1)

    # Update through the named configuration set.
    ret = OpenRTM_aist.set_configuration(self.comp1,"default","test1","1")
    self.assertTrue(ret)
    ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
    self.assertEqual(ret,"1")

    # Update through the active set; the value is read back from "default",
    # which is also the active set name asserted below.
    ret = OpenRTM_aist.set_active_configuration(self.comp1,"test1","2")
    self.assertTrue(ret)
    ret = OpenRTM_aist.get_parameter_by_key(self.comp1,"default","test1")
    self.assertEqual(ret,"2")

    confsetname = OpenRTM_aist.get_active_configuration_name(self.comp1)
    self.assertEqual(confsetname,"default")
350

    
351
############### test #################
352
# Run the test case when this file is executed as a script.
if __name__ == '__main__':
  unittest.main()