Project

General

Profile

test_Manager_new.py

miyamoto, 01/14/2016 08:45 PM

 
#!/usr/bin/env python
# -*- coding: euc-jp -*-

#
# \file test_Manager_12.py
# \brief Unit tests for Manager pre-connection, pre-activation and topic-based port connection
# \date $Date: $
# \author Nobuhiko Miyamoto
#
10

    
11

    
12
import sys
# Extend the module search path with the parent directory before the
# OpenRTM imports below are resolved.
sys.path.insert(1,"../")

# Prefer the unittest2 backport when it is installed; otherwise use the
# standard library's unittest.
try:
    import unittest2 as unittest
except (ImportError):
    import unittest

import time

#from Manager import *
import OpenRTM_aist
import RTC, RTC__POA
import OpenRTM, OpenRTM__POA
26

    
27
# Profile of TestComp1 as a flat list of alternating keys and values, closed
# by an empty string.  TestComp1Init passes it to
# OpenRTM_aist.Properties(defaults_str=...).
testcomp1_spec = ["implementation_id", "TestComp1",
                  "type_name",         "TestComp1",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  ""]
38

    
39
# Profile of TestComp2 as a flat list of alternating keys and values, closed
# by an empty string.  TestComp2Init passes it to
# OpenRTM_aist.Properties(defaults_str=...).
testcomp2_spec = ["implementation_id", "TestComp2",
                  "type_name",         "TestComp2",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  ""]
50

    
51
class Test_i(OpenRTM__POA.InPortCdr):
  """Stub InPortCdr servant registered as the service provider by TestComp1."""

  def __init__(self):
    # Deliberately empty: the stub keeps no state of its own.
    pass

  def put(self, data):
    """Discard the received data and report success.

    data -- payload handed to the servant; it is never inspected.
    Returns OpenRTM.PORT_OK unconditionally.
    """
    return OpenRTM.PORT_OK
56

    
57

    
58
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
  """Test component on the providing side.

  Owns an OutPort "out", an InPort "in", a service port "service" with a
  Test_i provider, plus an OutPort "topic_out" and a service port
  "topic_service" that are both tagged with publish_topic "test".
  """

  def __init__(self, manager):
    """Create the data objects, ports and provider servants.

    manager -- forwarded unchanged to DataFlowComponentBase.__init__.
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Plain data ports.
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)

    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)

    # Plain service port and its provider.
    self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
    self._testService_provided = Test_i()

    # Ports that onInitialize tags with a publish_topic property.
    self._d_topic_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._topic_outOut = OpenRTM_aist.OutPort("topic_out", self._d_topic_out)

    self._topic_servicePort_provided = OpenRTM_aist.CorbaPort("topic_service")
    self._topic_Service_provided = Test_i()

  def onInitialize(self):
    """Register every port with the component.

    Returns RTC.RTC_OK.
    """
    self.addOutPort("out",self._outOut)
    self.addInPort("in",self._inIn)

    self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
    self.addPort(self._servicePort_provided)

    #self._servicePort_provided.activateInterfaces()

    # The topic ports get publish_topic "test" after they have been added.
    self.addOutPort("topic_out",self._topic_outOut)
    self._topic_outOut.appendProperty("publish_topic","test")

    self._topic_servicePort_provided.registerProvider("topic_service", "TestService", self._topic_Service_provided)
    self.addPort(self._topic_servicePort_provided)
    self._topic_servicePort_provided.appendProperty("publish_topic","test")

    return RTC.RTC_OK
94
  
95
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
  """Test component on the consuming side.

  Owns an OutPort "out", an InPort "in", a service port "service" with an
  InPortCdr consumer, plus an InPort "topic_in" and a service port
  "topic_service" that are both tagged with publish_topic "test".
  """

  def __init__(self, manager):
    """Create the data objects, ports and service consumers.

    manager -- forwarded unchanged to DataFlowComponentBase.__init__.
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Plain data ports.
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)

    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)

    # Plain service port and its consumer.
    self._servicePort_required = OpenRTM_aist.CorbaPort("service")
    self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)

    # Ports that onInitialize tags with a publish_topic property.
    self._d_topic_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._topic_inIn = OpenRTM_aist.InPort("topic_in", self._d_topic_in)

    self._topic_servicePort_required = OpenRTM_aist.CorbaPort("topic_service")
    self._topic_Service_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)

  def onInitialize(self):
    """Register every port with the component.

    Returns RTC.RTC_OK.
    """
    self.addOutPort("out",self._outOut)
    self.addInPort("in",self._inIn)

    self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
    self.addPort(self._servicePort_required)

    # The topic ports get publish_topic "test" after they have been added.
    self.addInPort("topic_in",self._topic_inIn)
    self._topic_inIn.appendProperty("publish_topic","test")

    self._topic_servicePort_required.registerConsumer("topic_service", "TestService", self._topic_Service_required)
    self.addPort(self._topic_servicePort_required)
    self._topic_servicePort_required.appendProperty("publish_topic","test")

    return RTC.RTC_OK
140
    
141
    
142

    
143
    
144
def TestComp1Init(manager):
  """Register the TestComp1 factory with the given manager.

  manager -- object whose registerFactory() receives the profile built from
             testcomp1_spec, the TestComp1 class and OpenRTM_aist.Delete.
  """
  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
  manager.registerFactory(profile, TestComp1, OpenRTM_aist.Delete)
149

    
150

    
151
    
152
def TestComp2Init(manager):
  """Register the TestComp2 factory with the given manager.

  manager -- object whose registerFactory() receives the profile built from
             testcomp2_spec, the TestComp2 class and OpenRTM_aist.Delete.
  """
  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
  manager.registerFactory(profile, TestComp2, OpenRTM_aist.Delete)
157
  
158

    
159
def MyModuleInit(manager):
  """Module init procedure: register both factories and create one of each.

  manager -- manager passed on to TestComp1Init / TestComp2Init and used to
             create the "TestComp1" and "TestComp2" components.
  """
  TestComp1Init(manager)
  TestComp2Init(manager)
  # The created components are not needed here, so the return values of
  # createComponent() are intentionally not kept.
  manager.createComponent("TestComp1")
  manager.createComponent("TestComp2")
164
  
165

    
166

    
167

    
168

    
169
class TestManager(unittest.TestCase):
  """Checks the manager's preconnect / preactivate options and topic ports."""

  def setUp(self):
    """Start a manager configured to pre-connect and pre-activate components.

    Stores the manager and the object references of "TestComp10" and
    "TestComp20" on the test case.
    """
    self.dataPortConnectorName = "TestComp20.in:TestComp10.out(interface_type=direct)"
    self.servicePortConnectorName = "TestComp10.service:TestComp20.service()"

    # Work on a copy of the command line: setUp runs once per test, and
    # extending sys.argv in place would pile up duplicate -o options.
    argv = list(sys.argv)
    argv.extend(['-o', 'manager.components.preconnect:'+self.dataPortConnectorName+","+self.servicePortConnectorName])
    argv.extend(['-o', 'manager.components.preactivate:TestComp10'])

    self.manager = OpenRTM_aist.Manager.init(argv)
    self.manager.setModuleInitProc(MyModuleInit)
    self.manager.activateManager()
    self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
    self.comp2 = self.manager.getComponent("TestComp20").getObjRef()

  def tearDown(self):
    """Shut the manager down and pause briefly afterwards."""
    self.manager.shutdownManager()
    # NOTE(review): presumably gives the shutdown time to finish - confirm.
    time.sleep(0.1)

  def test_Topic(self):
    """The topic data ports and topic service ports must already be connected."""
    # NOTE(review): presumably connected by the manager because both sides
    # carry publish_topic "test" - confirm against the Manager implementation.
    inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.topic_in")
    outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.topic_out")
    ans = OpenRTM_aist.already_connected(inport, outport)
    self.assertTrue(ans)

    provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.topic_service")
    required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.topic_service")
    ans = OpenRTM_aist.already_connected(provided, required)
    self.assertTrue(ans)

  def test_PreActivation(self):
    """TestComp10, named in the preactivate option, must be active."""
    state = OpenRTM_aist.is_in_active(self.comp1)
    self.assertTrue(state)

  def test_PreConnection(self):
    """The ports named in the preconnect option must already be connected."""
    inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.in")
    outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.out")
    ans = OpenRTM_aist.already_connected(inport, outport)
    self.assertTrue(ans)

    # The option given in parentheses in the connector name must show up in
    # the connector profile as dataport.interface_type.
    #con = OpenRTM_aist.get_connector_names(inport)[0]
    profile = inport.get_connector_profiles()[0]
    prop = OpenRTM_aist.Properties()
    OpenRTM_aist.NVUtil.copyToProperties(prop, profile.properties)
    self.assertEqual(prop.getProperty("dataport.interface_type"),"direct")

    provided = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.service")
    required = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.service")
    ans = OpenRTM_aist.already_connected(provided, required)
    self.assertTrue(ans)
228
  
229

    
230

    
231
  
232

    
233

    
234
############### test #################
235
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()