1
|
|
2
|
|
3
|
|
4
|
|
5
|
|
6
|
|
7
|
|
8
|
|
9
|
|
10
|
|
11
|
|
12
|
import sys
|
13
|
sys.path.insert(1,"../")
|
14
|
|
15
|
try:
|
16
|
import unittest2 as unittest
|
17
|
except (ImportError):
|
18
|
import unittest
|
19
|
|
20
|
import time
|
21
|
|
22
|
|
23
|
import OpenRTM_aist
|
24
|
import RTC, RTC__POA
|
25
|
import multiprocessing
|
26
|
|
27
|
# Profile of the TestComp1 test component, written as a flat list of
# alternating key and value strings followed by one trailing empty string.
# TestComp1Init hands this list to OpenRTM_aist.Properties(defaults_str=...).
testcomp1_spec = [
    "implementation_id", "TestComp1",
    "type_name", "TestComp1",
    "description", "Test example component",
    "version", "1.0",
    "vendor", "Nobuhiko Myiyamoto",
    "category", "example",
    "activity_type", "DataFlowComponent",
    "max_instance", "10",
    "language", "C++",
    "lang_type", "compile",
    "conf.default.test1", "0",
    "",
]
|
39
|
|
40
|
|
41
|
|
42
|
|
43
|
|
44
|
|
45
|
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
    """Minimal data-flow component used as a fixture by the tests in this file."""

    def __init__(self, manager):
        """Forward *manager* to the DataFlowComponentBase initializer."""
        # Explicit base-class call, kept exactly as the original wrote it.
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    def onInitialize(self):
        """Do no extra set-up and return ``RTC.RTC_OK``."""
        return RTC.RTC_OK
|
53
|
|
54
|
|
55
|
|
56
|
|
57
|
|
58
|
|
59
|
def TestComp1Init(manager):
    """Register the TestComp1 factory with *manager*.

    A Properties object is built from ``testcomp1_spec`` and passed to
    ``manager.registerFactory`` together with the ``TestComp1`` class and
    ``OpenRTM_aist.Delete``.
    """
    spec_props = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
    manager.registerFactory(spec_props, TestComp1, OpenRTM_aist.Delete)
|
64
|
|
65
|
|
66
|
|
67
|
|
68
|
|
69
|
|
70
|
def TestComp1ModuleInit(manager):
    """Register the TestComp1 factory and create one TestComp1 instance.

    This function is handed to ``manager.setModuleInitProc`` by both the
    child-process entry point and the test case's ``setUp``.

    Args:
        manager: object providing ``registerFactory`` and
            ``createComponent``.
    """
    TestComp1Init(manager)
    # The created component is not needed here, so the result is not bound
    # to a local name (the original stored it in an unused variable).
    manager.createComponent("TestComp1")
|
73
|
|
74
|
|
75
|
|
76
|
|
77
|
def runTestComp2(q):
    """Child-process entry point: run a manager until told to stop.

    A manager is initialised with a fixed argument list, given
    ``TestComp1ModuleInit`` as its module-init procedure and activated.
    The function then blocks on ``q.get()``; once any item arrives on the
    queue the manager is shut down and the function returns.

    Args:
        q: queue the function blocks on; the test case's ``tearDown`` puts
            an item on it to release this process.
    """
    # Same four arguments the original assembled with successive extend()
    # calls; the leading empty string occupies the program-name slot.
    argv = ["", '-d', '-o', 'manager.components.naming_policy:ns_unique']

    mgr = OpenRTM_aist.Manager.init(argv)
    mgr.setModuleInitProc(TestComp1ModuleInit)
    mgr.activateManager()

    # Block until the parent signals that the test is over.
    q.get()

    mgr.shutdownManager()
|
92
|
|
93
|
|
94
|
class test_CORBA_RTCUtil(unittest.TestCase):
    """Check that a component created by the local manager can be looked up.

    Each test runs with two managers: one in a child process started in
    ``setUp`` (running ``runTestComp2``) and one in the test process itself.
    """

    # Upper bound, in seconds, on how long tearDown waits for the child
    # process to exit after it has been signalled.
    _CHILD_JOIN_TIMEOUT = 5.0

    def setUp(self):
        """Start the child-process manager, then the in-process manager."""
        self.queue = multiprocessing.Queue()
        self.outport_process = multiprocessing.Process(
            target=runTestComp2, args=(self.queue,))
        self.outport_process.start()

        # Presumably gives the child process time to bring its manager up
        # before the local one starts -- TODO confirm.
        time.sleep(1)

        # Work on a copy: extending sys.argv in place would add the option
        # again on every setUp and leak it into the rest of the process.
        argv = list(sys.argv)
        argv.extend(['-o', 'manager.components.naming_policy:ns_unique'])
        self.manager = OpenRTM_aist.Manager.init(argv)
        self.manager.setModuleInitProc(TestComp1ModuleInit)
        self.manager.activateManager()

    def tearDown(self):
        """Shut down the local manager and release the child process."""
        self.manager.shutdownManager()
        # Any item unblocks the q.get() in runTestComp2.
        self.queue.put("")
        time.sleep(0.1)
        # Reap the child so it does not linger after the test; the timeout
        # keeps a stuck child from hanging the test run.
        self.outport_process.join(self._CHILD_JOIN_TIMEOUT)

    def test_getComponent(self):
        """The manager returns a component for the name "TestComp11"."""
        # NOTE(review): the expected name "TestComp11" presumably comes from
        # the ns_unique naming policy numbering this instance after the one
        # created in the child process -- confirm.
        comp = self.manager.getComponent("TestComp11")
        self.assertIsNotNone(comp)
|
116
|
|
117
|
|
118
|
if __name__ == "__main__":
    # Run the test cases in this file when it is executed as a script.
    unittest.main()
|