Project

General

Profile

test_NumberingPolicy_node.py

miyamoto, 02/25/2016 02:25 PM

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_NumberingPolicy_node.py
6
# \brief Test of component lookup with the node_unique naming policy
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC, RTC__POA
25
import multiprocessing
26

    
27
# Default profile for the TestComp1 factory, written as a flat list of
# alternating key / value strings.  TestComp1Init() hands this list to
# OpenRTM_aist.Properties(defaults_str=...).
# NOTE(review): "language" / "lang_type" say C++ / compile although the
# component in this file is a Python class -- confirm whether intentional.
testcomp1_spec = [
    "implementation_id", "TestComp1",
    "type_name", "TestComp1",
    "description", "Test example component",
    "version", "1.0",
    "vendor", "Nobuhiko Myiyamoto",
    "category", "example",
    "activity_type", "DataFlowComponent",
    "max_instance", "10",
    "language", "C++",
    "lang_type", "compile",
    "conf.default.test1", "0",
    "",  # trailing empty entry, kept exactly as in the original list
]
39

    
40

    
41

    
42

    
43

    
44

    
45
class TestComp1(OpenRTM_aist.DataFlowComponentBase):
  """Minimal data-flow component that this file registers as a factory."""

  def __init__(self, manager):
    """Forward *manager* to the DataFlowComponentBase initializer."""
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

  def onInitialize(self):
    """Perform no setup work and report success with RTC.RTC_OK."""
    return RTC.RTC_OK
53
  
54

    
55
    
56
    
57

    
58
    
59
def TestComp1Init(manager):
  """Register the TestComp1 factory with *manager*.

  The factory profile is an OpenRTM_aist.Properties object built from the
  module-level testcomp1_spec list.  TestComp1 and OpenRTM_aist.Delete are
  passed as the second and third registerFactory() arguments.
  """
  spec_properties = OpenRTM_aist.Properties(defaults_str=testcomp1_spec)
  manager.registerFactory(spec_properties, TestComp1, OpenRTM_aist.Delete)
64

    
65

    
66
    
67

    
68
  
69

    
70
def TestComp1ModuleInit(manager):
  """Module-init hook: register the TestComp1 factory and create one instance.

  This function is passed to manager.setModuleInitProc() both by the child
  process entry point (runTestComp2) and by the test case's setUp.
  """
  TestComp1Init(manager)
  # The created component is not used here, so the return value of
  # createComponent() is deliberately not bound to a name.
  manager.createComponent("TestComp1")
73

    
74

    
75
  
76

    
77
def runTestComp2(q):
    """Child-process entry point: run a second manager until told to stop.

    Initialises an OpenRTM_aist manager with the ``-d`` flag and the
    ``node_unique`` component naming policy, installs TestComp1ModuleInit
    as its module-init procedure and activates it.  The function then
    blocks on ``q.get()``; once an item arrives the manager is shut down.

    q -- queue read once with ``get()``; the item itself is discarded.
    """
    child_args = [
        "",
        "-d",
        "-o", "manager.components.naming_policy:node_unique",
    ]

    mgr = OpenRTM_aist.Manager.init(child_args)
    mgr.setModuleInitProc(TestComp1ModuleInit)
    mgr.activateManager()

    # Block until the stop signal arrives; its payload is irrelevant.
    q.get()

    mgr.shutdownManager()
92

    
93

    
94
class test_CORBA_RTCUtil(unittest.TestCase):
  """Check component lookup under the ``node_unique`` naming policy.

  Every test runs with two managers: one in a child process (runTestComp2)
  and one in the test process.  Both install TestComp1ModuleInit, which
  registers TestComp1 and creates one instance of it.
  """

  # Seconds to wait for the child process to exit before terminating it.
  CHILD_JOIN_TIMEOUT = 5.0

  def setUp(self):
    """Start the child manager process, then the local manager."""
    self.queue = multiprocessing.Queue()
    self.outport_process = multiprocessing.Process(target=runTestComp2,
                                                   args=(self.queue,))
    self.outport_process.start()

    # Presumably gives the child manager time to come up before the local
    # manager is initialised.
    time.sleep(1)
    try:
      # Build the argument list on a copy: extending sys.argv itself would
      # change process-wide state and grow it on every setUp call.
      argv = list(sys.argv)
      argv.extend(['-o', 'manager.components.naming_policy:node_unique'])
      self.manager = OpenRTM_aist.Manager.init(argv)
      self.manager.setModuleInitProc(TestComp1ModuleInit)
      self.manager.activateManager()
    except Exception:
      # unittest does not call tearDown when setUp raises, so the child
      # process has to be stopped here to avoid leaving it behind.
      self._stopChildProcess()
      raise

  def tearDown(self):
    """Shut down the local manager and stop the child process."""
    try:
      self.manager.shutdownManager()
    finally:
      # Release the child even if the local shutdown raised.
      self._stopChildProcess()

  def _stopChildProcess(self):
    """Signal the child process to stop and wait for it to exit."""
    # Any item unblocks the q.get() call in runTestComp2.
    self.queue.put("")
    self.outport_process.join(self.CHILD_JOIN_TIMEOUT)
    if self.outport_process.is_alive():
      # The child did not exit in time; do not leave it running.
      self.outport_process.terminate()
      self.outport_process.join()

  def test_getComponent(self):
    """The local manager must be able to look up "TestComp11".

    NOTE(review): the trailing "1" is presumably the instance number chosen
    by the node_unique policy because number 0 is already used by the child
    manager's component -- confirm against the naming policy implementation.
    """
    comp = self.manager.getComponent("TestComp11")
    self.assertIsNotNone(comp)
116
      
117
    
118

    
119
############### test #################
120
if __name__ == '__main__':
    # Run the test cases of this module when it is executed as a script.
    unittest.main()