Project

General

Profile

test_SharedMemory.py

miyamoto, 02/25/2016 11:14 PM

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_Manager_12.py
6
# \brief 
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC
25

    
26

    
27

    
28
  
29

    
30

    
31

    
32

    
33
class TestSharedMemory(unittest.TestCase):
34
  
35
  def setUp(self):
36
    sys.argv.extend(['-o', 'port.outport.out.shem_default_size:1k'])
37
    #sys.argv.extend(['-o', 'port.inport.in.shem_default_size:1k'])
38
    self.manager = OpenRTM_aist.Manager.init(sys.argv)
39
    self.manager.activateManager()
40

    
41
    self._d_in = RTC.TimedOctetSeq(RTC.Time(0,0),[])
42
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
43

    
44
    prop = self.manager.getConfig().getNode("port.inport.in")
45
    self._inIn.init(prop)
46
    self.inport_obj = self._inIn.getPortRef()
47

    
48

    
49
    self._d_out = RTC.TimedOctetSeq(RTC.Time(0,0),[])
50
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
51
    prop = self.manager.getConfig().getNode("port.outport.out")
52
    self._outOut.init(prop)
53
    self.outport_obj = self._outOut.getPortRef()
54
    
55
    
56

    
57
  def tearDown(self):
58
    self.manager.shutdownManager()
59

    
60
  def test_Push(self):
61
    prop = OpenRTM_aist.Properties()
62
    prop.setProperty("dataport.interface_type","shared_memory")
63
    prop.setProperty("dataport.dataflow_type","push")
64
    ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
65
    
66
    self._d_out.data = "a"*10000
67
    self._outOut.write()
68

    
69
    ret = self._inIn.isNew()
70
    self.assertTrue(ret)
71

    
72
    data = self._inIn.read()
73
    self.assertEqual(data.data, self._d_out.data)
74

    
75
    
76
    self._d_out.data = "a"*50000
77
    self._outOut.write()
78

    
79
    data = self._inIn.read()
80
    self.assertEqual(data.data, self._d_out.data)
81
    
82

    
83
    self.outport_obj.disconnect_all()
84

    
85

    
86

    
87
  def test_Pull(self):
88
    prop = OpenRTM_aist.Properties()
89
    prop.setProperty("dataport.interface_type","shared_memory")
90
    prop.setProperty("dataport.dataflow_type","pull")
91
    ret = OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)
92
    
93
    self._d_out.data = "a"*10000
94
    self._outOut.write()
95

    
96
    #ret = self._inIn.isNew()
97
    #self.assertTrue(ret)
98

    
99
    data = self._inIn.read()
100
    self.assertEqual(data.data, self._d_out.data)
101

    
102
    
103
    self._d_out.data = "a"*50000
104
    self._outOut.write()
105
    
106

    
107
    data = self._inIn.read()
108
    self.assertEqual(data.data, self._d_out.data)
109
    
110

    
111
    self.outport_obj.disconnect_all()
112

    
113
############### test #################
114
if __name__ == '__main__':
115
        unittest.main()