Project

General

Profile

test_Manager_Servant_new.py

miyamoto, 02/27/2016 05:22 PM

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_Manager_12.py
6
# \brief 
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC, RTC__POA
25
import multiprocessing
26

    
27
testcomp1_spec = ["implementation_id", "TestComp1",
28
                 "type_name",         "TestComp1",
29
                 "description",       "Test example component",
30
                 "version",           "1.0",
31
                 "vendor",            "Nobuhiko Myiyamoto",
32
                 "category",          "example",
33
                 "activity_type",     "DataFlowComponent",
34
                 "max_instance",      "10",
35
                 "language",          "C++",
36
                 "lang_type",         "compile",
37
                 "conf.default.test1", "0",
38
                 ""]
39

    
40
testcomp2_spec = ["implementation_id", "TestComp2",
41
                 "type_name",         "TestComp2",
42
                 "description",       "Test example component",
43
                 "version",           "1.0",
44
                 "vendor",            "Nobuhiko Myiyamoto",
45
                 "category",          "example",
46
                 "activity_type",     "DataFlowComponent",
47
                 "max_instance",      "10",
48
                 "language",          "C++",
49
                 "lang_type",         "compile",
50
                 ""]
51

    
52

    
53

    
54

    
55
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
56
  def __init__(self, manager):
57
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
58
    
59

    
60
  def onInitialize(self):
61

    
62
    return RTC.RTC_OK
63
  
64
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
65
  def __init__(self, manager):
66
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
67

    
68
    
69
    return
70
  
71
  def onInitialize(self):
72

    
73
    
74
    return RTC.RTC_OK
75
    
76
    
77

    
78
    
79
def TestComp1Init(manager):
80
  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
81
  manager.registerFactory(profile,
82
                          TestComp1,
83
                          OpenRTM_aist.Delete)
84

    
85

    
86
    
87
def TestComp2Init(manager):
88
  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
89
  manager.registerFactory(profile,
90
                          TestComp2,
91
                          OpenRTM_aist.Delete)
92
  
93

    
94
def TestComp1ModuleInit(manager):
95
  TestComp1Init(manager)
96
  com = manager.createComponent("TestComp1")
97

    
98
def TestComp2ModuleInit(manager):
99
  TestComp2Init(manager)
100
  com = manager.createComponent("TestComp2")
101
  
102

    
103
def runTestComp2(q):
104
    
105
    argv = [""]
106
    argv.extend(['-d'])
107
    argv.extend(['-o','naming.type:corba,manager'])
108
    argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
109

    
110
    
111
    manager = OpenRTM_aist.Manager.init(argv)
112
    manager.setModuleInitProc(TestComp2ModuleInit)
113
    manager.activateManager()
114
    
115
    
116
    q.get()
117
        
118
    manager.shutdownManager()
119

    
120

    
121
class test_Manager_Servant_new(unittest.TestCase):
122
  
123
  def setUp(self):
124
    self.queue = multiprocessing.Queue()
125
    self.outport_process = multiprocessing.Process(target=runTestComp2, args=(self.queue,))
126
    self.outport_process.start()
127
    
128
    time.sleep(1)
129
    sys.argv.extend(['-o','naming.type:corba,manager'])
130
    sys.argv.extend(['-o','naming.formats:test.host_cxt/%n.rtc'])
131
    self.manager = OpenRTM_aist.Manager.init(sys.argv)
132
    self.manager.setModuleInitProc(TestComp1ModuleInit)
133
    self.manager.activateManager()
134
    
135

    
136
  def tearDown(self):
137
    self.manager.shutdownManager()
138
    self.queue.put("")
139
    time.sleep(0.1)
140

    
141
  def test_getComponent(self):
142
      #mgr_sev = self.manager._mgrservant.getObjRef()
143
      #print mgr_sev.get_components_by_name("example/TestComp10")
144
      rtcs = self.manager._namingManager.string_to_component("rtcloc://localhost:2810/example/TestComp20")
145
      name = rtcs[0].get_component_profile().instance_name
146
      self.assertEqual(name,"TestComp20")
147
      rtcs = self.manager._namingManager.string_to_component("rtcloc://*/example/TestComp20")
148
      name = rtcs[0].get_component_profile().instance_name
149
      self.assertEqual(name,"TestComp20")
150
      rtcs = self.manager._namingManager.string_to_component("rtcloc://*/*/TestComp20")
151
      name = rtcs[0].get_component_profile().instance_name
152
      self.assertEqual(name,"TestComp20")
153
      #print rtcs
154
      rtcs = self.manager._namingManager.string_to_component("rtcname://localhost/test.host_cxt/TestComp20")
155
      name = rtcs[0].get_component_profile().instance_name
156
      self.assertEqual(name,"TestComp20")
157
      rtcs = self.manager._namingManager.string_to_component("rtcname://*/test.host_cxt/TestComp20")
158
      name = rtcs[0].get_component_profile().instance_name
159
      self.assertEqual(name,"TestComp20")
160
      rtcs = self.manager._namingManager.string_to_component("rtcname://*/*/TestComp20")
161
      name = rtcs[0].get_component_profile().instance_name
162
      self.assertEqual(name,"TestComp20")
163
      
164

    
165
############### test #################
166
if __name__ == '__main__':
167
        unittest.main()