Project

General

Profile

test_DirectPort.py

miyamoto, 01/14/2016 10:45 PM

 
1
#!/usr/bin/env python
2
# -*- coding: euc-jp -*-
3

    
4
#
5
# \file test_DirectPort.py
6
# \brief Unit tests for data ports connected with the "direct" interface type
7
# \date $Date: $
8
# \author Nobuhiko Miyamoto
9
#
10

    
11

    
12
import sys
13
sys.path.insert(1,"../")
14

    
15
try:
16
    import unittest2 as unittest
17
except (ImportError):
18
    import unittest
19

    
20
import time
21

    
22
#from Manager import *
23
import OpenRTM_aist
24
import RTC
25

    
26

    
27

    
28
  
29

    
30

    
31

    
32

    
33
class TestDirectPort(unittest.TestCase):
  """Tests for data ports connected with the "direct" interface type.

  Each test connects a TimedLong OutPort to a TimedLong InPort, writes a
  value through the OutPort and checks what the InPort reads back.
  """

  def setUp(self):
    """Start the manager and create one InPort and one OutPort."""
    self.manager = OpenRTM_aist.Manager.init(sys.argv)
    self.manager.activateManager()

    self._d_in = RTC.TimedLong(RTC.Time(0,0),0)
    self._inIn = OpenRTM_aist.InPort("in", self._d_in)
    self._inIn.init(OpenRTM_aist.Properties())
    self.inport_obj = self._inIn.getPortRef()

    self._d_out = RTC.TimedLong(RTC.Time(0,0),0)
    self._outOut = OpenRTM_aist.OutPort("out", self._d_out)
    self._outOut.init(OpenRTM_aist.Properties())
    self.outport_obj = self._outOut.getPortRef()

  def tearDown(self):
    """Shut the manager down after every test."""
    self.manager.shutdownManager()

  def _connect_direct(self, dataflow_type):
    """Connect the InPort and the OutPort with the "direct" interface type.

    dataflow_type -- value stored under "dataport.dataflow_type"
                     ("push" or "pull" in the tests below).

    Returns whatever OpenRTM_aist.connect() returns.
    """
    prop = OpenRTM_aist.Properties()
    prop.setProperty("dataport.interface_type","direct")
    prop.setProperty("dataport.dataflow_type",dataflow_type)
    return OpenRTM_aist.connect("con1",prop,self.inport_obj,self.outport_obj)

  def test_Push(self):
    """Push dataflow: written data is flagged as new and read back as-is."""
    ret = self._connect_direct("push")
    # try/finally rather than addCleanup(): cleanups run after tearDown(),
    # i.e. after the manager has already been shut down.
    try:
      # NOTE(review): connect() is expected to return RTC.RTC_OK on
      # success -- confirm against OpenRTM_aist.CORBA_RTCUtil.connect.
      self.assertEqual(ret, RTC.RTC_OK)

      self._d_out.data = 100
      self._outOut.write()

      self.assertTrue(self._inIn.isNew())

      data = self._inIn.read()
      self.assertEqual(data.data, 100)
      # read() must hand back the very object held by the OutPort.
      self.assertIs(data, self._d_out)
    finally:
      self.outport_obj.disconnect_all()

  def test_Pull(self):
    """Pull dataflow: written data is read back as-is."""
    ret = self._connect_direct("pull")
    # try/finally rather than addCleanup(): cleanups run after tearDown(),
    # i.e. after the manager has already been shut down.
    try:
      # NOTE(review): connect() is expected to return RTC.RTC_OK on
      # success -- confirm against OpenRTM_aist.CORBA_RTCUtil.connect.
      self.assertEqual(ret, RTC.RTC_OK)

      self._d_out.data = 100
      self._outOut.write()

      # isNew() is deliberately not asserted here; that check was already
      # disabled for the pull case in the original test.

      data = self._inIn.read()
      self.assertEqual(data.data, 100)
      # read() must hand back the very object held by the OutPort.
      self.assertIs(data, self._d_out)
    finally:
      self.outport_obj.disconnect_all()
94

    
95
############### test #################
96
# Run every test case in this module when executed as a script.
if __name__ == "__main__":
  unittest.main()