プロジェクト

全般

プロフィール

test_Manager_Servant_new.py

miyamoto, 2016/02/25 06:20

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_Manager_Servant_new.py
6
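# \brief Test that a component created by a manager in another process can be looked up through the naming manager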
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC, RTC__POA
25
import multiprocessing
26

    
27
# Profile for TestComp1: a flat list of alternating keys and values,
# closed by an empty string.  TestComp1Init passes it to
# OpenRTM_aist.Properties(defaults_str=...).
testcomp1_spec = ["implementation_id", "TestComp1",
                  "type_name",         "TestComp1",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  "conf.default.test1", "0",
                  ""]
39

    
40
# Profile for TestComp2: a flat list of alternating keys and values,
# closed by an empty string.  TestComp2Init passes it to
# OpenRTM_aist.Properties(defaults_str=...).
testcomp2_spec = ["implementation_id", "TestComp2",
                  "type_name",         "TestComp2",
                  "description",       "Test example component",
                  "version",           "1.0",
                  "vendor",            "Nobuhiko Myiyamoto",
                  "category",          "example",
                  "activity_type",     "DataFlowComponent",
                  "max_instance",      "10",
                  "language",          "C++",
                  "lang_type",         "compile",
                  ""]
51

    
52

    
53

    
54

    
55
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
  """Minimal data-flow component created in the test process.

  It adds nothing to DataFlowComponentBase beyond an onInitialize
  callback that reports success.
  """

  def __init__(self, manager):
    """Forward `manager` to the DataFlowComponentBase constructor."""
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

  def onInitialize(self):
    """Return RTC.RTC_OK without doing any other work."""
    return RTC.RTC_OK
63
  
64
class TestComp2(OpenRTM_aist.DataFlowComponentBase):
  """Minimal data-flow component created in the child process.

  It adds nothing to DataFlowComponentBase beyond an onInitialize
  callback that reports success.
  """

  def __init__(self, manager):
    """Forward `manager` to the DataFlowComponentBase constructor."""
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

  def onInitialize(self):
    """Return RTC.RTC_OK without doing any other work."""
    return RTC.RTC_OK
75
    
76
    
77

    
78
    
79
def TestComp1Init(manager):
  """Register the TestComp1 factory with `manager`.

  Builds an OpenRTM_aist.Properties object from testcomp1_spec and hands
  it to manager.registerFactory together with the TestComp1 class and
  OpenRTM_aist.Delete.

  manager -- object providing registerFactory(profile, new, delete).
  """
  profile = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
  manager.registerFactory(profile, TestComp1, OpenRTM_aist.Delete)
84

    
85

    
86
    
87
def TestComp2Init(manager):
  """Register the TestComp2 factory with `manager`.

  Builds an OpenRTM_aist.Properties object from testcomp2_spec and hands
  it to manager.registerFactory together with the TestComp2 class and
  OpenRTM_aist.Delete.

  manager -- object providing registerFactory(profile, new, delete).
  """
  profile = OpenRTM_aist.Properties(defaults_str=testcomp2_spec)
  manager.registerFactory(profile, TestComp2, OpenRTM_aist.Delete)
92
  
93

    
94
def TestComp1ModuleInit(manager):
  """Module-init hook: register TestComp1 and create one instance.

  Installed with Manager.setModuleInitProc in the test's setUp.

  manager -- manager the factory is registered with and the component
             is created on.
  """
  TestComp1Init(manager)
  # The created component is not used here, so its reference is not kept.
  manager.createComponent("TestComp1")
97

    
98
def TestComp2ModuleInit(manager):
  """Module-init hook: register TestComp2 and create one instance.

  Installed with Manager.setModuleInitProc in runTestComp2.

  manager -- manager the factory is registered with and the component
             is created on.
  """
  TestComp2Init(manager)
  # The created component is not used here, so its reference is not kept.
  manager.createComponent("TestComp2")
101
  
102

    
103
def runTestComp2(q):
    """Child-process entry point: host TestComp2 until told to stop.

    Initializes a manager with the '-d' flag and the
    'naming.type:corba,manager' option, installs TestComp2ModuleInit as
    the module-init procedure and activates the manager.  It then blocks
    on q.get(); any item received is treated as the stop signal.

    q -- queue-like object whose blocking get() returns when the parent
         wants this process to shut down.
    """
    argv = ["", "-d", "-o", "naming.type:corba,manager"]

    manager = OpenRTM_aist.Manager.init(argv)
    manager.setModuleInitProc(TestComp2ModuleInit)
    manager.activateManager()

    try:
        # Wait for the stop signal; the value itself is ignored.
        q.get()
    finally:
        # Shut down even when the wait is interrupted by an exception.
        manager.shutdownManager()
118

    
119

    
120
class test_CORBA_RTCUtil(unittest.TestCase):
  """Look up a component hosted by a manager running in another process.

  setUp starts runTestComp2 in a child process and then activates a
  manager in this process; tearDown stops both.
  """

  def setUp(self):
    """Start the child process, then the local manager."""
    self.queue = multiprocessing.Queue()
    self.outport_process = multiprocessing.Process(target=runTestComp2,
                                                   args=(self.queue,))
    self.outport_process.start()

    # Fixed delay before the local manager is started.
    # NOTE(review): presumably this lets the child manager come up first.
    time.sleep(1)

    # Build a fresh argument list instead of extending sys.argv in place,
    # so the options do not accumulate every time setUp runs.
    argv = sys.argv + ['-o', 'naming.type:corba,manager']
    self.manager = OpenRTM_aist.Manager.init(argv)
    self.manager.setModuleInitProc(TestComp1ModuleInit)
    self.manager.activateManager()

  def tearDown(self):
    """Shut down the local manager and stop the child process."""
    try:
      self.manager.shutdownManager()
    finally:
      # Always release the child, which is blocked on queue.get().
      self.queue.put("")
      time.sleep(0.1)
      # Reap the child; terminate it if it has not exited in time so that
      # no process outlives the test.
      self.outport_process.join(5)
      if self.outport_process.is_alive():
        self.outport_process.terminate()
        self.outport_process.join()

  def test_getComponent(self):
    """Resolve example/TestComp20 by rtcloc URL and check its instance name."""
    rtc = self.manager._namingManager.string_to_component("rtcloc://localhost:2810/example/TestComp20")
    # Report an empty lookup as a test failure, not a bare IndexError.
    self.assertTrue(rtc, "string_to_component returned no component")
    name = rtc[0].get_component_profile().instance_name
    self.assertEqual(name, "TestComp20")
145
    
146

    
147
############### test #################
148
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
  unittest.main()