プロジェクト

全般

プロフィール

test_Manager_new2.py

miyamoto, 2016/02/26 18:33

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_Manager_new2.py
6
# \brief 
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC, RTC__POA
25
import multiprocessing
26
import OpenRTM, OpenRTM__POA
27

    
28
# Component profiles, written as flat lists of alternating keys and values
# terminated by an empty string.  TestComp1Init / TestComp2Init hand these to
# OpenRTM_aist.Properties(defaults_str=...).
#
# NOTE(review): the "vendor" spelling differs from the \author header of this
# file; it is runtime data, so it is kept exactly as it was.
testcomp1_spec = ["implementation_id", "TestComp1",
                  "type_name",         "TestComp1",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  "conf.default.test1", "0",
                  ""]

# Same profile as TestComp1 but without the "conf.default.test1" entry.
testcomp2_spec = ["implementation_id", "TestComp2",
                  "type_name",         "TestComp2",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  ""]
52

    
53
class Test_i(OpenRTM__POA.InPortCdr):
  """Minimal InPortCdr servant used as the provider of TestComp1's service port."""

  def __init__(self):
    # Nothing to set up; the servant keeps no state.
    pass

  def put(self, data):
    """Ignore *data* and always report OpenRTM.PORT_OK."""
    return OpenRTM.PORT_OK
58

    
59

    
60
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
  """Test component with an "out" port, an "in" port and a provided service.

  The data ports carry RTC.TimedLong values; the CORBA port named "service"
  exposes a Test_i servant.
  """

  def __init__(self, manager):
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Data port buffers and the ports bound to them.
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)

    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)

    # Service port and the servant it will provide.
    self._servicePort_provided = OpenRTM_aist.CorbaPort("service")
    self._testService_provided = Test_i()

  def onInitialize(self):
    """Register the data ports and the provided service port.

    Returns RTC.RTC_OK.
    """
    self.addOutPort("out",self._outOut)
    self.addInPort("in",self._inIn)

    # The provider is registered under the strings "service" / "TestService";
    # TestComp2 registers its consumer with the same pair.
    self._servicePort_provided.registerProvider("service", "TestService", self._testService_provided)
    self.addPort(self._servicePort_provided)

    return RTC.RTC_OK
80
  
81
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
  """Test component with an "out" port, an "in" port and a required service.

  The data ports carry RTC.TimedLong values; the CORBA port named "service"
  holds a CorbaConsumer for the OpenRTM.InPortCdr interface.
  """

  def __init__(self, manager):
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # Data port buffers and the ports bound to them.
    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)

    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)

    # Service port and the consumer it will require.
    self._servicePort_required = OpenRTM_aist.CorbaPort("service")
    self._testService_required = OpenRTM_aist.CorbaConsumer(interfaceType=OpenRTM.InPortCdr)

  def onInitialize(self):
    """Register the data ports and the required service port.

    Returns RTC.RTC_OK.
    """
    self.addOutPort("out",self._outOut)
    self.addInPort("in",self._inIn)

    # The consumer is registered under the strings "service" / "TestService";
    # TestComp1 registers its provider with the same pair.
    self._servicePort_required.registerConsumer("service", "TestService", self._testService_required)
    self.addPort(self._servicePort_required)

    return RTC.RTC_OK
108
    
109
    
110

    
111
    
112
def TestComp1Init(manager):
  """Register the TestComp1 factory on *manager* using testcomp1_spec."""
  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
  manager.registerFactory(profile,
                          TestComp1,
                          OpenRTM_aist.Delete)
117

    
118

    
119
    
120
def TestComp2Init(manager):
  """Register the TestComp2 factory on *manager* using testcomp2_spec."""
  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
  manager.registerFactory(profile,
                          TestComp2,
                          OpenRTM_aist.Delete)
125
  
126

    
127
def TestComp1ModuleInit(manager):
  """Module init procedure: register TestComp1 and create one instance."""
  TestComp1Init(manager)
  # The created component is not needed here; the test looks it up by name.
  manager.createComponent("TestComp1")
130

    
131
def TestComp2ModuleInit(manager):
  """Module init procedure: register TestComp2 and create one instance."""
  TestComp2Init(manager)
  # The created component is not needed here; the test resolves it by URL.
  manager.createComponent("TestComp2")
134
  
135

    
136
def runTestComp2(q):
    """Run a manager hosting TestComp2 until an item arrives on *q*.

    Intended as the target of a multiprocessing.Process.  The manager is
    initialised with "-d" and the naming options below, TestComp2ModuleInit
    is installed as its module init procedure, and the manager is activated.
    The function then blocks on ``q.get()``; any item put on the queue makes
    it shut the manager down and return.
    """
    argv = [
        "",
        '-d',
        '-o', 'naming.type:corba,manager',
        '-o', 'naming.formats:test.host_cxt/%n.rtc',
    ]

    manager = OpenRTM_aist.Manager.init(argv)
    manager.setModuleInitProc(TestComp2ModuleInit)
    manager.activateManager()

    try:
        # Block until the parent test signals that it is finished.
        q.get()
    finally:
        # Shut down even if waiting on the queue is interrupted.
        manager.shutdownManager()
153

    
154

    
155
class test_Manager_new2(unittest.TestCase):
  """Checks the manager's "preconnect" and "preactivate" options.

  Each test starts TestComp2 in a child process (runTestComp2) and TestComp1
  in a local manager configured with ``manager.components.preconnect`` and
  ``manager.components.preactivate`` options that refer to the remote
  TestComp20 instance.
  """

  def setUp(self):
    """Start the child process and the local manager; resolve both components."""
    self.queue = multiprocessing.Queue()
    self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
    self.outport_process.start()

    try:
      # Presumably gives the child process time to bring its manager up
      # before this process tries to reach it.
      time.sleep(1)

      RTC2_URL = "rtcloc://localhost:2810/example/TestComp20"
      self.dataPortConnectorName = RTC2_URL+".in"+"^TestComp10.out()"
      self.servicePortConnectorName = "TestComp10.service^"+RTC2_URL+".service()"

      # Work on a copy so that the options are not appended to the global
      # sys.argv again for every test method.
      argv = list(sys.argv)
      argv.extend(['-o','naming.type:corba,manager'])
      argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
      argv.extend(['-o', 'manager.components.preconnect:'+self.dataPortConnectorName+","+self.servicePortConnectorName])
      argv.extend(['-o', 'manager.components.preactivate:'+RTC2_URL])

      self.manager = OpenRTM_aist.Manager.init(argv)
      self.manager.setModuleInitProc(TestComp1ModuleInit)
      self.manager.activateManager()

      self.comp1 = self.manager.getComponent("TestComp10").getObjRef()
      self.comp2 = self.manager._namingManager.string_to_component(RTC2_URL)[0]
    except Exception:
      # tearDown is not run when setUp fails, so release the child here;
      # otherwise it would stay blocked on the queue.
      self._stop_remote_process()
      raise

  def tearDown(self):
    """Shut down the local manager, then stop the child process."""
    try:
      self.manager.shutdownManager()
    finally:
      self._stop_remote_process()

  def _stop_remote_process(self):
    """Signal the child process to exit and wait (bounded) for it to do so."""
    self.queue.put("")
    # Bounded wait so a stuck child cannot hang the whole test run.
    self.outport_process.join(5)

  def test_PreActivation(self):
    """The remote component named in "preactivate" must be active."""
    state = OpenRTM_aist.is_in_active(self.comp2)
    self.assertTrue(state)

  def test_PreConnection(self):
    """The ports named in "preconnect" must already be connected."""
    inport = OpenRTM_aist.get_port_by_name(self.comp2, "TestComp20.in")
    outport = OpenRTM_aist.get_port_by_name(self.comp1, "TestComp10.out")
    ans = OpenRTM_aist.already_connected(inport, outport)
    self.assertTrue(ans)
194
    
195

    
196
############### test #################
if __name__ == '__main__':
    # Run the test case when the file is executed as a script.
    unittest.main()